xref: /unit/test/python/threads/asgi.py (revision 1848:4bd548074e2c)
1import asyncio
2import time
3import threading
4
5
6async def application(scope, receive, send):
7    assert scope['type'] == 'http'
8
9    headers = scope.get('headers', [])
10
11    def get_header(n, v=None):
12        for h in headers:
13            if h[0] == n:
14                return h[1]
15        return v
16
17    delay = float(get_header(b'x-delay', 0))
18
19    time.sleep(delay)
20
21    await send(
22        {
23            'type': 'http.response.start',
24            'status': 200,
25            'headers': [
26                (b'content-length', b'0'),
27                (b'x-thread', str(threading.currentThread().ident).encode()),
28            ],
29        }
30    )
31