xref: /unit/src/python/nxt_python_asgi.c (revision 2068:e7eded7e5de9)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_response.h>
15 #include <python/nxt_python_asgi.h>
16 #include <python/nxt_python_asgi_str.h>
17 
18 
19 static PyObject *nxt_python_asgi_get_func(PyObject *obj);
20 static int nxt_python_asgi_ctx_data_alloc(void **pdata, int main);
21 static void nxt_python_asgi_ctx_data_free(void *data);
22 static int nxt_python_asgi_startup(void *data);
23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
24 
25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
26     nxt_unit_port_t *port);
27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
28 static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
29 
30 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
31 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
32     uint16_t port);
33 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
34 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
35 
36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
37 static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
38 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
39     nxt_unit_port_t *port);
40 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
41 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
42 
43 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
44 static void nxt_python_asgi_done(void);
45 
46 static PyObject           *nxt_py_port_read;
47 
48 static PyMethodDef        nxt_py_port_read_method =
49     {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50 
51 static nxt_python_proto_t  nxt_py_asgi_proto = {
52     .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
53     .ctx_data_free  = nxt_python_asgi_ctx_data_free,
54     .startup        = nxt_python_asgi_startup,
55     .run            = nxt_python_asgi_run,
56     .done           = nxt_python_asgi_done,
57 };
58 
59 #define NXT_UNIT_HASH_WS_PROTOCOL  0xED0A
60 
61 
62 int
63 nxt_python_asgi_check(PyObject *obj)
64 {
65     int           res;
66     PyObject      *func;
67     PyCodeObject  *code;
68 
69     func = nxt_python_asgi_get_func(obj);
70 
71     if (func == NULL) {
72         return 0;
73     }
74 
75     code = (PyCodeObject *) PyFunction_GET_CODE(func);
76 
77     nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
78                          "%d argument(s)",
79                    (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
80                    code->co_argcount);
81 
82     res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
83 
84     Py_DECREF(func);
85 
86     return res;
87 }
88 
89 
90 static PyObject *
91 nxt_python_asgi_get_func(PyObject *obj)
92 {
93     PyObject  *call;
94 
95     if (PyFunction_Check(obj)) {
96         Py_INCREF(obj);
97         return obj;
98     }
99 
100     if (PyMethod_Check(obj)) {
101         obj = PyMethod_GET_FUNCTION(obj);
102 
103         Py_INCREF(obj);
104         return obj;
105     }
106 
107     call = PyObject_GetAttrString(obj, "__call__");
108 
109     if (call == NULL) {
110         return NULL;
111     }
112 
113     if (PyFunction_Check(call)) {
114         return call;
115     }
116 
117     if (PyMethod_Check(call)) {
118         obj = PyMethod_GET_FUNCTION(call);
119 
120         if (PyFunction_Check(obj)) {
121             Py_INCREF(obj);
122 
123         } else {
124             obj = NULL;
125         }
126 
127     } else {
128         obj = NULL;
129     }
130 
131     Py_DECREF(call);
132 
133     return obj;
134 }
135 
136 
137 int
138 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
139 {
140     PyObject      *func;
141     nxt_int_t     i;
142     PyCodeObject  *code;
143 
144     nxt_unit_debug(NULL, "asgi_init");
145 
146     if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
147         nxt_unit_alert(NULL, "Python failed to init string objects");
148         return NXT_UNIT_ERROR;
149     }
150 
151     nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
152     if (nxt_slow_path(nxt_py_port_read == NULL)) {
153         nxt_unit_alert(NULL,
154                        "Python failed to initialize the 'port_read' function");
155         return NXT_UNIT_ERROR;
156     }
157 
158     if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
159         return NXT_UNIT_ERROR;
160     }
161 
162     if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
163         return NXT_UNIT_ERROR;
164     }
165 
166     for (i = 0; i < nxt_py_targets->count; i++) {
167         func = nxt_python_asgi_get_func(nxt_py_targets->target[i].application);
168         if (nxt_slow_path(func == NULL)) {
169             nxt_unit_debug(NULL, "asgi: cannot find function for callable, "
170                                  "unable to check for legacy mode (#%d)",
171                                  (int) i);
172             continue;
173         }
174 
175         code = (PyCodeObject *) PyFunction_GET_CODE(func);
176 
177         if ((code->co_flags & CO_COROUTINE) == 0) {
178             nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
179                                  "switching to legacy mode");
180             nxt_py_targets->target[i].asgi_legacy = 1;
181         }
182 
183         Py_DECREF(func);
184     }
185 
186     init->callbacks.request_handler = nxt_py_asgi_request_handler;
187     init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
188     init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
189     init->callbacks.close_handler = nxt_py_asgi_close_handler;
190     init->callbacks.quit = nxt_py_asgi_quit;
191     init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
192     init->callbacks.add_port = nxt_py_asgi_add_port;
193     init->callbacks.remove_port = nxt_py_asgi_remove_port;
194 
195     *proto = nxt_py_asgi_proto;
196 
197     return NXT_UNIT_OK;
198 }
199 
200 
201 static int
202 nxt_python_asgi_ctx_data_alloc(void **pdata, int main)
203 {
204     uint32_t                i;
205     PyObject                *asyncio, *loop, *event_loop, *obj;
206     const char              *event_loop_func;
207     nxt_py_asgi_ctx_data_t  *ctx_data;
208 
209     ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
210     if (nxt_slow_path(ctx_data == NULL)) {
211         nxt_unit_alert(NULL, "Failed to allocate context data");
212         return NXT_UNIT_ERROR;
213     }
214 
215     memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
216 
217     nxt_queue_init(&ctx_data->drain_queue);
218 
219     struct {
220         const char  *key;
221         PyObject    **handler;
222 
223     } handlers[] = {
224         { "create_task",        &ctx_data->loop_create_task },
225         { "add_reader",         &ctx_data->loop_add_reader },
226         { "remove_reader",      &ctx_data->loop_remove_reader },
227         { "call_soon",          &ctx_data->loop_call_soon },
228         { "run_until_complete", &ctx_data->loop_run_until_complete },
229         { "create_future",      &ctx_data->loop_create_future },
230     };
231 
232     loop = NULL;
233 
234     asyncio = PyImport_ImportModule("asyncio");
235     if (nxt_slow_path(asyncio == NULL)) {
236         nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
237         nxt_python_print_exception();
238         goto fail;
239     }
240 
241     event_loop_func = main ? "get_event_loop" : "new_event_loop";
242 
243     event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
244                                       event_loop_func);
245     if (nxt_slow_path(event_loop == NULL)) {
246         nxt_unit_alert(NULL,
247                        "Python failed to get '%s' from module 'asyncio'",
248                        event_loop_func);
249         goto fail;
250     }
251 
252     if (nxt_slow_path(PyCallable_Check(event_loop) == 0)) {
253         nxt_unit_alert(NULL,
254                        "'asyncio.%s' is not a callable object",
255                        event_loop_func);
256         goto fail;
257     }
258 
259     loop = PyObject_CallObject(event_loop, NULL);
260     if (nxt_slow_path(loop == NULL)) {
261         nxt_unit_alert(NULL, "Python failed to call 'asyncio.%s'",
262                        event_loop_func);
263         goto fail;
264     }
265 
266     for (i = 0; i < nxt_nitems(handlers); i++) {
267         obj = PyObject_GetAttrString(loop, handlers[i].key);
268         if (nxt_slow_path(obj == NULL)) {
269             nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
270                                  handlers[i].key);
271             goto fail;
272         }
273 
274         *handlers[i].handler = obj;
275 
276         if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
277             nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
278                                  handlers[i].key);
279             goto fail;
280         }
281     }
282 
283     obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
284     if (nxt_slow_path(obj == NULL)) {
285         nxt_unit_alert(NULL, "Python failed to create Future ");
286         nxt_python_print_exception();
287         goto fail;
288     }
289 
290     ctx_data->quit_future = obj;
291 
292     obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
293     if (nxt_slow_path(obj == NULL)) {
294         nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
295         goto fail;
296     }
297 
298     ctx_data->quit_future_set_result = obj;
299 
300     if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
301         nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
302         goto fail;
303     }
304 
305     Py_DECREF(loop);
306     Py_DECREF(asyncio);
307 
308     *pdata = ctx_data;
309 
310     return NXT_UNIT_OK;
311 
312 fail:
313 
314     nxt_python_asgi_ctx_data_free(ctx_data);
315 
316     Py_XDECREF(loop);
317     Py_XDECREF(asyncio);
318 
319     return NXT_UNIT_ERROR;
320 }
321 
322 
323 static void
324 nxt_python_asgi_ctx_data_free(void *data)
325 {
326     nxt_py_asgi_ctx_data_t  *ctx_data;
327 
328     ctx_data = data;
329 
330     Py_XDECREF(ctx_data->loop_run_until_complete);
331     Py_XDECREF(ctx_data->loop_create_future);
332     Py_XDECREF(ctx_data->loop_create_task);
333     Py_XDECREF(ctx_data->loop_call_soon);
334     Py_XDECREF(ctx_data->loop_add_reader);
335     Py_XDECREF(ctx_data->loop_remove_reader);
336     Py_XDECREF(ctx_data->quit_future);
337     Py_XDECREF(ctx_data->quit_future_set_result);
338 
339     nxt_unit_free(NULL, ctx_data);
340 }
341 
342 
343 static int
344 nxt_python_asgi_startup(void *data)
345 {
346     return nxt_py_asgi_lifespan_startup(data);
347 }
348 
349 
350 static int
351 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
352 {
353     PyObject                *res;
354     nxt_py_asgi_ctx_data_t  *ctx_data;
355 
356     ctx_data = ctx->data;
357 
358     res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
359                                        ctx_data->quit_future, NULL);
360     if (nxt_slow_path(res == NULL)) {
361         nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
362         nxt_python_print_exception();
363 
364         return NXT_UNIT_ERROR;
365     }
366 
367     Py_DECREF(res);
368 
369     nxt_py_asgi_lifespan_shutdown(ctx);
370 
371     return NXT_UNIT_OK;
372 }
373 
374 
375 static void
376 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
377 {
378     PyObject                *res, *fd;
379     nxt_py_asgi_ctx_data_t  *ctx_data;
380 
381     if (port == NULL || port->in_fd == -1) {
382         return;
383     }
384 
385     ctx_data = ctx->data;
386 
387     nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
388 
389     fd = PyLong_FromLong(port->in_fd);
390     if (nxt_slow_path(fd == NULL)) {
391         nxt_unit_alert(ctx, "Python failed to create Long object");
392         nxt_python_print_exception();
393 
394         return;
395     }
396 
397     res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
398     if (nxt_slow_path(res == NULL)) {
399         nxt_unit_alert(ctx, "Python failed to remove_reader");
400         nxt_python_print_exception();
401 
402     } else {
403         Py_DECREF(res);
404     }
405 
406     Py_DECREF(fd);
407 }
408 
409 
410 static void
411 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
412 {
413     PyObject                *scope, *res, *task, *receive, *send, *done, *asgi;
414     PyObject                *stage2;
415     nxt_python_target_t     *target;
416     nxt_py_asgi_ctx_data_t  *ctx_data;
417 
418     if (req->request->websocket_handshake) {
419         asgi = nxt_py_asgi_websocket_create(req);
420 
421     } else {
422         asgi = nxt_py_asgi_http_create(req);
423     }
424 
425     if (nxt_slow_path(asgi == NULL)) {
426         nxt_unit_req_alert(req, "Python failed to create asgi object");
427         nxt_unit_request_done(req, NXT_UNIT_ERROR);
428 
429         return;
430     }
431 
432     receive = PyObject_GetAttrString(asgi, "receive");
433     if (nxt_slow_path(receive == NULL)) {
434         nxt_unit_req_alert(req, "Python failed to get 'receive' method");
435         nxt_unit_request_done(req, NXT_UNIT_ERROR);
436 
437         goto release_asgi;
438     }
439 
440     send = PyObject_GetAttrString(asgi, "send");
441     if (nxt_slow_path(receive == NULL)) {
442         nxt_unit_req_alert(req, "Python failed to get 'send' method");
443         nxt_unit_request_done(req, NXT_UNIT_ERROR);
444 
445         goto release_receive;
446     }
447 
448     done = PyObject_GetAttrString(asgi, "_done");
449     if (nxt_slow_path(receive == NULL)) {
450         nxt_unit_req_alert(req, "Python failed to get '_done' method");
451         nxt_unit_request_done(req, NXT_UNIT_ERROR);
452 
453         goto release_send;
454     }
455 
456     scope = nxt_py_asgi_create_http_scope(req);
457     if (nxt_slow_path(scope == NULL)) {
458         nxt_unit_request_done(req, NXT_UNIT_ERROR);
459 
460         goto release_done;
461     }
462 
463     req->data = asgi;
464     target = &nxt_py_targets->target[req->request->app_target];
465 
466     if (!target->asgi_legacy) {
467         nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
468 
469         res = PyObject_CallFunctionObjArgs(target->application,
470                                            scope, receive, send, NULL);
471 
472     } else {
473         nxt_unit_req_debug(req, "Python call legacy application");
474 
475         res = PyObject_CallFunctionObjArgs(target->application, scope, NULL);
476 
477         if (nxt_slow_path(res == NULL)) {
478             nxt_unit_req_error(req, "Python failed to call legacy app stage1");
479             nxt_python_print_exception();
480             nxt_unit_request_done(req, NXT_UNIT_ERROR);
481 
482             goto release_scope;
483         }
484 
485         if (nxt_slow_path(PyCallable_Check(res) == 0)) {
486             nxt_unit_req_error(req,
487                               "Legacy ASGI application returns not a callable");
488             nxt_unit_request_done(req, NXT_UNIT_ERROR);
489 
490             Py_DECREF(res);
491 
492             goto release_scope;
493         }
494 
495         stage2 = res;
496 
497         res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
498 
499         Py_DECREF(stage2);
500     }
501 
502     if (nxt_slow_path(res == NULL)) {
503         nxt_unit_req_error(req, "Python failed to call the application");
504         nxt_python_print_exception();
505         nxt_unit_request_done(req, NXT_UNIT_ERROR);
506 
507         goto release_scope;
508     }
509 
510     if (nxt_slow_path(!PyCoro_CheckExact(res))) {
511         nxt_unit_req_error(req, "Application result type is not a coroutine");
512         nxt_unit_request_done(req, NXT_UNIT_ERROR);
513 
514         Py_DECREF(res);
515 
516         goto release_scope;
517     }
518 
519     ctx_data = req->ctx->data;
520 
521     task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
522     if (nxt_slow_path(task == NULL)) {
523         nxt_unit_req_error(req, "Python failed to call the create_task");
524         nxt_python_print_exception();
525         nxt_unit_request_done(req, NXT_UNIT_ERROR);
526 
527         Py_DECREF(res);
528 
529         goto release_scope;
530     }
531 
532     Py_DECREF(res);
533 
534     res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
535                                      NULL);
536     if (nxt_slow_path(res == NULL)) {
537         nxt_unit_req_error(req,
538                            "Python failed to call 'task.add_done_callback'");
539         nxt_python_print_exception();
540         nxt_unit_request_done(req, NXT_UNIT_ERROR);
541 
542         goto release_task;
543     }
544 
545     Py_DECREF(res);
546 release_task:
547     Py_DECREF(task);
548 release_scope:
549     Py_DECREF(scope);
550 release_done:
551     Py_DECREF(done);
552 release_send:
553     Py_DECREF(send);
554 release_receive:
555     Py_DECREF(receive);
556 release_asgi:
557     Py_DECREF(asgi);
558 }
559 
560 
561 static void
562 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
563 {
564     if (req->request->websocket_handshake) {
565         nxt_py_asgi_websocket_close_handler(req);
566 
567     } else {
568         nxt_py_asgi_http_close_handler(req);
569     }
570 }
571 
572 
573 static PyObject *
574 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
575 {
576     char                *p, *target, *query;
577     uint32_t            target_length, i;
578     PyObject            *scope, *v, *type, *scheme;
579     PyObject            *headers, *header;
580     nxt_unit_field_t    *f;
581     nxt_unit_request_t  *r;
582 
583     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
584 
585 #define SET_ITEM(dict, key, value) \
586     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
587                       == -1))                                                  \
588     {                                                                          \
589         nxt_unit_req_alert(req, "Python failed to set '"                       \
590                                 #dict "." #key "' item");                      \
591         goto fail;                                                             \
592     }
593 
594     v = NULL;
595     headers = NULL;
596 
597     r = req->request;
598 
599     if (r->websocket_handshake) {
600         type = nxt_py_websocket_str;
601         scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
602 
603     } else {
604         type = nxt_py_http_str;
605         scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
606     }
607 
608     scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
609     if (nxt_slow_path(scope == NULL)) {
610         return NULL;
611     }
612 
613     p = nxt_unit_sptr_get(&r->version);
614     SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
615                                               : nxt_py_1_0_str)
616     SET_ITEM(scope, scheme, scheme)
617 
618     v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
619                                    r->method_length);
620     if (nxt_slow_path(v == NULL)) {
621         nxt_unit_req_alert(req, "Python failed to create 'method' string");
622         goto fail;
623     }
624 
625     SET_ITEM(scope, method, v)
626     Py_DECREF(v);
627 
628     v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
629                              "replace");
630     if (nxt_slow_path(v == NULL)) {
631         nxt_unit_req_alert(req, "Python failed to create 'path' string");
632         goto fail;
633     }
634 
635     SET_ITEM(scope, path, v)
636     Py_DECREF(v);
637 
638     target = nxt_unit_sptr_get(&r->target);
639     query = nxt_unit_sptr_get(&r->query);
640 
641     if (r->query.offset != 0) {
642         target_length = query - target - 1;
643 
644     } else {
645         target_length = r->target_length;
646     }
647 
648     v = PyBytes_FromStringAndSize(target, target_length);
649     if (nxt_slow_path(v == NULL)) {
650         nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
651         goto fail;
652     }
653 
654     SET_ITEM(scope, raw_path, v)
655     Py_DECREF(v);
656 
657     v = PyBytes_FromStringAndSize(query, r->query_length);
658     if (nxt_slow_path(v == NULL)) {
659         nxt_unit_req_alert(req, "Python failed to create 'query' string");
660         goto fail;
661     }
662 
663     SET_ITEM(scope, query_string, v)
664     Py_DECREF(v);
665 
666     v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
667     if (nxt_slow_path(v == NULL)) {
668         nxt_unit_req_alert(req, "Python failed to create 'client' pair");
669         goto fail;
670     }
671 
672     SET_ITEM(scope, client, v)
673     Py_DECREF(v);
674 
675     v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
676     if (nxt_slow_path(v == NULL)) {
677         nxt_unit_req_alert(req, "Python failed to create 'server' pair");
678         goto fail;
679     }
680 
681     SET_ITEM(scope, server, v)
682     Py_DECREF(v);
683 
684     v = NULL;
685 
686     headers = PyTuple_New(r->fields_count);
687     if (nxt_slow_path(headers == NULL)) {
688         nxt_unit_req_alert(req, "Python failed to create 'headers' object");
689         goto fail;
690     }
691 
692     for (i = 0; i < r->fields_count; i++) {
693         f = r->fields + i;
694 
695         header = nxt_py_asgi_create_header(f);
696         if (nxt_slow_path(header == NULL)) {
697             nxt_unit_req_alert(req, "Python failed to create 'header' pair");
698             goto fail;
699         }
700 
701         PyTuple_SET_ITEM(headers, i, header);
702 
703         if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
704             && f->name_length == ws_protocol.length
705             && f->value_length > 0
706             && r->websocket_handshake)
707         {
708             v = nxt_py_asgi_create_subprotocols(f);
709             if (nxt_slow_path(v == NULL)) {
710                 nxt_unit_req_alert(req, "Failed to create subprotocols");
711                 goto fail;
712             }
713 
714             SET_ITEM(scope, subprotocols, v);
715             Py_DECREF(v);
716         }
717     }
718 
719     SET_ITEM(scope, headers, headers)
720     Py_DECREF(headers);
721 
722     return scope;
723 
724 fail:
725 
726     Py_XDECREF(v);
727     Py_XDECREF(headers);
728     Py_DECREF(scope);
729 
730     return NULL;
731 
732 #undef SET_ITEM
733 }
734 
735 
736 static PyObject *
737 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
738 {
739     char      *p, *s;
740     PyObject  *pair, *v;
741 
742     pair = PyTuple_New(2);
743     if (nxt_slow_path(pair == NULL)) {
744         return NULL;
745     }
746 
747     p = nxt_unit_sptr_get(sptr);
748     s = memchr(p, ':', len);
749 
750     v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
751     if (nxt_slow_path(v == NULL)) {
752         Py_DECREF(pair);
753 
754         return NULL;
755     }
756 
757     PyTuple_SET_ITEM(pair, 0, v);
758 
759     if (s != NULL) {
760         p += len;
761         v = PyLong_FromString(s + 1, &p, 10);
762 
763     } else {
764         v = PyLong_FromLong(port);
765     }
766 
767     if (nxt_slow_path(v == NULL)) {
768         Py_DECREF(pair);
769 
770         return NULL;
771     }
772 
773     PyTuple_SET_ITEM(pair, 1, v);
774 
775     return pair;
776 }
777 
778 
779 static PyObject *
780 nxt_py_asgi_create_header(nxt_unit_field_t *f)
781 {
782     char      c, *name;
783     uint8_t   pos;
784     PyObject  *header, *v;
785 
786     header = PyTuple_New(2);
787     if (nxt_slow_path(header == NULL)) {
788         return NULL;
789     }
790 
791     name = nxt_unit_sptr_get(&f->name);
792 
793     for (pos = 0; pos < f->name_length; pos++) {
794         c = name[pos];
795         if (c >= 'A' && c <= 'Z') {
796             name[pos] = (c | 0x20);
797         }
798     }
799 
800     v = PyBytes_FromStringAndSize(name, f->name_length);
801     if (nxt_slow_path(v == NULL)) {
802         Py_DECREF(header);
803 
804         return NULL;
805     }
806 
807     PyTuple_SET_ITEM(header, 0, v);
808 
809     v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
810                                   f->value_length);
811     if (nxt_slow_path(v == NULL)) {
812         Py_DECREF(header);
813 
814         return NULL;
815     }
816 
817     PyTuple_SET_ITEM(header, 1, v);
818 
819     return header;
820 }
821 
822 
823 static PyObject *
824 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
825 {
826     char      *v;
827     uint32_t  i, n, start;
828     PyObject  *res, *proto;
829 
830     v = nxt_unit_sptr_get(&f->value);
831     n = 1;
832 
833     for (i = 0; i < f->value_length; i++) {
834         if (v[i] == ',') {
835             n++;
836         }
837     }
838 
839     res = PyTuple_New(n);
840     if (nxt_slow_path(res == NULL)) {
841         return NULL;
842     }
843 
844     n = 0;
845     start = 0;
846 
847     for (i = 0; i < f->value_length; ) {
848         if (v[i] != ',') {
849             i++;
850 
851             continue;
852         }
853 
854         if (i - start > 0) {
855             proto = PyString_FromStringAndSize(v + start, i - start);
856             if (nxt_slow_path(proto == NULL)) {
857                 goto fail;
858             }
859 
860             PyTuple_SET_ITEM(res, n, proto);
861 
862             n++;
863         }
864 
865         do {
866             i++;
867         } while (i < f->value_length && v[i] == ' ');
868 
869         start = i;
870     }
871 
872     if (i - start > 0) {
873         proto = PyString_FromStringAndSize(v + start, i - start);
874         if (nxt_slow_path(proto == NULL)) {
875             goto fail;
876         }
877 
878         PyTuple_SET_ITEM(res, n, proto);
879     }
880 
881     return res;
882 
883 fail:
884 
885     Py_DECREF(res);
886 
887     return NULL;
888 }
889 
890 
891 static int
892 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
893 {
894     int  nb;
895 
896     if (port->in_fd == -1) {
897         return NXT_UNIT_OK;
898     }
899 
900     nb = 1;
901 
902     if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
903         nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
904                        port->in_fd, strerror(errno), errno);
905 
906         return NXT_UNIT_ERROR;
907     }
908 
909     nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
910 
911     return nxt_py_asgi_add_reader(ctx, port);
912 }
913 
914 
915 static int
916 nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
917 {
918     int                     rc;
919     PyObject                *res, *fd, *py_ctx, *py_port;
920     nxt_py_asgi_ctx_data_t  *ctx_data;
921 
922     nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port);
923 
924     ctx_data = ctx->data;
925 
926     fd = PyLong_FromLong(port->in_fd);
927     if (nxt_slow_path(fd == NULL)) {
928         nxt_unit_alert(ctx, "Python failed to create fd");
929         nxt_python_print_exception();
930 
931         return NXT_UNIT_ERROR;
932     }
933 
934     rc = NXT_UNIT_ERROR;
935 
936     py_ctx = PyLong_FromVoidPtr(ctx);
937     if (nxt_slow_path(py_ctx == NULL)) {
938         nxt_unit_alert(ctx, "Python failed to create py_ctx");
939         nxt_python_print_exception();
940 
941         goto clean_fd;
942     }
943 
944     py_port = PyLong_FromVoidPtr(port);
945     if (nxt_slow_path(py_port == NULL)) {
946         nxt_unit_alert(ctx, "Python failed to create py_port");
947         nxt_python_print_exception();
948 
949         goto clean_py_ctx;
950     }
951 
952     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
953                                        fd, nxt_py_port_read,
954                                        py_ctx, py_port, NULL);
955     if (nxt_slow_path(res == NULL)) {
956         nxt_unit_alert(ctx, "Python failed to add_reader");
957         nxt_python_print_exception();
958 
959     } else {
960         Py_DECREF(res);
961 
962         rc = NXT_UNIT_OK;
963     }
964 
965     Py_DECREF(py_port);
966 
967 clean_py_ctx:
968 
969     Py_DECREF(py_ctx);
970 
971 clean_fd:
972 
973     Py_DECREF(fd);
974 
975     return rc;
976 }
977 
978 
979 static void
980 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
981     nxt_unit_port_t *port)
982 {
983     if (port->in_fd == -1 || ctx == NULL) {
984         return;
985     }
986 
987     nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
988 
989     nxt_py_asgi_remove_reader(ctx, port);
990 }
991 
992 
993 static void
994 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
995 {
996     PyObject                *res, *p;
997     nxt_py_asgi_ctx_data_t  *ctx_data;
998 
999     nxt_unit_debug(ctx, "asgi_quit %p", ctx);
1000 
1001     ctx_data = ctx->data;
1002 
1003     p = PyLong_FromLong(0);
1004     if (nxt_slow_path(p == NULL)) {
1005         nxt_unit_alert(NULL, "Python failed to create Long");
1006         nxt_python_print_exception();
1007 
1008     } else {
1009         res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
1010                                            p, NULL);
1011         if (nxt_slow_path(res == NULL)) {
1012             nxt_unit_alert(ctx, "Python failed to set_result");
1013             nxt_python_print_exception();
1014 
1015         } else {
1016             Py_DECREF(res);
1017         }
1018 
1019         Py_DECREF(p);
1020     }
1021 }
1022 
1023 
1024 static void
1025 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
1026 {
1027     int                     rc;
1028     nxt_queue_link_t        *lnk;
1029     nxt_py_asgi_ctx_data_t  *ctx_data;
1030 
1031     ctx_data = ctx->data;
1032 
1033     while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
1034         lnk = nxt_queue_first(&ctx_data->drain_queue);
1035 
1036         rc = nxt_py_asgi_http_drain(lnk);
1037         if (rc == NXT_UNIT_AGAIN) {
1038             return;
1039         }
1040 
1041         nxt_queue_remove(lnk);
1042     }
1043 }
1044 
1045 
1046 static PyObject *
1047 nxt_py_asgi_port_read(PyObject *self, PyObject *args)
1048 {
1049     int                     rc;
1050     PyObject                *arg0, *arg1, *res;
1051     Py_ssize_t              n;
1052     nxt_unit_ctx_t          *ctx;
1053     nxt_unit_port_t         *port;
1054     nxt_py_asgi_ctx_data_t  *ctx_data;
1055 
1056     n = PyTuple_GET_SIZE(args);
1057 
1058     if (n != 2) {
1059         nxt_unit_alert(NULL,
1060                        "nxt_py_asgi_port_read: invalid number of arguments %d",
1061                        (int) n);
1062 
1063         return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
1064     }
1065 
1066     arg0 = PyTuple_GET_ITEM(args, 0);
1067     if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) {
1068         return PyErr_Format(PyExc_TypeError,
1069                             "the first argument is not a long");
1070     }
1071 
1072     ctx = PyLong_AsVoidPtr(arg0);
1073 
1074     arg1 = PyTuple_GET_ITEM(args, 1);
1075     if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) {
1076         return PyErr_Format(PyExc_TypeError,
1077                             "the second argument is not a long");
1078     }
1079 
1080     port = PyLong_AsVoidPtr(arg1);
1081 
1082     rc = nxt_unit_process_port_msg(ctx, port);
1083 
1084     nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc);
1085 
1086     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1087         return PyErr_Format(PyExc_RuntimeError,
1088                             "error processing port %d message", port->id.id);
1089     }
1090 
1091     if (rc == NXT_UNIT_OK) {
1092         ctx_data = ctx->data;
1093 
1094         res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon,
1095                                            nxt_py_port_read,
1096                                            arg0, arg1, NULL);
1097         if (nxt_slow_path(res == NULL)) {
1098             nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'");
1099             nxt_python_print_exception();
1100         }
1101 
1102         Py_XDECREF(res);
1103     }
1104 
1105     Py_RETURN_NONE;
1106 }
1107 
1108 
1109 PyObject *
1110 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
1111     void *data)
1112 {
1113     int       i;
1114     PyObject  *iter, *header, *h_iter, *name, *val, *res;
1115 
1116     iter = PyObject_GetIter(headers);
1117     if (nxt_slow_path(iter == NULL)) {
1118         return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1119     }
1120 
1121     for (i = 0; /* void */; i++) {
1122         header = PyIter_Next(iter);
1123         if (header == NULL) {
1124             break;
1125         }
1126 
1127         h_iter = PyObject_GetIter(header);
1128         if (nxt_slow_path(h_iter == NULL)) {
1129             Py_DECREF(header);
1130             Py_DECREF(iter);
1131 
1132             return PyErr_Format(PyExc_TypeError,
1133                                 "'headers' item #%d is not an iterable", i);
1134         }
1135 
1136         name = PyIter_Next(h_iter);
1137         if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1138             Py_XDECREF(name);
1139             Py_DECREF(h_iter);
1140             Py_DECREF(header);
1141             Py_DECREF(iter);
1142 
1143             return PyErr_Format(PyExc_TypeError,
1144                           "'headers' item #%d 'name' is not a byte string", i);
1145         }
1146 
1147         val = PyIter_Next(h_iter);
1148         if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1149             Py_XDECREF(val);
1150             Py_DECREF(h_iter);
1151             Py_DECREF(header);
1152             Py_DECREF(iter);
1153 
1154             return PyErr_Format(PyExc_TypeError,
1155                          "'headers' item #%d 'value' is not a byte string", i);
1156         }
1157 
1158         res = cb(data, i, name, val);
1159 
1160         Py_DECREF(name);
1161         Py_DECREF(val);
1162         Py_DECREF(h_iter);
1163         Py_DECREF(header);
1164 
1165         if (nxt_slow_path(res == NULL)) {
1166             Py_DECREF(iter);
1167 
1168             return NULL;
1169         }
1170 
1171         Py_DECREF(res);
1172     }
1173 
1174     Py_DECREF(iter);
1175 
1176     Py_RETURN_NONE;
1177 }
1178 
1179 
1180 PyObject *
1181 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1182 {
1183     nxt_py_asgi_calc_size_ctx_t  *ctx;
1184 
1185     ctx = data;
1186 
1187     ctx->fields_count++;
1188     ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1189 
1190     Py_RETURN_NONE;
1191 }
1192 
1193 
1194 PyObject *
1195 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1196 {
1197     int                          rc;
1198     char                         *name_str, *val_str;
1199     uint32_t                     name_len, val_len;
1200     nxt_off_t                    content_length;
1201     nxt_unit_request_info_t      *req;
1202     nxt_py_asgi_add_field_ctx_t  *ctx;
1203 
1204     name_str = PyBytes_AS_STRING(name);
1205     name_len = PyBytes_GET_SIZE(name);
1206 
1207     val_str = PyBytes_AS_STRING(val);
1208     val_len = PyBytes_GET_SIZE(val);
1209 
1210     ctx = data;
1211     req = ctx->req;
1212 
1213     rc = nxt_unit_response_add_field(req, name_str, name_len,
1214                                      val_str, val_len);
1215     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1216         return PyErr_Format(PyExc_RuntimeError,
1217                             "failed to add header #%d", i);
1218     }
1219 
1220     if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1221         content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1222         if (nxt_slow_path(content_length < 0)) {
1223             nxt_unit_req_error(req, "failed to parse Content-Length "
1224                                "value %.*s", (int) val_len, val_str);
1225 
1226             return PyErr_Format(PyExc_ValueError,
1227                                 "Failed to parse Content-Length: '%.*s'",
1228                                 (int) val_len, val_str);
1229         }
1230 
1231         ctx->content_length = content_length;
1232     }
1233 
1234     Py_RETURN_NONE;
1235 }
1236 
1237 
1238 PyObject *
1239 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1240     nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1241 {
1242     PyObject  *set_result, *res;
1243 
1244     if (nxt_slow_path(result == NULL)) {
1245         Py_DECREF(future);
1246 
1247         return NULL;
1248     }
1249 
1250     set_result = PyObject_GetAttrString(future, "set_result");
1251     if (nxt_slow_path(set_result == NULL)) {
1252         nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1253 
1254         Py_CLEAR(future);
1255 
1256         goto cleanup_result;
1257     }
1258 
1259     if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1260         nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1261 
1262         Py_CLEAR(future);
1263 
1264         goto cleanup;
1265     }
1266 
1267     res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1268                                        result, NULL);
1269     if (nxt_slow_path(res == NULL)) {
1270         nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1271         nxt_python_print_exception();
1272 
1273         Py_CLEAR(future);
1274     }
1275 
1276     Py_XDECREF(res);
1277 
1278 cleanup:
1279 
1280     Py_DECREF(set_result);
1281 
1282 cleanup_result:
1283 
1284     Py_DECREF(result);
1285 
1286     return future;
1287 }
1288 
1289 
1290 PyObject *
1291 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1292 {
1293     PyObject  *msg;
1294 
1295     msg = PyDict_New();
1296     if (nxt_slow_path(msg == NULL)) {
1297         nxt_unit_req_alert(req, "Python failed to create message dict");
1298         nxt_python_print_exception();
1299 
1300         return PyErr_Format(PyExc_RuntimeError,
1301                             "failed to create message dict");
1302     }
1303 
1304     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1305         nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1306 
1307         Py_DECREF(msg);
1308 
1309         return PyErr_Format(PyExc_RuntimeError,
1310                             "failed to set 'msg.type' item");
1311     }
1312 
1313     return msg;
1314 }
1315 
1316 
1317 PyObject *
1318 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1319     PyObject *spec_version)
1320 {
1321     PyObject  *scope, *asgi;
1322 
1323     scope = PyDict_New();
1324     if (nxt_slow_path(scope == NULL)) {
1325         nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1326         nxt_python_print_exception();
1327 
1328         return PyErr_Format(PyExc_RuntimeError,
1329                             "failed to create 'scope' dict");
1330     }
1331 
1332     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1333         nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1334 
1335         Py_DECREF(scope);
1336 
1337         return PyErr_Format(PyExc_RuntimeError,
1338                             "failed to set 'scope.type' item");
1339     }
1340 
1341     asgi = PyDict_New();
1342     if (nxt_slow_path(asgi == NULL)) {
1343         nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1344         nxt_python_print_exception();
1345 
1346         Py_DECREF(scope);
1347 
1348         return PyErr_Format(PyExc_RuntimeError,
1349                             "failed to create 'asgi' dict");
1350     }
1351 
1352     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1353         nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1354 
1355         Py_DECREF(asgi);
1356         Py_DECREF(scope);
1357 
1358         return PyErr_Format(PyExc_RuntimeError,
1359                             "failed to set 'scope.asgi' item");
1360     }
1361 
1362     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1363                                      nxt_py_3_0_str) == -1))
1364     {
1365         nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1366 
1367         Py_DECREF(asgi);
1368         Py_DECREF(scope);
1369 
1370         return PyErr_Format(PyExc_RuntimeError,
1371                             "failed to set 'asgi.version' item");
1372     }
1373 
1374     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1375                                      spec_version) == -1))
1376     {
1377         nxt_unit_req_alert(req,
1378                            "Python failed to set 'asgi.spec_version' item");
1379 
1380         Py_DECREF(asgi);
1381         Py_DECREF(scope);
1382 
1383         return PyErr_Format(PyExc_RuntimeError,
1384                             "failed to set 'asgi.spec_version' item");
1385     }
1386 
1387     Py_DECREF(asgi);
1388 
1389     return scope;
1390 }
1391 
1392 
1393 void
1394 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1395 {
1396     nxt_py_asgi_ctx_data_t  *ctx_data;
1397 
1398     ctx_data = req->ctx->data;
1399 
1400     nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1401 }
1402 
1403 
1404 void
1405 nxt_py_asgi_dealloc(PyObject *self)
1406 {
1407     PyObject_Del(self);
1408 }
1409 
1410 
1411 PyObject *
1412 nxt_py_asgi_await(PyObject *self)
1413 {
1414     Py_INCREF(self);
1415     return self;
1416 }
1417 
1418 
1419 PyObject *
1420 nxt_py_asgi_iter(PyObject *self)
1421 {
1422     Py_INCREF(self);
1423     return self;
1424 }
1425 
1426 
1427 PyObject *
1428 nxt_py_asgi_next(PyObject *self)
1429 {
1430     return NULL;
1431 }
1432 
1433 
1434 static void
1435 nxt_python_asgi_done(void)
1436 {
1437     nxt_py_asgi_str_done();
1438 
1439     Py_XDECREF(nxt_py_port_read);
1440 }
1441 
1442 #else /* !(NXT_HAVE_ASGI) */
1443 
1444 
1445 int
1446 nxt_python_asgi_check(PyObject *obj)
1447 {
1448     return 0;
1449 }
1450 
1451 
1452 int
1453 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1454 {
1455     nxt_unit_alert(NULL, "ASGI not implemented");
1456     return NXT_UNIT_ERROR;
1457 }
1458 
1459 
1460 #endif /* NXT_HAVE_ASGI */
1461