"""Tests for applications loaded through ``TestApplicationNode``."""

import re

import pytest

from unit.applications.lang.node import TestApplicationNode
from unit.utils import waitforfiles


class TestNodeApplication(TestApplicationNode):
    """Request/response checks against the bundled 'node' test applications.

    Every test loads one named application with ``self.load(...)`` and then
    issues requests via ``self.get`` / ``self.post`` inherited from the base
    class, asserting on the parsed response dict (``status``, ``headers``,
    ``body``).
    """

    prerequisites = {'modules': {'node': 'all'}}

    def test_node_application_basic(self):
        """'basic' app answers with text/plain and 'Hello World\\n'."""
        self.load('basic')

        resp = self.get()
        assert resp['headers']['Content-Type'] == 'text/plain', 'basic header'
        assert resp['body'] == 'Hello World\n', 'basic body'

    def test_node_application_seq(self):
        """Two consecutive GETs against 'basic' both return 200."""
        self.load('basic')

        assert self.get()['status'] == 200, 'seq'
        assert self.get()['status'] == 200, 'seq 2'

    def test_node_application_variables(self):
        """'variables' app echoes request metadata back as response headers.

        Server, Date and Request-Raw-Headers are popped and checked one by
        one; whatever remains must equal the expected header dict exactly.
        """
        self.load('variables')

        body = 'Test body string.'

        resp = self.post(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': 'blah',
                'Connection': 'close',
            },
            body=body,
        )

        assert resp['status'] == 200, 'status'
        headers = resp['headers']
        header_server = headers.pop('Server')
        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'

        date = headers.pop('Date')
        assert date[-4:] == ' GMT', 'date header timezone'
        # The Date header must be within 5 seconds of the local clock.
        assert (
            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
        ), 'date header'

        raw_headers = headers.pop('Request-Raw-Headers')
        # The pattern only accepts the known names/values and commas, in any
        # order. The literal 17 is len(body) for the body sent above.
        assert re.search(
            r'^(?:Host|localhost|Content-Type|'
            r'text\/html|Custom-Header|blah|Content-Length|17|Connection|'
            r'close|,)+$',
            raw_headers,
        ), 'raw headers'

        assert headers == {
            'Connection': 'close',
            'Content-Length': str(len(body)),
            'Content-Type': 'text/html',
            'Request-Method': 'POST',
            'Request-Uri': '/',
            'Http-Host': 'localhost',
            'Server-Protocol': 'HTTP/1.1',
            'Custom-Header': 'blah',
        }, 'headers'
        assert resp['body'] == body, 'body'

    def test_node_application_get_variables(self):
        """Query-string variables are reported in X-Var-1..3 headers."""
        self.load('get_variables')

        resp = self.get(url='/?var1=val1&var2=&var3')
        assert resp['headers']['X-Var-1'] == 'val1', 'GET variables'
        assert resp['headers']['X-Var-2'] == '', 'GET variables 2'
        assert resp['headers']['X-Var-3'] == '', 'GET variables 3'

    def test_node_application_post_variables(self):
        """Form-encoded POST variables are reported in X-Var-1..3 headers."""
        self.load('post_variables')

        resp = self.post(
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
                'Host': 'localhost',
                'Connection': 'close',
            },
            body='var1=val1&var2=&var3',
        )

        assert resp['headers']['X-Var-1'] == 'val1', 'POST variables'
        assert resp['headers']['X-Var-2'] == '', 'POST variables 2'
        assert resp['headers']['X-Var-3'] == '', 'POST variables 3'

    def test_node_application_404(self):
        """'404' app returns status 404 with a '404 Not Found' title."""
        self.load('404')

        resp = self.get()

        assert resp['status'] == 404, '404 status'
        assert re.search(
            r'<title>404 Not Found</title>', resp['body']
        ), '404 body'

    def test_node_keepalive_body(self):
        """'mirror' app echoes bodies for two requests on one socket."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        body = '0123456789' * 500
        # start=True makes post() return the socket as well, so the second
        # request below can reuse the same connection.
        resp, sock = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body=body,
            read_timeout=1,
        )

        assert resp['body'] == body, 'keep-alive 1'

        body = '0123456789'
        resp = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body=body,
        )

        assert resp['body'] == body, 'keep-alive 2'

    def test_node_application_write_buffer(self):
        """'write_buffer' app produces the body 'buffer'."""
        self.load('write_buffer')

        assert self.get()['body'] == 'buffer', 'write buffer'

    def test_node_application_write_callback(self, temp_dir):
        """'write_callback' app returns 'helloworld' and leaves a marker file.

        The marker is checked with ``waitforfiles`` at
        ``<temp_dir>/node/callback``.
        """
        self.load('write_callback')

        assert self.get()['body'] == 'helloworld', 'write callback order'
        assert waitforfiles(temp_dir + '/node/callback'), 'write callback'

    def test_node_application_write_before_write_head(self):
        """'write_before_write_head' app still answers with 200."""
        self.load('write_before_write_head')

        assert self.get()['status'] == 200, 'write before writeHead'

    def test_node_application_double_end(self):
        """'double_end' app answers 200 on two consecutive requests."""
        self.load('double_end')

        assert self.get()['status'] == 200, 'double end'
        assert self.get()['status'] == 200, 'double end 2'

    def test_node_application_write_return(self):
        """'write_return' app produces the body 'bodytrue'."""
        self.load('write_return')

        assert self.get()['body'] == 'bodytrue', 'write return'

    def test_node_application_remove_header(self):
        """Asking 'remove_header' to drop X-Header removes it from the reply."""
        self.load('remove_header')

        resp = self.get(
            headers={
                'Host': 'localhost',
                'X-Remove': 'X-Header',
                'Connection': 'close',
            }
        )
        assert resp['headers']['Was-Header'] == 'true', 'was header'
        assert resp['headers']['Has-Header'] == 'false', 'has header'
        assert 'X-Header' not in resp['headers'], 'remove header'

    def test_node_application_remove_header_nonexisting(self):
        """Removing an unrelated header name leaves Has-Header at 'true'."""
        self.load('remove_header')

        assert (
            self.get(
                headers={
                    'Host': 'localhost',
                    'X-Remove': 'blah',
                    'Connection': 'close',
                }
            )['headers']['Has-Header']
            == 'true'
        ), 'remove header nonexisting'

    def test_node_application_update_header(self):
        """'update_header' app replies with X-Header set to 'new'."""
        self.load('update_header')

        assert self.get()['headers']['X-Header'] == 'new', 'update header'

    def test_node_application_set_header_array(self):
        """'set_header_array' app yields two separate Set-Cookie values."""
        self.load('set_header_array')

        assert self.get()['headers']['Set-Cookie'] == [
            'tc=one,two,three',
            'tc=four,five,six',
        ], 'set header array'

    @pytest.mark.skip('not yet')
    def test_node_application_status_message(self):
        """Raw response of 'status_message' contains '200 blah' (skipped)."""
        self.load('status_message')

        assert re.search(r'200 blah', self.get(raw_resp=True)), 'status message'

    def test_node_application_get_header_type(self):
        """'get_header_type' app reports X-Type as 'number'."""
        self.load('get_header_type')

        assert self.get()['headers']['X-Type'] == 'number', 'get header type'

    def test_node_application_header_name_case(self):
        """Only the 'X-HEADER' spelling is present, with the value '3'."""
        self.load('header_name_case')

        headers = self.get()['headers']

        assert headers['X-HEADER'] == '3', 'header value'
        assert 'X-Header' not in headers, 'insensitive'
        assert 'X-header' not in headers, 'insensitive 2'

    def test_node_application_promise_handler_write_after_end(self):
        """'promise_handler' app returns 200 when X-Write-Call is '1'."""
        self.load('promise_handler')

        assert (
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'X-Write-Call': '1',
                    'Connection': 'close',
                },
                body='callback',
            )['status']
            == 200
        ), 'promise handler request write after end'

    def test_node_application_promise_end(self, temp_dir):
        """'promise_end' app returns 200 and leaves a marker file.

        The marker is checked with ``waitforfiles`` at
        ``<temp_dir>/node/callback``.
        """
        self.load('promise_end')

        assert (
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Connection': 'close',
                },
                body='end',
            )['status']
            == 200
        ), 'promise end request'
        assert waitforfiles(temp_dir + '/node/callback'), 'promise end'

    @pytest.mark.skip('not yet')
    def test_node_application_header_name_valid(self):
        """Response of 'header_name_valid' has no 'status' key (skipped)."""
        self.load('header_name_valid')

        assert 'status' not in self.get(), 'header name valid'

    def test_node_application_header_value_object(self):
        """'header_value_object' app still emits an X-Header header."""
        self.load('header_value_object')

        assert 'X-Header' in self.get()['headers'], 'header value object'

    def test_node_application_get_header_names(self):
        """'get_header_names' app reports X-Names as ['date', 'x-header']."""
        self.load('get_header_names')

        assert self.get()['headers']['X-Names'] == [
            'date',
            'x-header',
        ], 'get header names'

    def test_node_application_has_header(self):
        """X-Has-Header is 'false' for both the 'length' and 'Date' probes."""
        self.load('has_header')

        assert (
            self.get(
                headers={
                    'Host': 'localhost',
                    'X-Header': 'length',
                    'Connection': 'close',
                }
            )['headers']['X-Has-Header']
            == 'false'
        ), 'has header length'

        assert (
            self.get(
                headers={
                    'Host': 'localhost',
                    'X-Header': 'Date',
                    'Connection': 'close',
                }
            )['headers']['X-Has-Header']
            == 'false'
        ), 'has header date'

    def test_node_application_write_multiple(self):
        """'write_multiple' app produces the body 'writewrite2end'."""
        self.load('write_multiple')

        assert self.get()['body'] == 'writewrite2end', 'write multiple'