xref: /unit/test/test_asgi_application.py (revision 1984:06514cd08a35)
1import re
2import time
3from distutils.version import LooseVersion
4
5import pytest
6from unit.applications.lang.python import TestApplicationPython
7
8
9class TestASGIApplication(TestApplicationPython):
10    prerequisites = {
11        'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
12    }
13    load_module = 'asgi'
14
15    def test_asgi_application_variables(self):
16        self.load('variables')
17
18        body = 'Test body string.'
19
20        resp = self.http(
21            b"""POST / HTTP/1.1
22Host: localhost
23Content-Length: %d
24Custom-Header: blah
25Custom-hEader: Blah
26Content-Type: text/html
27Connection: close
28custom-header: BLAH
29
30%s"""
31            % (len(body), body.encode()),
32            raw=True,
33        )
34
35        assert resp['status'] == 200, 'status'
36        headers = resp['headers']
37        header_server = headers.pop('Server')
38        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
39
40        date = headers.pop('Date')
41        assert date[-4:] == ' GMT', 'date header timezone'
42        assert (
43            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
44        ), 'date header'
45
46        assert headers == {
47            'Connection': 'close',
48            'content-length': str(len(body)),
49            'content-type': 'text/html',
50            'request-method': 'POST',
51            'request-uri': '/',
52            'http-host': 'localhost',
53            'http-version': '1.1',
54            'custom-header': 'blah, Blah, BLAH',
55            'asgi-version': '3.0',
56            'asgi-spec-version': '2.1',
57            'scheme': 'http',
58        }, 'headers'
59        assert resp['body'] == body, 'body'
60
61    def test_asgi_application_query_string(self):
62        self.load('query_string')
63
64        resp = self.get(url='/?var1=val1&var2=val2')
65
66        assert (
67            resp['headers']['query-string'] == 'var1=val1&var2=val2'
68        ), 'query-string header'
69
70    def test_asgi_application_query_string_space(self):
71        self.load('query_string')
72
73        resp = self.get(url='/ ?var1=val1&var2=val2')
74        assert (
75            resp['headers']['query-string'] == 'var1=val1&var2=val2'
76        ), 'query-string space'
77
78        resp = self.get(url='/ %20?var1=val1&var2=val2')
79        assert (
80            resp['headers']['query-string'] == 'var1=val1&var2=val2'
81        ), 'query-string space 2'
82
83        resp = self.get(url='/ %20 ?var1=val1&var2=val2')
84        assert (
85            resp['headers']['query-string'] == 'var1=val1&var2=val2'
86        ), 'query-string space 3'
87
88        resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2')
89        assert (
90            resp['headers']['query-string'] == ' var1= val1 & var2=val2'
91        ), 'query-string space 4'
92
93    def test_asgi_application_query_string_empty(self):
94        self.load('query_string')
95
96        resp = self.get(url='/?')
97
98        assert resp['status'] == 200, 'query string empty status'
99        assert resp['headers']['query-string'] == '', 'query string empty'
100
101    def test_asgi_application_query_string_absent(self):
102        self.load('query_string')
103
104        resp = self.get()
105
106        assert resp['status'] == 200, 'query string absent status'
107        assert resp['headers']['query-string'] == '', 'query string absent'
108
109    @pytest.mark.skip('not yet')
110    def test_asgi_application_server_port(self):
111        self.load('server_port')
112
113        assert (
114            self.get()['headers']['Server-Port'] == '7080'
115        ), 'Server-Port header'
116
117    @pytest.mark.skip('not yet')
118    def test_asgi_application_working_directory_invalid(self):
119        self.load('empty')
120
121        assert 'success' in self.conf(
122            '"/blah"', 'applications/empty/working_directory'
123        ), 'configure invalid working_directory'
124
125        assert self.get()['status'] == 500, 'status'
126
127    def test_asgi_application_204_transfer_encoding(self):
128        self.load('204_no_content')
129
130        assert (
131            'Transfer-Encoding' not in self.get()['headers']
132        ), '204 header transfer encoding'
133
134    def test_asgi_application_shm_ack_handle(self):
135        # Minimum possible limit
136        shm_limit = 10 * 1024 * 1024
137
138        self.load('mirror', limits={"shm": shm_limit})
139
140        # Should exceed shm_limit
141        max_body_size = 12 * 1024 * 1024
142
143        assert 'success' in self.conf(
144            '{"http":{"max_body_size": ' + str(max_body_size) + ' }}',
145            'settings',
146        )
147
148        assert self.get()['status'] == 200, 'init'
149
150        body = '0123456789AB' * 1024 * 1024  # 12 Mb
151        resp = self.post(
152            headers={
153                'Host': 'localhost',
154                'Connection': 'close',
155                'Content-Type': 'text/html',
156            },
157            body=body,
158            read_buffer_size=1024 * 1024,
159        )
160
161        assert resp['body'] == body, 'keep-alive 1'
162
163    def test_asgi_keepalive_body(self):
164        self.load('mirror')
165
166        assert self.get()['status'] == 200, 'init'
167
168        body = '0123456789' * 500
169        (resp, sock) = self.post(
170            headers={
171                'Host': 'localhost',
172                'Connection': 'keep-alive',
173                'Content-Type': 'text/html',
174            },
175            start=True,
176            body=body,
177            read_timeout=1,
178        )
179
180        assert resp['body'] == body, 'keep-alive 1'
181
182        body = '0123456789'
183        resp = self.post(
184            headers={
185                'Host': 'localhost',
186                'Connection': 'close',
187                'Content-Type': 'text/html',
188            },
189            sock=sock,
190            body=body,
191        )
192
193        assert resp['body'] == body, 'keep-alive 2'
194
195    def test_asgi_keepalive_reconfigure(self):
196        self.load('mirror')
197
198        assert self.get()['status'] == 200, 'init'
199
200        body = '0123456789'
201        conns = 3
202        socks = []
203
204        for i in range(conns):
205            (resp, sock) = self.post(
206                headers={
207                    'Host': 'localhost',
208                    'Connection': 'keep-alive',
209                    'Content-Type': 'text/html',
210                },
211                start=True,
212                body=body,
213                read_timeout=1,
214            )
215
216            assert resp['body'] == body, 'keep-alive open'
217
218            self.load('mirror', processes=i + 1)
219
220            socks.append(sock)
221
222        for i in range(conns):
223            (resp, sock) = self.post(
224                headers={
225                    'Host': 'localhost',
226                    'Connection': 'keep-alive',
227                    'Content-Type': 'text/html',
228                },
229                start=True,
230                sock=socks[i],
231                body=body,
232                read_timeout=1,
233            )
234
235            assert resp['body'] == body, 'keep-alive request'
236
237            self.load('mirror', processes=i + 1)
238
239        for i in range(conns):
240            resp = self.post(
241                headers={
242                    'Host': 'localhost',
243                    'Connection': 'close',
244                    'Content-Type': 'text/html',
245                },
246                sock=socks[i],
247                body=body,
248            )
249
250            assert resp['body'] == body, 'keep-alive close'
251
252            self.load('mirror', processes=i + 1)
253
254    def test_asgi_keepalive_reconfigure_2(self):
255        self.load('mirror')
256
257        assert self.get()['status'] == 200, 'init'
258
259        body = '0123456789'
260
261        (resp, sock) = self.post(
262            headers={
263                'Host': 'localhost',
264                'Connection': 'keep-alive',
265                'Content-Type': 'text/html',
266            },
267            start=True,
268            body=body,
269            read_timeout=1,
270        )
271
272        assert resp['body'] == body, 'reconfigure 2 keep-alive 1'
273
274        self.load('empty')
275
276        assert self.get()['status'] == 200, 'init'
277
278        (resp, sock) = self.post(
279            headers={
280                'Host': 'localhost',
281                'Connection': 'close',
282                'Content-Type': 'text/html',
283            },
284            start=True,
285            sock=sock,
286            body=body,
287        )
288
289        assert resp['status'] == 200, 'reconfigure 2 keep-alive 2'
290        assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body'
291
292        assert 'success' in self.conf(
293            {"listeners": {}, "applications": {}}
294        ), 'reconfigure 2 clear configuration'
295
296        resp = self.get(sock=sock)
297
298        assert resp == {}, 'reconfigure 2 keep-alive 3'
299
300    def test_asgi_keepalive_reconfigure_3(self):
301        self.load('empty')
302
303        assert self.get()['status'] == 200, 'init'
304
305        (_, sock) = self.http(
306            b"""GET / HTTP/1.1
307""",
308            start=True,
309            raw=True,
310            no_recv=True,
311        )
312
313        assert self.get()['status'] == 200
314
315        assert 'success' in self.conf(
316            {"listeners": {}, "applications": {}}
317        ), 'reconfigure 3 clear configuration'
318
319        resp = self.http(
320            b"""Host: localhost
321Connection: close
322
323""",
324            sock=sock,
325            raw=True,
326        )
327
328        assert resp['status'] == 200, 'reconfigure 3'
329
330    def test_asgi_process_switch(self):
331        self.load('delayed', processes=2)
332
333        self.get(
334            headers={
335                'Host': 'localhost',
336                'Content-Length': '0',
337                'X-Delay': '5',
338                'Connection': 'close',
339            },
340            no_recv=True,
341        )
342
343        headers_delay_1 = {
344            'Connection': 'close',
345            'Host': 'localhost',
346            'Content-Length': '0',
347            'X-Delay': '1',
348        }
349
350        self.get(headers=headers_delay_1, no_recv=True)
351
352        time.sleep(0.5)
353
354        for _ in range(10):
355            self.get(headers=headers_delay_1, no_recv=True)
356
357        self.get(headers=headers_delay_1)
358
359    def test_asgi_application_loading_error(self, skip_alert):
360        skip_alert(r'Python failed to import module "blah"')
361
362        self.load('empty', module="blah")
363
364        assert self.get()['status'] == 503, 'loading error'
365
366    def test_asgi_application_threading(self):
367        """wait_for_record() timeouts after 5s while every thread works at
368        least 3s.  So without releasing GIL test should fail.
369        """
370
371        self.load('threading')
372
373        for _ in range(10):
374            self.get(no_recv=True)
375
376        assert (
377            self.wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
378        ), 'last thread finished'
379
380    def test_asgi_application_threads(self):
381        self.load('threads', threads=2)
382
383        socks = []
384
385        for i in range(2):
386            (_, sock) = self.get(
387                headers={
388                    'Host': 'localhost',
389                    'X-Delay': '3',
390                    'Connection': 'close',
391                },
392                no_recv=True,
393                start=True,
394            )
395
396            socks.append(sock)
397
398            time.sleep(1.0)  # required to avoid greedy request reading
399
400        threads = set()
401
402        for sock in socks:
403            resp = self.recvall(sock).decode('utf-8')
404
405            self.log_in(resp)
406
407            resp = self._resp_to_dict(resp)
408
409            assert resp['status'] == 200, 'status'
410
411            threads.add(resp['headers']['x-thread'])
412
413            sock.close()
414
415        assert len(socks) == len(threads), 'threads differs'
416
417    def test_asgi_application_legacy(self):
418        self.load('legacy')
419
420        resp = self.get(
421            headers={
422                'Host': 'localhost',
423                'Content-Length': '0',
424                'Connection': 'close',
425            },
426        )
427
428        assert resp['status'] == 200, 'status'
429
430    def test_asgi_application_legacy_force(self):
431        self.load('legacy_force', protocol='asgi')
432
433        resp = self.get(
434            headers={
435                'Host': 'localhost',
436                'Content-Length': '0',
437                'Connection': 'close',
438            },
439        )
440
441        assert resp['status'] == 200, 'status'
442