1import grp 2import pytest 3import pwd 4import re 5import time 6from distutils.version import LooseVersion 7 8from unit.applications.lang.python import TestApplicationPython 9from conftest import skip_alert 10 11 12class TestASGIApplication(TestApplicationPython): 13 prerequisites = {'modules': {'python': 14 lambda v: LooseVersion(v) >= LooseVersion('3.5')}} 15 load_module = 'asgi' 16 17 def findall(self, pattern): 18 with open(self.temp_dir + '/unit.log', 'r', errors='ignore') as f: 19 return re.findall(pattern, f.read()) 20 21 def test_asgi_application__variables(self): 22 self.load('variables') 23 24 body = 'Test body string.' 25 26 resp = self.http( 27 b"""POST / HTTP/1.1 28Host: localhost 29Content-Length: %d 30Custom-Header: blah 31Custom-hEader: Blah 32Content-Type: text/html 33Connection: close 34custom-header: BLAH 35 36%s""" % (len(body), body.encode()), 37 raw=True, 38 ) 39 40 assert resp['status'] == 200, 'status' 41 headers = resp['headers'] 42 header_server = headers.pop('Server') 43 assert re.search(r'Unit/[\d\.]+', header_server), 'server header' 44 45 date = headers.pop('Date') 46 assert date[-4:] == ' GMT', 'date header timezone' 47 assert ( 48 abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5 49 ), 'date header' 50 51 assert headers == { 52 'Connection': 'close', 53 'content-length': str(len(body)), 54 'content-type': 'text/html', 55 'request-method': 'POST', 56 'request-uri': '/', 57 'http-host': 'localhost', 58 'http-version': '1.1', 59 'custom-header': 'blah, Blah, BLAH', 60 'asgi-version': '3.0', 61 'asgi-spec-version': '2.1', 62 'scheme': 'http', 63 }, 'headers' 64 assert resp['body'] == body, 'body' 65 66 def test_asgi_application__query_string(self): 67 self.load('query_string') 68 69 resp = self.get(url='/?var1=val1&var2=val2') 70 71 assert ( 72 resp['headers']['query-string'] == 'var1=val1&var2=val2' 73 ), 'query-string header' 74 75 def test_asgi_application__query_string_space(self): 76 self.load('query_string') 77 78 resp = self.get(url='/ ?var1=val1&var2=val2') 79 assert ( 80 resp['headers']['query-string'] == 'var1=val1&var2=val2' 81 ), 'query-string space' 82 83 resp = self.get(url='/ %20?var1=val1&var2=val2') 84 assert ( 85 resp['headers']['query-string'] == 'var1=val1&var2=val2' 86 ), 'query-string space 2' 87 88 resp = self.get(url='/ %20 ?var1=val1&var2=val2') 89 assert ( 90 resp['headers']['query-string'] == 'var1=val1&var2=val2' 91 ), 'query-string space 3' 92 93 resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2') 94 assert ( 95 resp['headers']['query-string'] == ' var1= val1 & var2=val2' 96 ), 'query-string space 4' 97 98 def test_asgi_application__query_string_empty(self): 99 self.load('query_string') 100 101 resp = self.get(url='/?') 102 103 assert resp['status'] == 200, 'query string empty status' 104 assert resp['headers']['query-string'] == '', 'query string empty' 105 106 def test_asgi_application__query_string_absent(self): 107 self.load('query_string') 108 109 resp = self.get() 110 111 assert resp['status'] == 200, 'query string absent status' 112 assert resp['headers']['query-string'] == '', 'query string absent' 113 114 @pytest.mark.skip('not yet') 115 def test_asgi_application__server_port(self): 116 self.load('server_port') 117 118 assert ( 119 self.get()['headers']['Server-Port'] == '7080' 120 ), 'Server-Port header' 121 122 @pytest.mark.skip('not yet') 123 def test_asgi_application__working_directory_invalid(self): 124 self.load('empty') 125 126 assert 'success' in self.conf( 127 '"/blah"', 'applications/empty/working_directory' 128 ), 'configure invalid working_directory' 129 130 assert self.get()['status'] == 500, 'status' 131 132 def test_asgi_application__204_transfer_encoding(self): 133 self.load('204_no_content') 134 135 assert ( 136 'Transfer-Encoding' not in self.get()['headers'] 137 ), '204 header transfer encoding' 138 139 def test_asgi_application__shm_ack_handle(self): 140 self.load('mirror') 141 142 # Minimum possible limit 143 shm_limit = 10 * 1024 * 1024 144 145 assert ( 146 'success' in self.conf('{"shm": ' + str(shm_limit) + '}', 147 'applications/mirror/limits') 148 ) 149 150 # Should exceed shm_limit 151 max_body_size = 12 * 1024 * 1024 152 153 assert ( 154 'success' in self.conf('{"http":{"max_body_size": ' 155 + str(max_body_size) + ' }}', 156 'settings') 157 ) 158 159 assert self.get()['status'] == 200, 'init' 160 161 body = '0123456789AB' * 1024 * 1024 # 12 Mb 162 resp = self.post( 163 headers={ 164 'Host': 'localhost', 165 'Connection': 'close', 166 'Content-Type': 'text/html', 167 }, 168 body=body, 169 read_buffer_size=1024 * 1024, 170 ) 171 172 assert resp['body'] == body, 'keep-alive 1' 173 174 def test_asgi_keepalive_body(self): 175 self.load('mirror') 176 177 assert self.get()['status'] == 200, 'init' 178 179 body = '0123456789' * 500 180 (resp, sock) = self.post( 181 headers={ 182 'Host': 'localhost', 183 'Connection': 'keep-alive', 184 'Content-Type': 'text/html', 185 }, 186 start=True, 187 body=body, 188 read_timeout=1, 189 ) 190 191 assert resp['body'] == body, 'keep-alive 1' 192 193 body = '0123456789' 194 resp = self.post( 195 headers={ 196 'Host': 'localhost', 197 'Connection': 'close', 198 'Content-Type': 'text/html', 199 }, 200 sock=sock, 201 body=body, 202 ) 203 204 assert resp['body'] == body, 'keep-alive 2' 205 206 def test_asgi_keepalive_reconfigure(self): 207 skip_alert( 208 r'pthread_mutex.+failed', 209 r'failed to apply', 210 r'process \d+ exited on signal', 211 ) 212 self.load('mirror') 213 214 assert self.get()['status'] == 200, 'init' 215 216 body = '0123456789' 217 conns = 3 218 socks = [] 219 220 for i in range(conns): 221 (resp, sock) = self.post( 222 headers={ 223 'Host': 'localhost', 224 'Connection': 'keep-alive', 225 'Content-Type': 'text/html', 226 }, 227 start=True, 228 body=body, 229 read_timeout=1, 230 ) 231 232 assert resp['body'] == body, 'keep-alive open' 233 assert 'success' in self.conf( 234 str(i + 1), 'applications/mirror/processes' 235 ), 'reconfigure' 236 237 socks.append(sock) 238 239 for i in range(conns): 240 (resp, sock) = self.post( 241 headers={ 242 'Host': 'localhost', 243 'Connection': 'keep-alive', 244 'Content-Type': 'text/html', 245 }, 246 start=True, 247 sock=socks[i], 248 body=body, 249 read_timeout=1, 250 ) 251 252 assert resp['body'] == body, 'keep-alive request' 253 assert 'success' in self.conf( 254 str(i + 1), 'applications/mirror/processes' 255 ), 'reconfigure 2' 256 257 for i in range(conns): 258 resp = self.post( 259 headers={ 260 'Host': 'localhost', 261 'Connection': 'close', 262 'Content-Type': 'text/html', 263 }, 264 sock=socks[i], 265 body=body, 266 ) 267 268 assert resp['body'] == body, 'keep-alive close' 269 assert 'success' in self.conf( 270 str(i + 1), 'applications/mirror/processes' 271 ), 'reconfigure 3' 272 273 def test_asgi_keepalive_reconfigure_2(self): 274 self.load('mirror') 275 276 assert self.get()['status'] == 200, 'init' 277 278 body = '0123456789' 279 280 (resp, sock) = self.post( 281 headers={ 282 'Host': 'localhost', 283 'Connection': 'keep-alive', 284 'Content-Type': 'text/html', 285 }, 286 start=True, 287 body=body, 288 read_timeout=1, 289 ) 290 291 assert resp['body'] == body, 'reconfigure 2 keep-alive 1' 292 293 self.load('empty') 294 295 assert self.get()['status'] == 200, 'init' 296 297 (resp, sock) = self.post( 298 headers={ 299 'Host': 'localhost', 300 'Connection': 'close', 301 'Content-Type': 'text/html', 302 }, 303 start=True, 304 sock=sock, 305 body=body, 306 ) 307 308 assert resp['status'] == 200, 'reconfigure 2 keep-alive 2' 309 assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body' 310 311 assert 'success' in self.conf( 312 {"listeners": {}, "applications": {}} 313 ), 'reconfigure 2 clear configuration' 314 315 resp = self.get(sock=sock) 316 317 assert resp == {}, 'reconfigure 2 keep-alive 3' 318 319 def test_asgi_keepalive_reconfigure_3(self): 320 self.load('empty') 321 322 assert self.get()['status'] == 200, 'init' 323 324 (_, sock) = self.http( 325 b"""GET / HTTP/1.1 326""", 327 start=True, 328 raw=True, 329 no_recv=True, 330 ) 331 332 assert self.get()['status'] == 200 333 334 assert 'success' in self.conf( 335 {"listeners": {}, "applications": {}} 336 ), 'reconfigure 3 clear configuration' 337 338 resp = self.http( 339 b"""Host: localhost 340Connection: close 341 342""", 343 sock=sock, 344 raw=True, 345 ) 346 347 assert resp['status'] == 200, 'reconfigure 3' 348 349 def test_asgi_process_switch(self): 350 self.load('delayed') 351 352 assert 'success' in self.conf( 353 '2', 'applications/delayed/processes' 354 ), 'configure 2 processes' 355 356 self.get( 357 headers={ 358 'Host': 'localhost', 359 'Content-Length': '0', 360 'X-Delay': '5', 361 'Connection': 'close', 362 }, 363 no_recv=True, 364 ) 365 366 headers_delay_1 = { 367 'Connection': 'close', 368 'Host': 'localhost', 369 'Content-Length': '0', 370 'X-Delay': '1', 371 } 372 373 self.get(headers=headers_delay_1, no_recv=True) 374 375 time.sleep(0.5) 376 377 for _ in range(10): 378 self.get(headers=headers_delay_1, no_recv=True) 379 380 self.get(headers=headers_delay_1) 381 382 def test_asgi_application__loading_error(self): 383 skip_alert(r'Python failed to import module "blah"') 384 385 self.load('empty') 386 387 assert 'success' in self.conf('"blah"', 'applications/empty/module') 388 389 assert self.get()['status'] == 503, 'loading error' 390 391 def test_asgi_application__threading(self): 392 """wait_for_record() timeouts after 5s while every thread works at 393 least 3s. So without releasing GIL test should fail. 394 """ 395 396 self.load('threading') 397 398 for _ in range(10): 399 self.get(no_recv=True) 400 401 assert ( 402 self.wait_for_record(r'\(5\) Thread: 100') is not None 403 ), 'last thread finished' 404