xref: /unit/test/test_asgi_application.py (revision 2066:242192963d93)
1import re
2import time
3
4import pytest
5from packaging import version
6from unit.applications.lang.python import TestApplicationPython
7
8
class TestASGIApplication(TestApplicationPython):
    """Functional tests for ASGI applications run by the Python module.

    Each test loads a named test application with ``self.load()`` and then
    exercises it over HTTP through the helpers inherited from
    ``TestApplicationPython`` (``get``, ``post``, ``http``, ``conf``, ...).
    """

    # Run only when a Python module of version 3.5 or newer is available.
    prerequisites = {
        'modules': {
            'python': lambda v: version.parse(v) >= version.parse('3.5')
        }
    }
    load_module = 'asgi'

    def test_asgi_application_variables(self):
        """Check the headers and body returned by the 'variables' app.

        A raw POST carrying three differently-cased ``Custom-Header`` lines
        is sent; the response is expected to report them merged as
        ``'blah, Blah, BLAH'`` together with the request and ASGI details.
        """
        self.load('variables')

        body = 'Test body string.'

        # The request is sent verbatim (raw=True), so the literal below must
        # stay at column 0 and keep its exact line layout.
        resp = self.http(
            b"""POST / HTTP/1.1
Host: localhost
Content-Length: %d
Custom-Header: blah
Custom-hEader: Blah
Content-Type: text/html
Connection: close
custom-header: BLAH

%s"""
            % (len(body), body.encode()),
            raw=True,
        )

        assert resp['status'] == 200, 'status'
        headers = resp['headers']

        # Server and Date vary between runs: validate them separately and
        # remove them before the exact comparison of the remaining headers.
        header_server = headers.pop('Server')
        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'

        date = headers.pop('Date')
        assert date[-4:] == ' GMT', 'date header timezone'
        assert (
            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
        ), 'date header'

        assert headers == {
            'Connection': 'close',
            'content-length': str(len(body)),
            'content-type': 'text/html',
            'request-method': 'POST',
            'request-uri': '/',
            'http-host': 'localhost',
            'http-version': '1.1',
            'custom-header': 'blah, Blah, BLAH',
            'asgi-version': '3.0',
            'asgi-spec-version': '2.1',
            'scheme': 'http',
        }, 'headers'
        assert resp['body'] == body, 'body'

    def test_asgi_application_query_string(self):
        """The query string is reported in the 'query-string' header."""
        self.load('query_string')

        resp = self.get(url='/?var1=val1&var2=val2')

        assert (
            resp['headers']['query-string'] == 'var1=val1&var2=val2'
        ), 'query-string header'

    def test_asgi_application_query_string_space(self):
        """Spaces in the path or in the query must not break the query string.

        Spaces (literal and ``%20``) before ``?`` must not leak into the
        reported query string; spaces after ``?`` are expected verbatim.
        """
        self.load('query_string')

        resp = self.get(url='/ ?var1=val1&var2=val2')
        assert (
            resp['headers']['query-string'] == 'var1=val1&var2=val2'
        ), 'query-string space'

        resp = self.get(url='/ %20?var1=val1&var2=val2')
        assert (
            resp['headers']['query-string'] == 'var1=val1&var2=val2'
        ), 'query-string space 2'

        resp = self.get(url='/ %20 ?var1=val1&var2=val2')
        assert (
            resp['headers']['query-string'] == 'var1=val1&var2=val2'
        ), 'query-string space 3'

        resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2')
        assert (
            resp['headers']['query-string'] == ' var1= val1 & var2=val2'
        ), 'query-string space 4'

    def test_asgi_application_query_string_empty(self):
        """A bare '?' yields status 200 and an empty query string."""
        self.load('query_string')

        resp = self.get(url='/?')

        assert resp['status'] == 200, 'query string empty status'
        assert resp['headers']['query-string'] == '', 'query string empty'

    def test_asgi_application_query_string_absent(self):
        """A URL without '?' yields status 200 and an empty query string."""
        self.load('query_string')

        resp = self.get()

        assert resp['status'] == 200, 'query string absent status'
        assert resp['headers']['query-string'] == '', 'query string absent'

    @pytest.mark.skip('not yet')
    def test_asgi_application_server_port(self):
        """The 'server_port' app should report port 7080 (skipped for now)."""
        self.load('server_port')

        assert (
            self.get()['headers']['Server-Port'] == '7080'
        ), 'Server-Port header'

    @pytest.mark.skip('not yet')
    def test_asgi_application_working_directory_invalid(self):
        """An invalid working_directory should give a 500 (skipped for now)."""
        self.load('empty')

        assert 'success' in self.conf(
            '"/blah"', 'applications/empty/working_directory'
        ), 'configure invalid working_directory'

        assert self.get()['status'] == 500, 'status'

    def test_asgi_application_204_transfer_encoding(self):
        """A 204 response must not carry a Transfer-Encoding header."""
        self.load('204_no_content')

        assert (
            'Transfer-Encoding' not in self.get()['headers']
        ), '204 header transfer encoding'

    def test_asgi_application_shm_ack_handle(self):
        """Mirror a body that is larger than the configured shm limit."""
        # Minimum possible limit
        shm_limit = 10 * 1024 * 1024

        self.load('mirror', limits={"shm": shm_limit})

        # Should exceed shm_limit
        max_body_size = 12 * 1024 * 1024

        assert 'success' in self.conf(
            '{"http":{"max_body_size": ' + str(max_body_size) + ' }}',
            'settings',
        )

        assert self.get()['status'] == 200, 'init'

        body = '0123456789AB' * 1024 * 1024  # 12 MiB
        resp = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            body=body,
            read_buffer_size=1024 * 1024,
        )

        # The message used to be a copy-pasted 'keep-alive 1', although no
        # keep-alive connection is involved in this test.
        assert resp['body'] == body, 'shm ack handle body'

    def test_asgi_keepalive_body(self):
        """Two mirrored requests with bodies over one keep-alive socket."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        body = '0123456789' * 500
        (resp, sock) = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body=body,
            read_timeout=1,
        )

        assert resp['body'] == body, 'keep-alive 1'

        # The second request reuses the socket and asks to close it.
        body = '0123456789'
        resp = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body=body,
        )

        assert resp['body'] == body, 'keep-alive 2'

    def test_asgi_keepalive_reconfigure(self):
        """Keep-alive sockets keep working while the app is reconfigured.

        Three connections are opened, reused and finally closed; after every
        single request the 'mirror' app is reloaded with a different number
        of processes.
        """
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        body = '0123456789'
        conns = 3
        socks = []

        # Phase 1: open the keep-alive connections.
        for i in range(conns):
            (resp, sock) = self.post(
                headers={
                    'Host': 'localhost',
                    'Connection': 'keep-alive',
                    'Content-Type': 'text/html',
                },
                start=True,
                body=body,
                read_timeout=1,
            )

            assert resp['body'] == body, 'keep-alive open'

            self.load('mirror', processes=i + 1)

            socks.append(sock)

        # Phase 2: reuse every connection, still keeping it alive.
        for i in range(conns):
            (resp, _) = self.post(
                headers={
                    'Host': 'localhost',
                    'Connection': 'keep-alive',
                    'Content-Type': 'text/html',
                },
                start=True,
                sock=socks[i],
                body=body,
                read_timeout=1,
            )

            assert resp['body'] == body, 'keep-alive request'

            self.load('mirror', processes=i + 1)

        # Phase 3: last request on every connection, asking to close it.
        for i in range(conns):
            resp = self.post(
                headers={
                    'Host': 'localhost',
                    'Connection': 'close',
                    'Content-Type': 'text/html',
                },
                sock=socks[i],
                body=body,
            )

            assert resp['body'] == body, 'keep-alive close'

            self.load('mirror', processes=i + 1)

    def test_asgi_keepalive_reconfigure_2(self):
        """Reuse one socket across an app switch and a config wipe.

        The connection is opened against 'mirror', reused after switching to
        'empty' (empty body expected), and must yield an empty response dict
        once listeners and applications have been removed.
        """
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        body = '0123456789'

        (resp, sock) = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body=body,
            read_timeout=1,
        )

        assert resp['body'] == body, 'reconfigure 2 keep-alive 1'

        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        (resp, sock) = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            start=True,
            sock=sock,
            body=body,
        )

        assert resp['status'] == 200, 'reconfigure 2 keep-alive 2'
        assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body'

        assert 'success' in self.conf(
            {"listeners": {}, "applications": {}}
        ), 'reconfigure 2 clear configuration'

        resp = self.get(sock=sock)

        assert resp == {}, 'reconfigure 2 keep-alive 3'

    def test_asgi_keepalive_reconfigure_3(self):
        """A request started before a config wipe must still be answered.

        Only the request line is sent first; the remaining headers follow
        after listeners and applications have been removed, and a 200
        response is still expected.
        """
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        # First half of the request: the request line only, no reply awaited.
        (_, sock) = self.http(
            b"""GET / HTTP/1.1
