1import re 2 3import pytest 4 5from conftest import waitforfiles 6from unit.applications.lang.node import TestApplicationNode 7 8 9class TestNodeApplication(TestApplicationNode): 10 prerequisites = {'modules': {'node': 'all'}} 11 12 def test_node_application_basic(self): 13 self.load('basic') 14 15 resp = self.get() 16 assert resp['headers']['Content-Type'] == 'text/plain', 'basic header' 17 assert resp['body'] == 'Hello World\n', 'basic body' 18 19 def test_node_application_seq(self): 20 self.load('basic') 21 22 assert self.get()['status'] == 200, 'seq' 23 assert self.get()['status'] == 200, 'seq 2' 24 25 def test_node_application_variables(self): 26 self.load('variables') 27 28 body = 'Test body string.' 29 30 resp = self.post( 31 headers={ 32 'Host': 'localhost', 33 'Content-Type': 'text/html', 34 'Custom-Header': 'blah', 35 'Connection': 'close', 36 }, 37 body=body, 38 ) 39 40 assert resp['status'] == 200, 'status' 41 headers = resp['headers'] 42 header_server = headers.pop('Server') 43 assert re.search(r'Unit/[\d\.]+', header_server), 'server header' 44 45 date = headers.pop('Date') 46 assert date[-4:] == ' GMT', 'date header timezone' 47 assert ( 48 abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5 49 ), 'date header' 50 51 raw_headers = headers.pop('Request-Raw-Headers') 52 assert re.search( 53 r'^(?:Host|localhost|Content-Type|' 54 r'text\/html|Custom-Header|blah|Content-Length|17|Connection|' 55 r'close|,)+$', 56 raw_headers, 57 ), 'raw headers' 58 59 assert headers == { 60 'Connection': 'close', 61 'Content-Length': str(len(body)), 62 'Content-Type': 'text/html', 63 'Request-Method': 'POST', 64 'Request-Uri': '/', 65 'Http-Host': 'localhost', 66 'Server-Protocol': 'HTTP/1.1', 67 'Custom-Header': 'blah', 68 }, 'headers' 69 assert resp['body'] == body, 'body' 70 71 def test_node_application_get_variables(self): 72 self.load('get_variables') 73 74 resp = self.get(url='/?var1=val1&var2=&var3') 75 assert resp['headers']['X-Var-1'] == 'val1', 'GET variables' 76 assert resp['headers']['X-Var-2'] == '', 'GET variables 2' 77 assert resp['headers']['X-Var-3'] == '', 'GET variables 3' 78 79 def test_node_application_post_variables(self): 80 self.load('post_variables') 81 82 resp = self.post( 83 headers={ 84 'Content-Type': 'application/x-www-form-urlencoded', 85 'Host': 'localhost', 86 'Connection': 'close', 87 }, 88 body='var1=val1&var2=&var3', 89 ) 90 91 assert resp['headers']['X-Var-1'] == 'val1', 'POST variables' 92 assert resp['headers']['X-Var-2'] == '', 'POST variables 2' 93 assert resp['headers']['X-Var-3'] == '', 'POST variables 3' 94 95 def test_node_application_404(self): 96 self.load('404') 97 98 resp = self.get() 99 100 assert resp['status'] == 404, '404 status' 101 assert re.search( 102 r'<title>404 Not Found</title>', resp['body'] 103 ), '404 body' 104 105 def test_node_keepalive_body(self): 106 self.load('mirror') 107 108 assert self.get()['status'] == 200, 'init' 109 110 body = '0123456789' * 500 111 (resp, sock) = self.post( 112 headers={ 113 'Host': 'localhost', 114 'Connection': 'keep-alive', 115 'Content-Type': 'text/html', 116 }, 117 start=True, 118 body=body, 119 read_timeout=1, 120 ) 121 122 assert resp['body'] == '0123456789' * 500, 'keep-alive 1' 123 124 body = '0123456789' 125 resp = self.post( 126 headers={ 127 'Host': 'localhost', 128 'Connection': 'close', 129 'Content-Type': 'text/html', 130 }, 131 sock=sock, 132 body=body, 133 ) 134 135 assert resp['body'] == body, 'keep-alive 2' 136 137 def test_node_application_write_buffer(self): 138 self.load('write_buffer') 139 140 assert self.get()['body'] == 'buffer', 'write buffer' 141 142 def test_node_application_write_callback(self, temp_dir): 143 self.load('write_callback') 144 145 assert self.get()['body'] == 'helloworld', 'write callback order' 146 assert waitforfiles(temp_dir + '/node/callback'), 'write callback' 147 148 def test_node_application_write_before_write_head(self): 149 self.load('write_before_write_head') 150 151 assert self.get()['status'] == 200, 'write before writeHead' 152 153 def test_node_application_double_end(self): 154 self.load('double_end') 155 156 assert self.get()['status'] == 200, 'double end' 157 assert self.get()['status'] == 200, 'double end 2' 158 159 def test_node_application_write_return(self): 160 self.load('write_return') 161 162 assert self.get()['body'] == 'bodytrue', 'write return' 163 164 def test_node_application_remove_header(self): 165 self.load('remove_header') 166 167 resp = self.get( 168 headers={ 169 'Host': 'localhost', 170 'X-Remove': 'X-Header', 171 'Connection': 'close', 172 } 173 ) 174 assert resp['headers']['Was-Header'] == 'true', 'was header' 175 assert resp['headers']['Has-Header'] == 'false', 'has header' 176 assert not ('X-Header' in resp['headers']), 'remove header' 177 178 def test_node_application_remove_header_nonexisting(self): 179 self.load('remove_header') 180 181 assert ( 182 self.get( 183 headers={ 184 'Host': 'localhost', 185 'X-Remove': 'blah', 186 'Connection': 'close', 187 } 188 )['headers']['Has-Header'] 189 == 'true' 190 ), 'remove header nonexisting' 191 192 def test_node_application_update_header(self): 193 self.load('update_header') 194 195 assert self.get()['headers']['X-Header'] == 'new', 'update header' 196 197 def test_node_application_set_header_array(self): 198 self.load('set_header_array') 199 200 assert self.get()['headers']['Set-Cookie'] == [ 201 'tc=one,two,three', 202 'tc=four,five,six', 203 ], 'set header array' 204 205 @pytest.mark.skip('not yet') 206 def test_node_application_status_message(self): 207 self.load('status_message') 208 209 assert re.search(r'200 blah', self.get(raw_resp=True)), 'status message' 210 211 def test_node_application_get_header_type(self): 212 self.load('get_header_type') 213 214 assert self.get()['headers']['X-Type'] == 'number', 'get header type' 215 216 def test_node_application_header_name_case(self): 217 self.load('header_name_case') 218 219 headers = self.get()['headers'] 220 221 assert headers['X-HEADER'] == '3', 'header value' 222 assert 'X-Header' not in headers, 'insensitive' 223 assert 'X-header' not in headers, 'insensitive 2' 224 225 def test_node_application_promise_handler(self, temp_dir): 226 self.load('promise_handler') 227 228 assert ( 229 self.post( 230 headers={ 231 'Host': 'localhost', 232 'Content-Type': 'text/html', 233 'Connection': 'close', 234 }, 235 body='callback', 236 )['status'] 237 == 200 238 ), 'promise handler request' 239 assert waitforfiles(temp_dir + '/node/callback'), 'promise handler' 240 241 def test_node_application_promise_handler_write_after_end(self): 242 self.load('promise_handler') 243 244 assert ( 245 self.post( 246 headers={ 247 'Host': 'localhost', 248 'Content-Type': 'text/html', 249 'X-Write-Call': '1', 250 'Connection': 'close', 251 }, 252 body='callback', 253 )['status'] 254 == 200 255 ), 'promise handler request write after end' 256 257 def test_node_application_promise_end(self, temp_dir): 258 self.load('promise_end') 259 260 assert ( 261 self.post( 262 headers={ 263 'Host': 'localhost', 264 'Content-Type': 'text/html', 265 'Connection': 'close', 266 }, 267 body='end', 268 )['status'] 269 == 200 270 ), 'promise end request' 271 assert waitforfiles(temp_dir + '/node/callback'), 'promise end' 272 273 def test_node_application_promise_multiple_calls(self, temp_dir): 274 self.load('promise_handler') 275 276 self.post( 277 headers={ 278 'Host': 'localhost', 279 'Content-Type': 'text/html', 280 'Connection': 'close', 281 }, 282 body='callback1', 283 ) 284 285 assert waitforfiles( 286 temp_dir + '/node/callback1' 287 ), 'promise first call' 288 289 self.post( 290 headers={ 291 'Host': 'localhost', 292 'Content-Type': 'text/html', 293 'Connection': 'close', 294 }, 295 body='callback2', 296 ) 297 298 assert waitforfiles( 299 temp_dir + '/node/callback2' 300 ), 'promise second call' 301 302 @pytest.mark.skip('not yet') 303 def test_node_application_header_name_valid(self): 304 self.load('header_name_valid') 305 306 assert 'status' not in self.get(), 'header name valid' 307 308 def test_node_application_header_value_object(self): 309 self.load('header_value_object') 310 311 assert 'X-Header' in self.get()['headers'], 'header value object' 312 313 def test_node_application_get_header_names(self): 314 self.load('get_header_names') 315 316 assert self.get()['headers']['X-Names'] == [ 317 'date', 318 'x-header', 319 ], 'get header names' 320 321 def test_node_application_has_header(self): 322 self.load('has_header') 323 324 assert ( 325 self.get( 326 headers={ 327 'Host': 'localhost', 328 'X-Header': 'length', 329 'Connection': 'close', 330 } 331 )['headers']['X-Has-Header'] 332 == 'false' 333 ), 'has header length' 334 335 assert ( 336 self.get( 337 headers={ 338 'Host': 'localhost', 339 'X-Header': 'Date', 340 'Connection': 'close', 341 } 342 )['headers']['X-Has-Header'] 343 == 'false' 344 ), 'has header date' 345 346 def test_node_application_write_multiple(self): 347 self.load('write_multiple') 348 349 assert self.get()['body'] == 'writewrite2end', 'write multiple' 350