xref: /unit/test/test_asgi_application.py (revision 2330:4b1f175f9c88)
1import re
2import time
3
4import pytest
5from packaging import version
6from unit.applications.lang.python import TestApplicationPython
7
8
class TestASGIApplication(TestApplicationPython):
    """ASGI application tests built on top of TestApplicationPython."""

    # Requirement on the 'python' module: the predicate accepts a version
    # string and is true when it parses to 3.5 or newer.
    # NOTE(review): presumably evaluated by the test harness before the
    # tests run -- confirm against the base class / conftest.
    prerequisites = {
        'modules': {
            'python': lambda v: version.parse(v) >= version.parse('3.5')
        }
    }
    # NOTE(review): presumably tells self.load() to configure applications
    # through the 'asgi' module -- confirm in TestApplicationPython.
    load_module = 'asgi'
16
17    def test_asgi_application_variables(self):
18        self.load('variables')
19
20        body = 'Test body string.'
21
22        resp = self.http(
23            f"""POST / HTTP/1.1
24Host: localhost
25Content-Length: {len(body)}
26Custom-Header: blah
27Custom-hEader: Blah
28Content-Type: text/html
29Connection: close
30custom-header: BLAH
31
32{body}""".encode(),
33            raw=True,
34        )
35
36        assert resp['status'] == 200, 'status'
37        headers = resp['headers']
38        header_server = headers.pop('Server')
39        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
40
41        date = headers.pop('Date')
42        assert date[-4:] == ' GMT', 'date header timezone'
43        assert (
44            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
45        ), 'date header'
46
47        assert headers == {
48            'Connection': 'close',
49            'content-length': str(len(body)),
50            'content-type': 'text/html',
51            'request-method': 'POST',
52            'request-uri': '/',
53            'http-host': 'localhost',
54            'http-version': '1.1',
55            'custom-header': 'blah, Blah, BLAH',
56            'asgi-version': '3.0',
57            'asgi-spec-version': '2.1',
58            'scheme': 'http',
59        }, 'headers'
60        assert resp['body'] == body, 'body'
61
62    def test_asgi_application_unix(self, temp_dir):
63        self.load('empty')
64
65        addr = f'{temp_dir}/sock'
66        assert 'success' in self.conf(
67            {f"unix:{addr}": {"pass": "applications/empty"}}, 'listeners'
68        )
69
70        assert self.get(sock_type='unix', addr=addr)['status'] == 200
71
72    def test_asgi_application_query_string(self):
73        self.load('query_string')
74
75        resp = self.get(url='/?var1=val1&var2=val2')
76
77        assert (
78            resp['headers']['query-string'] == 'var1=val1&var2=val2'
79        ), 'query-string header'
80
81    def test_asgi_application_prefix(self):
82        self.load('prefix', prefix='/api/rest')
83
84        def set_prefix(prefix):
85            self.conf(f'"{prefix}"', 'applications/prefix/prefix')
86
87        def check_prefix(url, prefix):
88            resp = self.get(url=url)
89            assert resp['status'] == 200
90            assert resp['headers']['prefix'] == prefix
91
92        check_prefix('/ap', 'NULL')
93        check_prefix('/api', 'NULL')
94        check_prefix('/api/', 'NULL')
95        check_prefix('/api/res', 'NULL')
96        check_prefix('/api/restful', 'NULL')
97        check_prefix('/api/rest', '/api/rest')
98        check_prefix('/api/rest/', '/api/rest')
99        check_prefix('/api/rest/get', '/api/rest')
100        check_prefix('/api/rest/get/blah', '/api/rest')
101
102        set_prefix('/api/rest/')
103        check_prefix('/api/rest', '/api/rest')
104        check_prefix('/api/restful', 'NULL')
105        check_prefix('/api/rest/', '/api/rest')
106        check_prefix('/api/rest/blah', '/api/rest')
107
108        set_prefix('/app')
109        check_prefix('/ap', 'NULL')
110        check_prefix('/app', '/app')
111        check_prefix('/app/', '/app')
112        check_prefix('/application/', 'NULL')
113
114        set_prefix('/')
115        check_prefix('/', 'NULL')
116        check_prefix('/app', 'NULL')
117
118    def test_asgi_application_query_string_space(self):
119        self.load('query_string')
120
121        resp = self.get(url='/ ?var1=val1&var2=val2')
122        assert (
123            resp['headers']['query-string'] == 'var1=val1&var2=val2'
124        ), 'query-string space'
125
126        resp = self.get(url='/ %20?var1=val1&var2=val2')
127        assert (
128            resp['headers']['query-string'] == 'var1=val1&var2=val2'
129        ), 'query-string space 2'
130
131        resp = self.get(url='/ %20 ?var1=val1&var2=val2')
132        assert (
133            resp['headers']['query-string'] == 'var1=val1&var2=val2'
134        ), 'query-string space 3'
135
136        resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2')
137        assert (
138            resp['headers']['query-string'] == ' var1= val1 & var2=val2'
139        ), 'query-string space 4'
140
141    def test_asgi_application_query_string_empty(self):
142        self.load('query_string')
143
144        resp = self.get(url='/?')
145
146        assert resp['status'] == 200, 'query string empty status'
147        assert resp['headers']['query-string'] == '', 'query string empty'
148
149    def test_asgi_application_query_string_absent(self):
150        self.load('query_string')
151
152        resp = self.get()
153
154        assert resp['status'] == 200, 'query string absent status'
155        assert resp['headers']['query-string'] == '', 'query string absent'
156
157    @pytest.mark.skip('not yet')
158    def test_asgi_application_server_port(self):
159        self.load('server_port')
160
161        assert (
162            self.get()['headers']['Server-Port'] == '7080'
163        ), 'Server-Port header'
164
165    @pytest.mark.skip('not yet')
166    def test_asgi_application_working_directory_invalid(self):
167        self.load('empty')
168
169        assert 'success' in self.conf(
170            '"/blah"', 'applications/empty/working_directory'
171        ), 'configure invalid working_directory'
172
173        assert self.get()['status'] == 500, 'status'
174
175    def test_asgi_application_204_transfer_encoding(self):
176        self.load('204_no_content')
177
178        assert (
179            'Transfer-Encoding' not in self.get()['headers']
180        ), '204 header transfer encoding'
181
182    def test_asgi_application_shm_ack_handle(self):
183        # Minimum possible limit
184        shm_limit = 10 * 1024 * 1024
185
186        self.load('mirror', limits={"shm": shm_limit})
187
188        # Should exceed shm_limit
189        max_body_size = 12 * 1024 * 1024
190
191        assert 'success' in self.conf(
192            f'{{"http":{{"max_body_size": {max_body_size} }}}}',
193            'settings',
194        )
195
196        assert self.get()['status'] == 200, 'init'
197
198        body = '0123456789AB' * 1024 * 1024  # 12 Mb
199        resp = self.post(body=body, read_buffer_size=1024 * 1024)
200
201        assert resp['body'] == body, 'keep-alive 1'
202
203    def test_asgi_keepalive_body(self):
204        self.load('mirror')
205
206        assert self.get()['status'] == 200, 'init'
207
208        body = '0123456789' * 500
209        (resp, sock) = self.post(
210            headers={
211                'Host': 'localhost',
212                'Connection': 'keep-alive',
213            },
214            start=True,
215            body=body,
216            read_timeout=1,
217        )
218
219        assert resp['body'] == body, 'keep-alive 1'
220
221        body = '0123456789'
222        resp = self.post(sock=sock, body=body)
223
224        assert resp['body'] == body, 'keep-alive 2'
225
226    def test_asgi_keepalive_reconfigure(self):
227        self.load('mirror')
228
229        assert self.get()['status'] == 200, 'init'
230
231        body = '0123456789'
232        conns = 3
233        socks = []
234
235        for i in range(conns):
236            (resp, sock) = self.post(
237                headers={
238                    'Host': 'localhost',
239                    'Connection': 'keep-alive',
240                },
241                start=True,
242                body=body,
243                read_timeout=1,
244            )
245
246            assert resp['body'] == body, 'keep-alive open'
247
248            self.load('mirror', processes=i + 1)
249
250            socks.append(sock)
251
252        for i in range(conns):
253            (resp, sock) = self.post(
254                headers={
255                    'Host': 'localhost',
256                    'Connection': 'keep-alive',
257                },
258                start=True,
259                sock=socks[i],
260                body=body,
261                read_timeout=1,
262            )
263
264            assert resp['body'] == body, 'keep-alive request'
265
266            self.load('mirror', processes=i + 1)
267
268        for i in range(conns):
269            resp = self.post(sock=socks[i], body=body)
270
271            assert resp['body'] == body, 'keep-alive close'
272
273            self.load('mirror', processes=i + 1)
274
275    def test_asgi_keepalive_reconfigure_2(self):
276        self.load('mirror')
277
278        assert self.get()['status'] == 200, 'init'
279
280        body = '0123456789'
281
282        (resp, sock) = self.post(
283            headers={
284                'Host': 'localhost',
285                'Connection': 'keep-alive',
286            },
287            start=True,
288            body=body,
289            read_timeout=1,
290        )
291
292        assert resp['body'] == body, 'reconfigure 2 keep-alive 1'
293
294        self.load('empty')
295
296        assert self.get()['status'] == 200, 'init'
297
298        (resp, sock) = self.post(start=True, sock=sock, body=body)
299
300        assert resp['status'] == 200, 'reconfigure 2 keep-alive 2'
301        assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body'
302
303        assert 'success' in self.conf(
304            {"listeners": {}, "applications": {}}
305        ), 'reconfigure 2 clear configuration'
306
307        resp = self.get(sock=sock)
308
309        assert resp == {}, 'reconfigure 2 keep-alive 3'
310
311    def test_asgi_keepalive_reconfigure_3(self):
312        self.load('empty')
313
314        assert self.get()['status'] == 200, 'init'
315
316        sock = self.http(
317            b"""GET / HTTP/1.1
318""",
319            raw=True,
320            no_recv=True,
321        )
322
323        assert self.get()['status'] == 200
324
325        assert 'success' in self.conf(
326            {"listeners": {}, "applications": {}}
327        ), 'reconfigure 3 clear configuration'
328
329        resp = self.http(
330            b"""Host: localhost
331Connection: close
332
333""",
334            sock=sock,
335            raw=True,
336        )
337
338        assert resp['status'] == 200, 'reconfigure 3'
339
340    def test_asgi_process_switch(self):
341        self.load('delayed', processes=2)
342
343        self.get(
344            headers={
345                'Host': 'localhost',
346                'Content-Length': '0',
347                'X-Delay': '5',
348                'Connection': 'close',
349            },
350            no_recv=True,
351        )
352
353        headers_delay_1 = {
354            'Connection': 'close',
355            'Host': 'localhost',
356            'Content-Length': '0',
357            'X-Delay': '1',
358        }
359
360        self.get(headers=headers_delay_1, no_recv=True)
361
362        time.sleep(0.5)
363
364        for _ in range(10):
365            self.get(headers=headers_delay_1, no_recv=True)
366
367        self.get(headers=headers_delay_1)
368
369    def test_asgi_application_loading_error(self, skip_alert):
370        skip_alert(r'Python failed to import module "blah"')
371
372        self.load('empty', module="blah")
373
374        assert self.get()['status'] == 503, 'loading error'
375
376    def test_asgi_application_threading(self):
377        """wait_for_record() timeouts after 5s while every thread works at
378        least 3s.  So without releasing GIL test should fail.
379        """
380
381        self.load('threading')
382
383        for _ in range(10):
384            self.get(no_recv=True)
385
386        assert (
387            self.wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
388        ), 'last thread finished'
389
390    def test_asgi_application_threads(self):
391        self.load('threads', threads=2)
392
393        socks = []
394
395        for i in range(2):
396            sock = self.get(
397                headers={
398                    'Host': 'localhost',
399                    'X-Delay': '3',
400                    'Connection': 'close',
401                },
402                no_recv=True,
403            )
404
405            socks.append(sock)
406
407            time.sleep(1.0)  # required to avoid greedy request reading
408
409        threads = set()
410
411        for sock in socks:
412            resp = self.recvall(sock).decode('utf-8')
413
414            self.log_in(resp)
415
416            resp = self._resp_to_dict(resp)
417
418            assert resp['status'] == 200, 'status'
419
420            threads.add(resp['headers']['x-thread'])
421
422            sock.close()
423
424        assert len(socks) == len(threads), 'threads differs'
425
426    def test_asgi_application_legacy(self):
427        self.load('legacy')
428
429        resp = self.get(
430            headers={
431                'Host': 'localhost',
432                'Content-Length': '0',
433                'Connection': 'close',
434            },
435        )
436
437        assert resp['status'] == 200, 'status'
438
439    def test_asgi_application_legacy_force(self):
440        self.load('legacy_force', protocol='asgi')
441
442        resp = self.get(
443            headers={
444                'Host': 'localhost',
445                'Content-Length': '0',
446                'Connection': 'close',
447            },
448        )
449
450        assert resp['status'] == 200, 'status'
451