1import re 2import time 3from distutils.version import LooseVersion 4 5import pytest 6 7from conftest import option 8from conftest import skip_alert 9from unit.applications.lang.python import TestApplicationPython 10 11 12class TestASGIApplication(TestApplicationPython): 13 prerequisites = {'modules': {'python': 14 lambda v: LooseVersion(v) >= LooseVersion('3.5')}} 15 load_module = 'asgi' 16 17 def findall(self, pattern): 18 with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f: 19 return re.findall(pattern, f.read()) 20 21 def test_asgi_application_variables(self): 22 self.load('variables') 23 24 body = 'Test body string.' 25 26 resp = self.http( 27 b"""POST / HTTP/1.1 28Host: localhost 29Content-Length: %d 30Custom-Header: blah 31Custom-hEader: Blah 32Content-Type: text/html 33Connection: close 34custom-header: BLAH 35 36%s""" % (len(body), body.encode()), 37 raw=True, 38 ) 39 40 assert resp['status'] == 200, 'status' 41 headers = resp['headers'] 42 header_server = headers.pop('Server') 43 assert re.search(r'Unit/[\d\.]+', header_server), 'server header' 44 45 date = headers.pop('Date') 46 assert date[-4:] == ' GMT', 'date header timezone' 47 assert ( 48 abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5 49 ), 'date header' 50 51 assert headers == { 52 'Connection': 'close', 53 'content-length': str(len(body)), 54 'content-type': 'text/html', 55 'request-method': 'POST', 56 'request-uri': '/', 57 'http-host': 'localhost', 58 'http-version': '1.1', 59 'custom-header': 'blah, Blah, BLAH', 60 'asgi-version': '3.0', 61 'asgi-spec-version': '2.1', 62 'scheme': 'http', 63 }, 'headers' 64 assert resp['body'] == body, 'body' 65 66 def test_asgi_application_query_string(self): 67 self.load('query_string') 68 69 resp = self.get(url='/?var1=val1&var2=val2') 70 71 assert ( 72 resp['headers']['query-string'] == 'var1=val1&var2=val2' 73 ), 'query-string header' 74 75 def test_asgi_application_query_string_space(self): 76 self.load('query_string') 77 78 resp = self.get(url='/ ?var1=val1&var2=val2') 79 assert ( 80 resp['headers']['query-string'] == 'var1=val1&var2=val2' 81 ), 'query-string space' 82 83 resp = self.get(url='/ %20?var1=val1&var2=val2') 84 assert ( 85 resp['headers']['query-string'] == 'var1=val1&var2=val2' 86 ), 'query-string space 2' 87 88 resp = self.get(url='/ %20 ?var1=val1&var2=val2') 89 assert ( 90 resp['headers']['query-string'] == 'var1=val1&var2=val2' 91 ), 'query-string space 3' 92 93 resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2') 94 assert ( 95 resp['headers']['query-string'] == ' var1= val1 & var2=val2' 96 ), 'query-string space 4' 97 98 def test_asgi_application_query_string_empty(self): 99 self.load('query_string') 100 101 resp = self.get(url='/?') 102 103 assert resp['status'] == 200, 'query string empty status' 104 assert resp['headers']['query-string'] == '', 'query string empty' 105 106 def test_asgi_application_query_string_absent(self): 107 self.load('query_string') 108 109 resp = self.get() 110 111 assert resp['status'] == 200, 'query string absent status' 112 assert resp['headers']['query-string'] == '', 'query string absent' 113 114 @pytest.mark.skip('not yet') 115 def test_asgi_application_server_port(self): 116 self.load('server_port') 117 118 assert ( 119 self.get()['headers']['Server-Port'] == '7080' 120 ), 'Server-Port header' 121 122 @pytest.mark.skip('not yet') 123 def test_asgi_application_working_directory_invalid(self): 124 self.load('empty') 125 126 assert 'success' in self.conf( 127 '"/blah"', 'applications/empty/working_directory' 128 ), 'configure invalid working_directory' 129 130 assert self.get()['status'] == 500, 'status' 131 132 def test_asgi_application_204_transfer_encoding(self): 133 self.load('204_no_content') 134 135 assert ( 136 'Transfer-Encoding' not in self.get()['headers'] 137 ), '204 header transfer encoding' 138 139 def test_asgi_application_shm_ack_handle(self): 140 # Minimum possible limit 141 shm_limit = 10 * 1024 * 1024 142 143 self.load('mirror', limits={"shm": shm_limit}) 144 145 # Should exceed shm_limit 146 max_body_size = 12 * 1024 * 1024 147 148 assert 'success' in self.conf( 149 '{"http":{"max_body_size": ' + str(max_body_size) + ' }}', 150 'settings' 151 ) 152 153 assert self.get()['status'] == 200, 'init' 154 155 body = '0123456789AB' * 1024 * 1024 # 12 Mb 156 resp = self.post( 157 headers={ 158 'Host': 'localhost', 159 'Connection': 'close', 160 'Content-Type': 'text/html', 161 }, 162 body=body, 163 read_buffer_size=1024 * 1024, 164 ) 165 166 assert resp['body'] == body, 'keep-alive 1' 167 168 def test_asgi_keepalive_body(self): 169 self.load('mirror') 170 171 assert self.get()['status'] == 200, 'init' 172 173 body = '0123456789' * 500 174 (resp, sock) = self.post( 175 headers={ 176 'Host': 'localhost', 177 'Connection': 'keep-alive', 178 'Content-Type': 'text/html', 179 }, 180 start=True, 181 body=body, 182 read_timeout=1, 183 ) 184 185 assert resp['body'] == body, 'keep-alive 1' 186 187 body = '0123456789' 188 resp = self.post( 189 headers={ 190 'Host': 'localhost', 191 'Connection': 'close', 192 'Content-Type': 'text/html', 193 }, 194 sock=sock, 195 body=body, 196 ) 197 198 assert resp['body'] == body, 'keep-alive 2' 199 200 def test_asgi_keepalive_reconfigure(self): 201 self.load('mirror') 202 203 assert self.get()['status'] == 200, 'init' 204 205 body = '0123456789' 206 conns = 3 207 socks = [] 208 209 for i in range(conns): 210 (resp, sock) = self.post( 211 headers={ 212 'Host': 'localhost', 213 'Connection': 'keep-alive', 214 'Content-Type': 'text/html', 215 }, 216 start=True, 217 body=body, 218 read_timeout=1, 219 ) 220 221 assert resp['body'] == body, 'keep-alive open' 222 223 self.load('mirror', processes=i + 1) 224 225 socks.append(sock) 226 227 for i in range(conns): 228 (resp, sock) = self.post( 229 headers={ 230 'Host': 'localhost', 231 'Connection': 'keep-alive', 232 'Content-Type': 'text/html', 233 }, 234 start=True, 235 sock=socks[i], 236 body=body, 237 read_timeout=1, 238 ) 239 240 assert resp['body'] == body, 'keep-alive request' 241 242 self.load('mirror', processes=i + 1) 243 244 for i in range(conns): 245 resp = self.post( 246 headers={ 247 'Host': 'localhost', 248 'Connection': 'close', 249 'Content-Type': 'text/html', 250 }, 251 sock=socks[i], 252 body=body, 253 ) 254 255 assert resp['body'] == body, 'keep-alive close' 256 257 self.load('mirror', processes=i + 1) 258 259 def test_asgi_keepalive_reconfigure_2(self): 260 self.load('mirror') 261 262 assert self.get()['status'] == 200, 'init' 263 264 body = '0123456789' 265 266 (resp, sock) = self.post( 267 headers={ 268 'Host': 'localhost', 269 'Connection': 'keep-alive', 270 'Content-Type': 'text/html', 271 }, 272 start=True, 273 body=body, 274 read_timeout=1, 275 ) 276 277 assert resp['body'] == body, 'reconfigure 2 keep-alive 1' 278 279 self.load('empty') 280 281 assert self.get()['status'] == 200, 'init' 282 283 (resp, sock) = self.post( 284 headers={ 285 'Host': 'localhost', 286 'Connection': 'close', 287 'Content-Type': 'text/html', 288 }, 289 start=True, 290 sock=sock, 291 body=body, 292 ) 293 294 assert resp['status'] == 200, 'reconfigure 2 keep-alive 2' 295 assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body' 296 297 assert 'success' in self.conf( 298 {"listeners": {}, "applications": {}} 299 ), 'reconfigure 2 clear configuration' 300 301 resp = self.get(sock=sock) 302 303 assert resp == {}, 'reconfigure 2 keep-alive 3' 304 305 def test_asgi_keepalive_reconfigure_3(self): 306 self.load('empty') 307 308 assert self.get()['status'] == 200, 'init' 309 310 (_, sock) = self.http( 311 b"""GET / HTTP/1.1 312""", 313 start=True, 314 raw=True, 315 no_recv=True, 316 ) 317 318 assert self.get()['status'] == 200 319 320 assert 'success' in self.conf( 321 {"listeners": {}, "applications": {}} 322 ), 'reconfigure 3 clear configuration' 323 324 resp = self.http( 325 b"""Host: localhost 326Connection: close 327 328""", 329 sock=sock, 330 raw=True, 331 ) 332 333 assert resp['status'] == 200, 'reconfigure 3' 334 335 def test_asgi_process_switch(self): 336 self.load('delayed', processes=2) 337 338 self.get( 339 headers={ 340 'Host': 'localhost', 341 'Content-Length': '0', 342 'X-Delay': '5', 343 'Connection': 'close', 344 }, 345 no_recv=True, 346 ) 347 348 headers_delay_1 = { 349 'Connection': 'close', 350 'Host': 'localhost', 351 'Content-Length': '0', 352 'X-Delay': '1', 353 } 354 355 self.get(headers=headers_delay_1, no_recv=True) 356 357 time.sleep(0.5) 358 359 for _ in range(10): 360 self.get(headers=headers_delay_1, no_recv=True) 361 362 self.get(headers=headers_delay_1) 363 364 def test_asgi_application_loading_error(self): 365 skip_alert(r'Python failed to import module "blah"') 366 367 self.load('empty', module="blah") 368 369 assert self.get()['status'] == 503, 'loading error' 370 371 def test_asgi_application_threading(self): 372 """wait_for_record() timeouts after 5s while every thread works at 373 least 3s. So without releasing GIL test should fail. 374 """ 375 376 self.load('threading') 377 378 for _ in range(10): 379 self.get(no_recv=True) 380 381 assert ( 382 self.wait_for_record(r'\(5\) Thread: 100') is not None 383 ), 'last thread finished' 384 385 def test_asgi_application_threads(self): 386 self.load('threads', threads=2) 387 388 socks = [] 389 390 for i in range(2): 391 (_, sock) = self.get( 392 headers={ 393 'Host': 'localhost', 394 'X-Delay': '3', 395 'Connection': 'close', 396 }, 397 no_recv=True, 398 start=True, 399 ) 400 401 socks.append(sock) 402 403 time.sleep(1.0) # required to avoid greedy request reading 404 405 threads = set() 406 407 for sock in socks: 408 resp = self.recvall(sock).decode('utf-8') 409 410 self.log_in(resp) 411 412 resp = self._resp_to_dict(resp) 413 414 assert resp['status'] == 200, 'status' 415 416 threads.add(resp['headers']['x-thread']) 417 418 sock.close() 419 420 assert len(socks) == len(threads), 'threads differs' 421 422 def test_asgi_application_legacy(self): 423 self.load('legacy') 424 425 resp = self.get( 426 headers={ 427 'Host': 'localhost', 428 'Content-Length': '0', 429 'Connection': 'close', 430 }, 431 ) 432 433 assert resp['status'] == 200, 'status' 434 435 def test_asgi_application_legacy_force(self): 436 self.load('legacy_force', protocol='asgi') 437 438 resp = self.get( 439 headers={ 440 'Host': 'localhost', 441 'Content-Length': '0', 442 'Connection': 'close', 443 }, 444 ) 445 446 assert resp['status'] == 200, 'status' 447