""",
            start=True,
            raw=True,
            no_recv=True,
        )

        assert self.get()['status'] == 200

        assert 'success' in self.conf(
            {"listeners": {}, "applications": {}}
        ), 'reconfigure 3 clear configuration'

        # Second half: headers and the terminating blank line.
        resp = self.http(
            b"""Host: localhost
Connection: close

""",
            sock=sock,
            raw=True,
        )

        assert resp['status'] == 200, 'reconfigure 3'

    def test_asgi_process_switch(self):
        """Mix one 'X-Delay: 5' request with many 'X-Delay: 1' requests.

        The 'delayed' app runs with two processes.  Only the very last
        request is waited for and no response content is asserted here.
        """
        self.load('delayed', processes=2)

        self.get(
            headers={
                'Host': 'localhost',
                'Content-Length': '0',
                'X-Delay': '5',
                'Connection': 'close',
            },
            no_recv=True,
        )

        headers_delay_1 = {
            'Connection': 'close',
            'Host': 'localhost',
            'Content-Length': '0',
            'X-Delay': '1',
        }

        self.get(headers=headers_delay_1, no_recv=True)

        time.sleep(0.5)

        for _ in range(10):
            self.get(headers=headers_delay_1, no_recv=True)

        self.get(headers=headers_delay_1)

    def test_asgi_application_loading_error(self, skip_alert):
        """A module that cannot be imported results in a 503 response."""
        skip_alert(r'Python failed to import module "blah"')

        self.load('empty', module="blah")

        assert self.get()['status'] == 503, 'loading error'

    def test_asgi_application_threading(self):
        """wait_for_record() times out after 5s while every thread works at
        least 3s.  So without releasing GIL test should fail.
        """

        self.load('threading')

        for _ in range(10):
            self.get(no_recv=True)

        assert (
            self.wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
        ), 'last thread finished'

    def test_asgi_application_threads(self):
        """Two overlapping requests must report different 'x-thread' values.

        The 'threads' app is loaded with ``threads=2``; both requests are
        sent before either response is read.
        """
        self.load('threads', threads=2)

        socks = []
        threads = set()

        # Close every opened socket even when a read or an assertion fails,
        # so a failing run does not leak connections.
        try:
            for _ in range(2):
                (_, sock) = self.get(
                    headers={
                        'Host': 'localhost',
                        'X-Delay': '3',
                        'Connection': 'close',
                    },
                    no_recv=True,
                    start=True,
                )

                socks.append(sock)

                time.sleep(1.0)  # required to avoid greedy request reading

            for sock in socks:
                resp = self.recvall(sock).decode('utf-8')

                self.log_in(resp)

                resp = self._resp_to_dict(resp)

                assert resp['status'] == 200, 'status'

                threads.add(resp['headers']['x-thread'])
        finally:
            for sock in socks:
                sock.close()

        assert len(socks) == len(threads), 'threads differs'

    def test_asgi_application_legacy(self):
        """The 'legacy' app answers a plain GET with status 200."""
        self.load('legacy')

        resp = self.get(
            headers={
                'Host': 'localhost',
                'Content-Length': '0',
                'Connection': 'close',
            },
        )

        assert resp['status'] == 200, 'status'

    def test_asgi_application_legacy_force(self):
        """The 'legacy_force' app loaded with protocol='asgi' answers 200."""
        self.load('legacy_force', protocol='asgi')

        resp = self.get(
            headers={
                'Host': 'localhost',
                'Content-Length': '0',
                'Connection': 'close',
            },
        )

        assert resp['status'] == 200, 'status'
