# xref: /unit/test/test_asgi_application.py (revision 1971:3410f9d2a662)
1import re
2import time
3from distutils.version import LooseVersion
4
5import pytest
6from unit.applications.lang.python import TestApplicationPython
7from unit.option import option
8
9
class TestASGIApplication(TestApplicationPython):
    """Tests for ASGI applications, built on TestApplicationPython helpers."""

    # Requires a 'python' module whose version compares >= 3.5.
    prerequisites = {
        'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
    }
    # NOTE(review): presumably tells load() which application module to
    # use -- confirm in TestApplicationPython.
    load_module = 'asgi'
15
16    def test_asgi_application_variables(self):
17        self.load('variables')
18
19        body = 'Test body string.'
20
21        resp = self.http(
22            b"""POST / HTTP/1.1
23Host: localhost
24Content-Length: %d
25Custom-Header: blah
26Custom-hEader: Blah
27Content-Type: text/html
28Connection: close
29custom-header: BLAH
30
31%s"""
32            % (len(body), body.encode()),
33            raw=True,
34        )
35
36        assert resp['status'] == 200, 'status'
37        headers = resp['headers']
38        header_server = headers.pop('Server')
39        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
40
41        date = headers.pop('Date')
42        assert date[-4:] == ' GMT', 'date header timezone'
43        assert (
44            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
45        ), 'date header'
46
47        assert headers == {
48            'Connection': 'close',
49            'content-length': str(len(body)),
50            'content-type': 'text/html',
51            'request-method': 'POST',
52            'request-uri': '/',
53            'http-host': 'localhost',
54            'http-version': '1.1',
55            'custom-header': 'blah, Blah, BLAH',
56            'asgi-version': '3.0',
57            'asgi-spec-version': '2.1',
58            'scheme': 'http',
59        }, 'headers'
60        assert resp['body'] == body, 'body'
61
62    def test_asgi_application_query_string(self):
63        self.load('query_string')
64
65        resp = self.get(url='/?var1=val1&var2=val2')
66
67        assert (
68            resp['headers']['query-string'] == 'var1=val1&var2=val2'
69        ), 'query-string header'
70
71    def test_asgi_application_query_string_space(self):
72        self.load('query_string')
73
74        resp = self.get(url='/ ?var1=val1&var2=val2')
75        assert (
76            resp['headers']['query-string'] == 'var1=val1&var2=val2'
77        ), 'query-string space'
78
79        resp = self.get(url='/ %20?var1=val1&var2=val2')
80        assert (
81            resp['headers']['query-string'] == 'var1=val1&var2=val2'
82        ), 'query-string space 2'
83
84        resp = self.get(url='/ %20 ?var1=val1&var2=val2')
85        assert (
86            resp['headers']['query-string'] == 'var1=val1&var2=val2'
87        ), 'query-string space 3'
88
89        resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2')
90        assert (
91            resp['headers']['query-string'] == ' var1= val1 & var2=val2'
92        ), 'query-string space 4'
93
94    def test_asgi_application_query_string_empty(self):
95        self.load('query_string')
96
97        resp = self.get(url='/?')
98
99        assert resp['status'] == 200, 'query string empty status'
100        assert resp['headers']['query-string'] == '', 'query string empty'
101
102    def test_asgi_application_query_string_absent(self):
103        self.load('query_string')
104
105        resp = self.get()
106
107        assert resp['status'] == 200, 'query string absent status'
108        assert resp['headers']['query-string'] == '', 'query string absent'
109
110    @pytest.mark.skip('not yet')
111    def test_asgi_application_server_port(self):
112        self.load('server_port')
113
114        assert (
115            self.get()['headers']['Server-Port'] == '7080'
116        ), 'Server-Port header'
117
118    @pytest.mark.skip('not yet')
119    def test_asgi_application_working_directory_invalid(self):
120        self.load('empty')
121
122        assert 'success' in self.conf(
123            '"/blah"', 'applications/empty/working_directory'
124        ), 'configure invalid working_directory'
125
126        assert self.get()['status'] == 500, 'status'
127
128    def test_asgi_application_204_transfer_encoding(self):
129        self.load('204_no_content')
130
131        assert (
132            'Transfer-Encoding' not in self.get()['headers']
133        ), '204 header transfer encoding'
134
135    def test_asgi_application_shm_ack_handle(self):
136        # Minimum possible limit
137        shm_limit = 10 * 1024 * 1024
138
139        self.load('mirror', limits={"shm": shm_limit})
140
141        # Should exceed shm_limit
142        max_body_size = 12 * 1024 * 1024
143
144        assert 'success' in self.conf(
145            '{"http":{"max_body_size": ' + str(max_body_size) + ' }}',
146            'settings',
147        )
148
149        assert self.get()['status'] == 200, 'init'
150
151        body = '0123456789AB' * 1024 * 1024  # 12 Mb
152        resp = self.post(
153            headers={
154                'Host': 'localhost',
155                'Connection': 'close',
156                'Content-Type': 'text/html',
157            },
158            body=body,
159            read_buffer_size=1024 * 1024,
160        )
161
162        assert resp['body'] == body, 'keep-alive 1'
163
164    def test_asgi_keepalive_body(self):
165        self.load('mirror')
166
167        assert self.get()['status'] == 200, 'init'
168
169        body = '0123456789' * 500
170        (resp, sock) = self.post(
171            headers={
172                'Host': 'localhost',
173                'Connection': 'keep-alive',
174                'Content-Type': 'text/html',
175            },
176            start=True,
177            body=body,
178            read_timeout=1,
179        )
180
181        assert resp['body'] == body, 'keep-alive 1'
182
183        body = '0123456789'
184        resp = self.post(
185            headers={
186                'Host': 'localhost',
187                'Connection': 'close',
188                'Content-Type': 'text/html',
189            },
190            sock=sock,
191            body=body,
192        )
193
194        assert resp['body'] == body, 'keep-alive 2'
195
196    def test_asgi_keepalive_reconfigure(self):
197        self.load('mirror')
198
199        assert self.get()['status'] == 200, 'init'
200
201        body = '0123456789'
202        conns = 3
203        socks = []
204
205        for i in range(conns):
206            (resp, sock) = self.post(
207                headers={
208                    'Host': 'localhost',
209                    'Connection': 'keep-alive',
210                    'Content-Type': 'text/html',
211                },
212                start=True,
213                body=body,
214                read_timeout=1,
215            )
216
217            assert resp['body'] == body, 'keep-alive open'
218
219            self.load('mirror', processes=i + 1)
220
221            socks.append(sock)
222
223        for i in range(conns):
224            (resp, sock) = self.post(
225                headers={
226                    'Host': 'localhost',
227                    'Connection': 'keep-alive',
228                    'Content-Type': 'text/html',
229                },
230                start=True,
231                sock=socks[i],
232                body=body,
233                read_timeout=1,
234            )
235
236            assert resp['body'] == body, 'keep-alive request'
237
238            self.load('mirror', processes=i + 1)
239
240        for i in range(conns):
241            resp = self.post(
242                headers={
243                    'Host': 'localhost',
244                    'Connection': 'close',
245                    'Content-Type': 'text/html',
246                },
247                sock=socks[i],
248                body=body,
249            )
250
251            assert resp['body'] == body, 'keep-alive close'
252
253            self.load('mirror', processes=i + 1)
254
255    def test_asgi_keepalive_reconfigure_2(self):
256        self.load('mirror')
257
258        assert self.get()['status'] == 200, 'init'
259
260        body = '0123456789'
261
262        (resp, sock) = self.post(
263            headers={
264                'Host': 'localhost',
265                'Connection': 'keep-alive',
266                'Content-Type': 'text/html',
267            },
268            start=True,
269            body=body,
270            read_timeout=1,
271        )
272
273        assert resp['body'] == body, 'reconfigure 2 keep-alive 1'
274
275        self.load('empty')
276
277        assert self.get()['status'] == 200, 'init'
278
279        (resp, sock) = self.post(
280            headers={
281                'Host': 'localhost',
282                'Connection': 'close',
283                'Content-Type': 'text/html',
284            },
285            start=True,
286            sock=sock,
287            body=body,
288        )
289
290        assert resp['status'] == 200, 'reconfigure 2 keep-alive 2'
291        assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body'
292
293        assert 'success' in self.conf(
294            {"listeners": {}, "applications": {}}
295        ), 'reconfigure 2 clear configuration'
296
297        resp = self.get(sock=sock)
298
299        assert resp == {}, 'reconfigure 2 keep-alive 3'
300
301    def test_asgi_keepalive_reconfigure_3(self):
302        self.load('empty')
303
304        assert self.get()['status'] == 200, 'init'
305
306        (_, sock) = self.http(
307            b"""GET / HTTP/1.1
308""",
309            start=True,
310            raw=True,
311            no_recv=True,
312        )
313
314        assert self.get()['status'] == 200
315
316        assert 'success' in self.conf(
317            {"listeners": {}, "applications": {}}
318        ), 'reconfigure 3 clear configuration'
319
320        resp = self.http(
321            b"""Host: localhost
322Connection: close
323
324""",
325            sock=sock,
326            raw=True,
327        )
328
329        assert resp['status'] == 200, 'reconfigure 3'
330
331    def test_asgi_process_switch(self):
332        self.load('delayed', processes=2)
333
334        self.get(
335            headers={
336                'Host': 'localhost',
337                'Content-Length': '0',
338                'X-Delay': '5',
339                'Connection': 'close',
340            },
341            no_recv=True,
342        )
343
344        headers_delay_1 = {
345            'Connection': 'close',
346            'Host': 'localhost',
347            'Content-Length': '0',
348            'X-Delay': '1',
349        }
350
351        self.get(headers=headers_delay_1, no_recv=True)
352
353        time.sleep(0.5)
354
355        for _ in range(10):
356            self.get(headers=headers_delay_1, no_recv=True)
357
358        self.get(headers=headers_delay_1)
359
360    def test_asgi_application_loading_error(self, skip_alert):
361        skip_alert(r'Python failed to import module "blah"')
362
363        self.load('empty', module="blah")
364
365        assert self.get()['status'] == 503, 'loading error'
366
367    def test_asgi_application_threading(self):
368        """wait_for_record() timeouts after 5s while every thread works at
369        least 3s.  So without releasing GIL test should fail.
370        """
371
372        self.load('threading')
373
374        for _ in range(10):
375            self.get(no_recv=True)
376
377        assert (
378            self.wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
379        ), 'last thread finished'
380
381    def test_asgi_application_threads(self):
382        self.load('threads', threads=2)
383
384        socks = []
385
386        for i in range(2):
387            (_, sock) = self.get(
388                headers={
389                    'Host': 'localhost',
390                    'X-Delay': '3',
391                    'Connection': 'close',
392                },
393                no_recv=True,
394                start=True,
395            )
396
397            socks.append(sock)
398
399            time.sleep(1.0)  # required to avoid greedy request reading
400
401        threads = set()
402
403        for sock in socks:
404            resp = self.recvall(sock).decode('utf-8')
405
406            self.log_in(resp)
407
408            resp = self._resp_to_dict(resp)
409
410            assert resp['status'] == 200, 'status'
411
412            threads.add(resp['headers']['x-thread'])
413
414            sock.close()
415
416        assert len(socks) == len(threads), 'threads differs'
417
418    def test_asgi_application_legacy(self):
419        self.load('legacy')
420
421        resp = self.get(
422            headers={
423                'Host': 'localhost',
424                'Content-Length': '0',
425                'Connection': 'close',
426            },
427        )
428
429        assert resp['status'] == 200, 'status'
430
431    def test_asgi_application_legacy_force(self):
432        self.load('legacy_force', protocol='asgi')
433
434        resp = self.get(
435            headers={
436                'Host': 'localhost',
437                'Content-Length': '0',
438                'Connection': 'close',
439            },
440        )
441
442        assert resp['status'] == 200, 'status'
443