xref: /unit/test/test_asgi_application.py (revision 1683:43cbc14c7be9)
1import re
2import time
3from distutils.version import LooseVersion
4
5import pytest
6
7from conftest import option
8from conftest import skip_alert
9from unit.applications.lang.python import TestApplicationPython
10
11
class TestASGIApplication(TestApplicationPython):
    """Tests that load ASGI applications and exercise them over HTTP."""

    # Version predicate for the 'python' module: true for 3.5 and newer.
    # NOTE(review): presumably consulted by the test harness to decide
    # whether these tests can run -- confirm against the base class.
    prerequisites = {'modules': {'python':
                            lambda v: LooseVersion(v) >= LooseVersion('3.5')}}
    # NOTE(review): presumably selects the ASGI loader when self.load() is
    # called -- confirm against TestApplicationPython.
    load_module = 'asgi'
16
17    def findall(self, pattern):
18        with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
19            return re.findall(pattern, f.read())
20
21    def test_asgi_application_variables(self):
22        self.load('variables')
23
24        body = 'Test body string.'
25
26        resp = self.http(
27            b"""POST / HTTP/1.1
28Host: localhost
29Content-Length: %d
30Custom-Header: blah
31Custom-hEader: Blah
32Content-Type: text/html
33Connection: close
34custom-header: BLAH
35
36%s""" % (len(body), body.encode()),
37            raw=True,
38        )
39
40        assert resp['status'] == 200, 'status'
41        headers = resp['headers']
42        header_server = headers.pop('Server')
43        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
44
45        date = headers.pop('Date')
46        assert date[-4:] == ' GMT', 'date header timezone'
47        assert (
48            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
49        ), 'date header'
50
51        assert headers == {
52            'Connection': 'close',
53            'content-length': str(len(body)),
54            'content-type': 'text/html',
55            'request-method': 'POST',
56            'request-uri': '/',
57            'http-host': 'localhost',
58            'http-version': '1.1',
59            'custom-header': 'blah, Blah, BLAH',
60            'asgi-version': '3.0',
61            'asgi-spec-version': '2.1',
62            'scheme': 'http',
63        }, 'headers'
64        assert resp['body'] == body, 'body'
65
66    def test_asgi_application_query_string(self):
67        self.load('query_string')
68
69        resp = self.get(url='/?var1=val1&var2=val2')
70
71        assert (
72            resp['headers']['query-string'] == 'var1=val1&var2=val2'
73        ), 'query-string header'
74
75    def test_asgi_application_query_string_space(self):
76        self.load('query_string')
77
78        resp = self.get(url='/ ?var1=val1&var2=val2')
79        assert (
80            resp['headers']['query-string'] == 'var1=val1&var2=val2'
81        ), 'query-string space'
82
83        resp = self.get(url='/ %20?var1=val1&var2=val2')
84        assert (
85            resp['headers']['query-string'] == 'var1=val1&var2=val2'
86        ), 'query-string space 2'
87
88        resp = self.get(url='/ %20 ?var1=val1&var2=val2')
89        assert (
90            resp['headers']['query-string'] == 'var1=val1&var2=val2'
91        ), 'query-string space 3'
92
93        resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2')
94        assert (
95            resp['headers']['query-string'] == ' var1= val1 & var2=val2'
96        ), 'query-string space 4'
97
98    def test_asgi_application_query_string_empty(self):
99        self.load('query_string')
100
101        resp = self.get(url='/?')
102
103        assert resp['status'] == 200, 'query string empty status'
104        assert resp['headers']['query-string'] == '', 'query string empty'
105
106    def test_asgi_application_query_string_absent(self):
107        self.load('query_string')
108
109        resp = self.get()
110
111        assert resp['status'] == 200, 'query string absent status'
112        assert resp['headers']['query-string'] == '', 'query string absent'
113
114    @pytest.mark.skip('not yet')
115    def test_asgi_application_server_port(self):
116        self.load('server_port')
117
118        assert (
119            self.get()['headers']['Server-Port'] == '7080'
120        ), 'Server-Port header'
121
122    @pytest.mark.skip('not yet')
123    def test_asgi_application_working_directory_invalid(self):
124        self.load('empty')
125
126        assert 'success' in self.conf(
127            '"/blah"', 'applications/empty/working_directory'
128        ), 'configure invalid working_directory'
129
130        assert self.get()['status'] == 500, 'status'
131
132    def test_asgi_application_204_transfer_encoding(self):
133        self.load('204_no_content')
134
135        assert (
136            'Transfer-Encoding' not in self.get()['headers']
137        ), '204 header transfer encoding'
138
139    def test_asgi_application_shm_ack_handle(self):
140        self.load('mirror')
141
142        # Minimum possible limit
143        shm_limit = 10 * 1024 * 1024
144
145        assert (
146            'success' in self.conf('{"shm": ' + str(shm_limit) + '}',
147                                 'applications/mirror/limits')
148        )
149
150        # Should exceed shm_limit
151        max_body_size = 12 * 1024 * 1024
152
153        assert (
154            'success' in self.conf('{"http":{"max_body_size": '
155                                  + str(max_body_size) + ' }}',
156                                 'settings')
157        )
158
159        assert self.get()['status'] == 200, 'init'
160
161        body = '0123456789AB' * 1024 * 1024  # 12 Mb
162        resp = self.post(
163            headers={
164                'Host': 'localhost',
165                'Connection': 'close',
166                'Content-Type': 'text/html',
167            },
168            body=body,
169            read_buffer_size=1024 * 1024,
170        )
171
172        assert resp['body'] == body, 'keep-alive 1'
173
174    def test_asgi_keepalive_body(self):
175        self.load('mirror')
176
177        assert self.get()['status'] == 200, 'init'
178
179        body = '0123456789' * 500
180        (resp, sock) = self.post(
181            headers={
182                'Host': 'localhost',
183                'Connection': 'keep-alive',
184                'Content-Type': 'text/html',
185            },
186            start=True,
187            body=body,
188            read_timeout=1,
189        )
190
191        assert resp['body'] == body, 'keep-alive 1'
192
193        body = '0123456789'
194        resp = self.post(
195            headers={
196                'Host': 'localhost',
197                'Connection': 'close',
198                'Content-Type': 'text/html',
199            },
200            sock=sock,
201            body=body,
202        )
203
204        assert resp['body'] == body, 'keep-alive 2'
205
206    def test_asgi_keepalive_reconfigure(self):
207        skip_alert(
208            r'pthread_mutex.+failed',
209            r'failed to apply',
210            r'process \d+ exited on signal',
211        )
212        self.load('mirror')
213
214        assert self.get()['status'] == 200, 'init'
215
216        body = '0123456789'
217        conns = 3
218        socks = []
219
220        for i in range(conns):
221            (resp, sock) = self.post(
222                headers={
223                    'Host': 'localhost',
224                    'Connection': 'keep-alive',
225                    'Content-Type': 'text/html',
226                },
227                start=True,
228                body=body,
229                read_timeout=1,
230            )
231
232            assert resp['body'] == body, 'keep-alive open'
233            assert 'success' in self.conf(
234                str(i + 1), 'applications/mirror/processes'
235            ), 'reconfigure'
236
237            socks.append(sock)
238
239        for i in range(conns):
240            (resp, sock) = self.post(
241                headers={
242                    'Host': 'localhost',
243                    'Connection': 'keep-alive',
244                    'Content-Type': 'text/html',
245                },
246                start=True,
247                sock=socks[i],
248                body=body,
249                read_timeout=1,
250            )
251
252            assert resp['body'] == body, 'keep-alive request'
253            assert 'success' in self.conf(
254                str(i + 1), 'applications/mirror/processes'
255            ), 'reconfigure 2'
256
257        for i in range(conns):
258            resp = self.post(
259                headers={
260                    'Host': 'localhost',
261                    'Connection': 'close',
262                    'Content-Type': 'text/html',
263                },
264                sock=socks[i],
265                body=body,
266            )
267
268            assert resp['body'] == body, 'keep-alive close'
269            assert 'success' in self.conf(
270                str(i + 1), 'applications/mirror/processes'
271            ), 'reconfigure 3'
272
273    def test_asgi_keepalive_reconfigure_2(self):
274        self.load('mirror')
275
276        assert self.get()['status'] == 200, 'init'
277
278        body = '0123456789'
279
280        (resp, sock) = self.post(
281            headers={
282                'Host': 'localhost',
283                'Connection': 'keep-alive',
284                'Content-Type': 'text/html',
285            },
286            start=True,
287            body=body,
288            read_timeout=1,
289        )
290
291        assert resp['body'] == body, 'reconfigure 2 keep-alive 1'
292
293        self.load('empty')
294
295        assert self.get()['status'] == 200, 'init'
296
297        (resp, sock) = self.post(
298            headers={
299                'Host': 'localhost',
300                'Connection': 'close',
301                'Content-Type': 'text/html',
302            },
303            start=True,
304            sock=sock,
305            body=body,
306        )
307
308        assert resp['status'] == 200, 'reconfigure 2 keep-alive 2'
309        assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body'
310
311        assert 'success' in self.conf(
312            {"listeners": {}, "applications": {}}
313        ), 'reconfigure 2 clear configuration'
314
315        resp = self.get(sock=sock)
316
317        assert resp == {}, 'reconfigure 2 keep-alive 3'
318
319    def test_asgi_keepalive_reconfigure_3(self):
320        self.load('empty')
321
322        assert self.get()['status'] == 200, 'init'
323
324        (_, sock) = self.http(
325            b"""GET / HTTP/1.1
326""",
327            start=True,
328            raw=True,
329            no_recv=True,
330        )
331
332        assert self.get()['status'] == 200
333
334        assert 'success' in self.conf(
335            {"listeners": {}, "applications": {}}
336        ), 'reconfigure 3 clear configuration'
337
338        resp = self.http(
339            b"""Host: localhost
340Connection: close
341
342""",
343            sock=sock,
344            raw=True,
345        )
346
347        assert resp['status'] == 200, 'reconfigure 3'
348
349    def test_asgi_process_switch(self):
350        self.load('delayed')
351
352        assert 'success' in self.conf(
353            '2', 'applications/delayed/processes'
354        ), 'configure 2 processes'
355
356        self.get(
357            headers={
358                'Host': 'localhost',
359                'Content-Length': '0',
360                'X-Delay': '5',
361                'Connection': 'close',
362            },
363            no_recv=True,
364        )
365
366        headers_delay_1 = {
367            'Connection': 'close',
368            'Host': 'localhost',
369            'Content-Length': '0',
370            'X-Delay': '1',
371        }
372
373        self.get(headers=headers_delay_1, no_recv=True)
374
375        time.sleep(0.5)
376
377        for _ in range(10):
378            self.get(headers=headers_delay_1, no_recv=True)
379
380        self.get(headers=headers_delay_1)
381
382    def test_asgi_application_loading_error(self):
383        skip_alert(r'Python failed to import module "blah"')
384
385        self.load('empty')
386
387        assert 'success' in self.conf('"blah"', 'applications/empty/module')
388
389        assert self.get()['status'] == 503, 'loading error'
390
391    def test_asgi_application_threading(self):
392        """wait_for_record() timeouts after 5s while every thread works at
393        least 3s.  So without releasing GIL test should fail.
394        """
395
396        self.load('threading')
397
398        for _ in range(10):
399            self.get(no_recv=True)
400
401        assert (
402            self.wait_for_record(r'\(5\) Thread: 100') is not None
403        ), 'last thread finished'
404
405    def test_asgi_application_threads(self):
406        self.load('threads')
407
408        assert 'success' in self.conf(
409            '4', 'applications/threads/threads'
410        ), 'configure 4 threads'
411
412        socks = []
413
414        for i in range(4):
415            (_, sock) = self.get(
416                headers={
417                    'Host': 'localhost',
418                    'X-Delay': '2',
419                    'Connection': 'close',
420                },
421                no_recv=True,
422                start=True,
423            )
424
425            socks.append(sock)
426
427            time.sleep(0.25) # required to avoid greedy request reading
428
429        threads = set()
430
431        for sock in socks:
432            resp = self.recvall(sock).decode('utf-8')
433
434            self.log_in(resp)
435
436            resp = self._resp_to_dict(resp)
437
438            assert resp['status'] == 200, 'status'
439
440            threads.add(resp['headers']['x-thread'])
441
442            sock.close()
443
444        assert len(socks) == len(threads), 'threads differs'
445