test_node_application.py (1453:71af60a59338) test_node_application.py (1467:195fe0a92670)
1import unittest
2from unit.applications.lang.node import TestApplicationNode
3
4
5class TestNodeApplication(TestApplicationNode):
1import unittest
2from unit.applications.lang.node import TestApplicationNode
3
4
5class TestNodeApplication(TestApplicationNode):
6 prerequisites = {'modules': ['node']}
6 prerequisites = {'modules': {'node': 'all'}}
7
8 def test_node_application_basic(self):
9 self.load('basic')
10
11 resp = self.get()
12 self.assertEqual(
13 resp['headers']['Content-Type'], 'text/plain', 'basic header'
14 )
15 self.assertEqual(resp['body'], 'Hello World\n', 'basic body')
16
17 def test_node_application_seq(self):
18 self.load('basic')
19
20 self.assertEqual(self.get()['status'], 200, 'seq')
21 self.assertEqual(self.get()['status'], 200, 'seq 2')
22
23 def test_node_application_variables(self):
24 self.load('variables')
25
26 body = 'Test body string.'
27
28 resp = self.post(
29 headers={
30 'Host': 'localhost',
31 'Content-Type': 'text/html',
32 'Custom-Header': 'blah',
33 'Connection': 'close',
34 },
35 body=body,
36 )
37
38 self.assertEqual(resp['status'], 200, 'status')
39 headers = resp['headers']
40 header_server = headers.pop('Server')
41 self.assertRegex(header_server, r'Unit/[\d\.]+', 'server header')
42
43 date = headers.pop('Date')
44 self.assertEqual(date[-4:], ' GMT', 'date header timezone')
45 self.assertLess(
46 abs(self.date_to_sec_epoch(date) - self.sec_epoch()),
47 5,
48 'date header',
49 )
50
51 raw_headers = headers.pop('Request-Raw-Headers')
52 self.assertRegex(
53 raw_headers,
54 r'^(?:Host|localhost|Content-Type|'
55 'text\/html|Custom-Header|blah|Content-Length|17|Connection|'
56 'close|,)+$',
57 'raw headers',
58 )
59
60 self.assertDictEqual(
61 headers,
62 {
63 'Connection': 'close',
64 'Content-Length': str(len(body)),
65 'Content-Type': 'text/html',
66 'Request-Method': 'POST',
67 'Request-Uri': '/',
68 'Http-Host': 'localhost',
69 'Server-Protocol': 'HTTP/1.1',
70 'Custom-Header': 'blah',
71 },
72 'headers',
73 )
74 self.assertEqual(resp['body'], body, 'body')
75
76 def test_node_application_get_variables(self):
77 self.load('get_variables')
78
79 resp = self.get(url='/?var1=val1&var2=&var3')
80 self.assertEqual(resp['headers']['X-Var-1'], 'val1', 'GET variables')
81 self.assertEqual(resp['headers']['X-Var-2'], '', 'GET variables 2')
82 self.assertEqual(resp['headers']['X-Var-3'], '', 'GET variables 3')
83
84 def test_node_application_post_variables(self):
85 self.load('post_variables')
86
87 resp = self.post(
88 headers={
89 'Content-Type': 'application/x-www-form-urlencoded',
90 'Host': 'localhost',
91 'Connection': 'close',
92 },
93 body='var1=val1&var2=&var3',
94 )
95
96 self.assertEqual(resp['headers']['X-Var-1'], 'val1', 'POST variables')
97 self.assertEqual(resp['headers']['X-Var-2'], '', 'POST variables 2')
98 self.assertEqual(resp['headers']['X-Var-3'], '', 'POST variables 3')
99
100 def test_node_application_404(self):
101 self.load('404')
102
103 resp = self.get()
104
105 self.assertEqual(resp['status'], 404, '404 status')
106 self.assertRegex(
107 resp['body'], r'<title>404 Not Found</title>', '404 body'
108 )
109
110 def test_node_keepalive_body(self):
111 self.load('mirror')
112
113 self.assertEqual(self.get()['status'], 200, 'init')
114
115 body = '0123456789' * 500
116 (resp, sock) = self.post(
117 headers={
118 'Host': 'localhost',
119 'Connection': 'keep-alive',
120 'Content-Type': 'text/html',
121 },
122 start=True,
123 body=body,
124 read_timeout=1,
125 )
126
127 self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1')
128
129 body = '0123456789'
130 resp = self.post(
131 headers={
132 'Host': 'localhost',
133 'Connection': 'close',
134 'Content-Type': 'text/html',
135 },
136 sock=sock,
137 body=body,
138 )
139
140 self.assertEqual(resp['body'], body, 'keep-alive 2')
141
142 def test_node_application_write_buffer(self):
143 self.load('write_buffer')
144
145 self.assertEqual(
146 self.get()['body'], 'buffer', 'write buffer'
147 )
148
149 def test_node_application_write_callback(self):
150 self.load('write_callback')
151
152 self.assertEqual(
153 self.get()['body'],
154 'helloworld',
155 'write callback order',
156 )
157 self.assertTrue(
158 self.waitforfiles(self.testdir + '/node/callback'),
159 'write callback',
160 )
161
162 def test_node_application_write_before_write_head(self):
163 self.load('write_before_write_head')
164
165 self.assertEqual(self.get()['status'], 200, 'write before writeHead')
166
167 def test_node_application_double_end(self):
168 self.load('double_end')
169
170 self.assertEqual(self.get()['status'], 200, 'double end')
171 self.assertEqual(self.get()['status'], 200, 'double end 2')
172
173 def test_node_application_write_return(self):
174 self.load('write_return')
175
176 self.assertEqual(
177 self.get()['body'],
178 'bodytrue',
179 'write return',
180 )
181
182 def test_node_application_remove_header(self):
183 self.load('remove_header')
184
185 resp = self.get(
186 headers={
187 'Host': 'localhost',
188 'X-Remove': 'X-Header',
189 'Connection': 'close',
190 }
191 )
192 self.assertEqual(resp['headers']['Was-Header'], 'true', 'was header')
193 self.assertEqual(resp['headers']['Has-Header'], 'false', 'has header')
194 self.assertFalse('X-Header' in resp['headers'], 'remove header')
195
196 def test_node_application_remove_header_nonexisting(self):
197 self.load('remove_header')
198
199 self.assertEqual(
200 self.get(
201 headers={
202 'Host': 'localhost',
203 'X-Remove': 'blah',
204 'Connection': 'close',
205 }
206 )['headers']['Has-Header'],
207 'true',
208 'remove header nonexisting',
209 )
210
211 def test_node_application_update_header(self):
212 self.load('update_header')
213
214 self.assertEqual(
215 self.get()['headers']['X-Header'], 'new', 'update header'
216 )
217
218 def test_node_application_set_header_array(self):
219 self.load('set_header_array')
220
221 self.assertListEqual(
222 self.get()['headers']['Set-Cookie'],
223 ['tc=one,two,three', 'tc=four,five,six'],
224 'set header array',
225 )
226
227 @unittest.skip('not yet')
228 def test_node_application_status_message(self):
229 self.load('status_message')
230
231 self.assertRegex(
232 self.get(raw_resp=True), r'200 blah', 'status message'
233 )
234
235 def test_node_application_get_header_type(self):
236 self.load('get_header_type')
237
238 self.assertEqual(
239 self.get()['headers']['X-Type'], 'number', 'get header type'
240 )
241
242 def test_node_application_header_name_case(self):
243 self.load('header_name_case')
244
245 headers = self.get()['headers']
246
247 self.assertEqual(headers['X-HEADER'], '3', 'header value')
248 self.assertNotIn('X-Header', headers, 'insensitive')
249 self.assertNotIn('X-header', headers, 'insensitive 2')
250
251 def test_node_application_promise_handler(self):
252 self.load('promise_handler')
253
254 self.assertEqual(
255 self.post(
256 headers={
257 'Host': 'localhost',
258 'Content-Type': 'text/html',
259 'Connection': 'close',
260 },
261 body='callback',
262 )['status'],
263 200,
264 'promise handler request',
265 )
266 self.assertTrue(
267 self.waitforfiles(self.testdir + '/node/callback'),
268 'promise handler',
269 )
270
271 def test_node_application_promise_handler_write_after_end(self):
272 self.load('promise_handler')
273
274 self.assertEqual(
275 self.post(
276 headers={
277 'Host': 'localhost',
278 'Content-Type': 'text/html',
279 'X-Write-Call': '1',
280 'Connection': 'close',
281 },
282 body='callback',
283 )['status'],
284 200,
285 'promise handler request write after end',
286 )
287
288 def test_node_application_promise_end(self):
289 self.load('promise_end')
290
291 self.assertEqual(
292 self.post(
293 headers={
294 'Host': 'localhost',
295 'Content-Type': 'text/html',
296 'Connection': 'close',
297 },
298 body='end',
299 )['status'],
300 200,
301 'promise end request',
302 )
303 self.assertTrue(
304 self.waitforfiles(self.testdir + '/node/callback'), 'promise end'
305 )
306
307 def test_node_application_promise_multiple_calls(self):
308 self.load('promise_handler')
309
310 self.post(
311 headers={
312 'Host': 'localhost',
313 'Content-Type': 'text/html',
314 'Connection': 'close',
315 },
316 body='callback1',
317 )
318
319 self.assertTrue(
320 self.waitforfiles(self.testdir + '/node/callback1'),
321 'promise first call',
322 )
323
324 self.post(
325 headers={
326 'Host': 'localhost',
327 'Content-Type': 'text/html',
328 'Connection': 'close',
329 },
330 body='callback2',
331 )
332
333 self.assertTrue(
334 self.waitforfiles(self.testdir + '/node/callback2'),
335 'promise second call',
336 )
337
338 @unittest.skip('not yet')
339 def test_node_application_header_name_valid(self):
340 self.load('header_name_valid')
341
342 self.assertNotIn('status', self.get(), 'header name valid')
343
344 def test_node_application_header_value_object(self):
345 self.load('header_value_object')
346
347 self.assertIn('X-Header', self.get()['headers'], 'header value object')
348
349 def test_node_application_get_header_names(self):
350 self.load('get_header_names')
351
352 self.assertListEqual(
353 self.get()['headers']['X-Names'],
354 ['date', 'x-header'],
355 'get header names',
356 )
357
358 def test_node_application_has_header(self):
359 self.load('has_header')
360
361 self.assertEqual(
362 self.get(
363 headers={
364 'Host': 'localhost',
365 'X-Header': 'length',
366 'Connection': 'close',
367 }
368 )['headers']['X-Has-Header'],
369 'false',
370 'has header length',
371 )
372
373 self.assertEqual(
374 self.get(
375 headers={
376 'Host': 'localhost',
377 'X-Header': 'Date',
378 'Connection': 'close',
379 }
380 )['headers']['X-Has-Header'],
381 'false',
382 'has header date',
383 )
384
385 def test_node_application_write_multiple(self):
386 self.load('write_multiple')
387
388 self.assertEqual(
389 self.get()['body'], 'writewrite2end', 'write multiple'
390 )
391
392
393if __name__ == '__main__':
394 TestNodeApplication.main()
7
8 def test_node_application_basic(self):
9 self.load('basic')
10
11 resp = self.get()
12 self.assertEqual(
13 resp['headers']['Content-Type'], 'text/plain', 'basic header'
14 )
15 self.assertEqual(resp['body'], 'Hello World\n', 'basic body')
16
17 def test_node_application_seq(self):
18 self.load('basic')
19
20 self.assertEqual(self.get()['status'], 200, 'seq')
21 self.assertEqual(self.get()['status'], 200, 'seq 2')
22
23 def test_node_application_variables(self):
24 self.load('variables')
25
26 body = 'Test body string.'
27
28 resp = self.post(
29 headers={
30 'Host': 'localhost',
31 'Content-Type': 'text/html',
32 'Custom-Header': 'blah',
33 'Connection': 'close',
34 },
35 body=body,
36 )
37
38 self.assertEqual(resp['status'], 200, 'status')
39 headers = resp['headers']
40 header_server = headers.pop('Server')
41 self.assertRegex(header_server, r'Unit/[\d\.]+', 'server header')
42
43 date = headers.pop('Date')
44 self.assertEqual(date[-4:], ' GMT', 'date header timezone')
45 self.assertLess(
46 abs(self.date_to_sec_epoch(date) - self.sec_epoch()),
47 5,
48 'date header',
49 )
50
51 raw_headers = headers.pop('Request-Raw-Headers')
52 self.assertRegex(
53 raw_headers,
54 r'^(?:Host|localhost|Content-Type|'
55 'text\/html|Custom-Header|blah|Content-Length|17|Connection|'
56 'close|,)+$',
57 'raw headers',
58 )
59
60 self.assertDictEqual(
61 headers,
62 {
63 'Connection': 'close',
64 'Content-Length': str(len(body)),
65 'Content-Type': 'text/html',
66 'Request-Method': 'POST',
67 'Request-Uri': '/',
68 'Http-Host': 'localhost',
69 'Server-Protocol': 'HTTP/1.1',
70 'Custom-Header': 'blah',
71 },
72 'headers',
73 )
74 self.assertEqual(resp['body'], body, 'body')
75
76 def test_node_application_get_variables(self):
77 self.load('get_variables')
78
79 resp = self.get(url='/?var1=val1&var2=&var3')
80 self.assertEqual(resp['headers']['X-Var-1'], 'val1', 'GET variables')
81 self.assertEqual(resp['headers']['X-Var-2'], '', 'GET variables 2')
82 self.assertEqual(resp['headers']['X-Var-3'], '', 'GET variables 3')
83
84 def test_node_application_post_variables(self):
85 self.load('post_variables')
86
87 resp = self.post(
88 headers={
89 'Content-Type': 'application/x-www-form-urlencoded',
90 'Host': 'localhost',
91 'Connection': 'close',
92 },
93 body='var1=val1&var2=&var3',
94 )
95
96 self.assertEqual(resp['headers']['X-Var-1'], 'val1', 'POST variables')
97 self.assertEqual(resp['headers']['X-Var-2'], '', 'POST variables 2')
98 self.assertEqual(resp['headers']['X-Var-3'], '', 'POST variables 3')
99
100 def test_node_application_404(self):
101 self.load('404')
102
103 resp = self.get()
104
105 self.assertEqual(resp['status'], 404, '404 status')
106 self.assertRegex(
107 resp['body'], r'<title>404 Not Found</title>', '404 body'
108 )
109
110 def test_node_keepalive_body(self):
111 self.load('mirror')
112
113 self.assertEqual(self.get()['status'], 200, 'init')
114
115 body = '0123456789' * 500
116 (resp, sock) = self.post(
117 headers={
118 'Host': 'localhost',
119 'Connection': 'keep-alive',
120 'Content-Type': 'text/html',
121 },
122 start=True,
123 body=body,
124 read_timeout=1,
125 )
126
127 self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1')
128
129 body = '0123456789'
130 resp = self.post(
131 headers={
132 'Host': 'localhost',
133 'Connection': 'close',
134 'Content-Type': 'text/html',
135 },
136 sock=sock,
137 body=body,
138 )
139
140 self.assertEqual(resp['body'], body, 'keep-alive 2')
141
142 def test_node_application_write_buffer(self):
143 self.load('write_buffer')
144
145 self.assertEqual(
146 self.get()['body'], 'buffer', 'write buffer'
147 )
148
149 def test_node_application_write_callback(self):
150 self.load('write_callback')
151
152 self.assertEqual(
153 self.get()['body'],
154 'helloworld',
155 'write callback order',
156 )
157 self.assertTrue(
158 self.waitforfiles(self.testdir + '/node/callback'),
159 'write callback',
160 )
161
162 def test_node_application_write_before_write_head(self):
163 self.load('write_before_write_head')
164
165 self.assertEqual(self.get()['status'], 200, 'write before writeHead')
166
167 def test_node_application_double_end(self):
168 self.load('double_end')
169
170 self.assertEqual(self.get()['status'], 200, 'double end')
171 self.assertEqual(self.get()['status'], 200, 'double end 2')
172
173 def test_node_application_write_return(self):
174 self.load('write_return')
175
176 self.assertEqual(
177 self.get()['body'],
178 'bodytrue',
179 'write return',
180 )
181
182 def test_node_application_remove_header(self):
183 self.load('remove_header')
184
185 resp = self.get(
186 headers={
187 'Host': 'localhost',
188 'X-Remove': 'X-Header',
189 'Connection': 'close',
190 }
191 )
192 self.assertEqual(resp['headers']['Was-Header'], 'true', 'was header')
193 self.assertEqual(resp['headers']['Has-Header'], 'false', 'has header')
194 self.assertFalse('X-Header' in resp['headers'], 'remove header')
195
196 def test_node_application_remove_header_nonexisting(self):
197 self.load('remove_header')
198
199 self.assertEqual(
200 self.get(
201 headers={
202 'Host': 'localhost',
203 'X-Remove': 'blah',
204 'Connection': 'close',
205 }
206 )['headers']['Has-Header'],
207 'true',
208 'remove header nonexisting',
209 )
210
211 def test_node_application_update_header(self):
212 self.load('update_header')
213
214 self.assertEqual(
215 self.get()['headers']['X-Header'], 'new', 'update header'
216 )
217
218 def test_node_application_set_header_array(self):
219 self.load('set_header_array')
220
221 self.assertListEqual(
222 self.get()['headers']['Set-Cookie'],
223 ['tc=one,two,three', 'tc=four,five,six'],
224 'set header array',
225 )
226
227 @unittest.skip('not yet')
228 def test_node_application_status_message(self):
229 self.load('status_message')
230
231 self.assertRegex(
232 self.get(raw_resp=True), r'200 blah', 'status message'
233 )
234
235 def test_node_application_get_header_type(self):
236 self.load('get_header_type')
237
238 self.assertEqual(
239 self.get()['headers']['X-Type'], 'number', 'get header type'
240 )
241
242 def test_node_application_header_name_case(self):
243 self.load('header_name_case')
244
245 headers = self.get()['headers']
246
247 self.assertEqual(headers['X-HEADER'], '3', 'header value')
248 self.assertNotIn('X-Header', headers, 'insensitive')
249 self.assertNotIn('X-header', headers, 'insensitive 2')
250
251 def test_node_application_promise_handler(self):
252 self.load('promise_handler')
253
254 self.assertEqual(
255 self.post(
256 headers={
257 'Host': 'localhost',
258 'Content-Type': 'text/html',
259 'Connection': 'close',
260 },
261 body='callback',
262 )['status'],
263 200,
264 'promise handler request',
265 )
266 self.assertTrue(
267 self.waitforfiles(self.testdir + '/node/callback'),
268 'promise handler',
269 )
270
271 def test_node_application_promise_handler_write_after_end(self):
272 self.load('promise_handler')
273
274 self.assertEqual(
275 self.post(
276 headers={
277 'Host': 'localhost',
278 'Content-Type': 'text/html',
279 'X-Write-Call': '1',
280 'Connection': 'close',
281 },
282 body='callback',
283 )['status'],
284 200,
285 'promise handler request write after end',
286 )
287
288 def test_node_application_promise_end(self):
289 self.load('promise_end')
290
291 self.assertEqual(
292 self.post(
293 headers={
294 'Host': 'localhost',
295 'Content-Type': 'text/html',
296 'Connection': 'close',
297 },
298 body='end',
299 )['status'],
300 200,
301 'promise end request',
302 )
303 self.assertTrue(
304 self.waitforfiles(self.testdir + '/node/callback'), 'promise end'
305 )
306
307 def test_node_application_promise_multiple_calls(self):
308 self.load('promise_handler')
309
310 self.post(
311 headers={
312 'Host': 'localhost',
313 'Content-Type': 'text/html',
314 'Connection': 'close',
315 },
316 body='callback1',
317 )
318
319 self.assertTrue(
320 self.waitforfiles(self.testdir + '/node/callback1'),
321 'promise first call',
322 )
323
324 self.post(
325 headers={
326 'Host': 'localhost',
327 'Content-Type': 'text/html',
328 'Connection': 'close',
329 },
330 body='callback2',
331 )
332
333 self.assertTrue(
334 self.waitforfiles(self.testdir + '/node/callback2'),
335 'promise second call',
336 )
337
338 @unittest.skip('not yet')
339 def test_node_application_header_name_valid(self):
340 self.load('header_name_valid')
341
342 self.assertNotIn('status', self.get(), 'header name valid')
343
344 def test_node_application_header_value_object(self):
345 self.load('header_value_object')
346
347 self.assertIn('X-Header', self.get()['headers'], 'header value object')
348
349 def test_node_application_get_header_names(self):
350 self.load('get_header_names')
351
352 self.assertListEqual(
353 self.get()['headers']['X-Names'],
354 ['date', 'x-header'],
355 'get header names',
356 )
357
358 def test_node_application_has_header(self):
359 self.load('has_header')
360
361 self.assertEqual(
362 self.get(
363 headers={
364 'Host': 'localhost',
365 'X-Header': 'length',
366 'Connection': 'close',
367 }
368 )['headers']['X-Has-Header'],
369 'false',
370 'has header length',
371 )
372
373 self.assertEqual(
374 self.get(
375 headers={
376 'Host': 'localhost',
377 'X-Header': 'Date',
378 'Connection': 'close',
379 }
380 )['headers']['X-Has-Header'],
381 'false',
382 'has header date',
383 )
384
385 def test_node_application_write_multiple(self):
386 self.load('write_multiple')
387
388 self.assertEqual(
389 self.get()['body'], 'writewrite2end', 'write multiple'
390 )
391
392
393if __name__ == '__main__':
394 TestNodeApplication.main()