xref: /unit/src/python/nxt_python_asgi.c (revision 2067:78864c9d5ba8)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_response.h>
15 #include <python/nxt_python_asgi.h>
16 #include <python/nxt_python_asgi_str.h>
17 
18 
19 static PyObject *nxt_python_asgi_get_func(PyObject *obj);
20 static int nxt_python_asgi_ctx_data_alloc(void **pdata, int main);
21 static void nxt_python_asgi_ctx_data_free(void *data);
22 static int nxt_python_asgi_startup(void *data);
23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
24 
25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
26     nxt_unit_port_t *port);
27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
28 static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
29 
30 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
31 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
32     uint16_t port);
33 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
34 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
35 
36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
37 static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
38 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
39     nxt_unit_port_t *port);
40 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
41 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
42 
43 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
44 static void nxt_python_asgi_done(void);
45 
46 static PyObject           *nxt_py_port_read;
47 
48 static PyMethodDef        nxt_py_port_read_method =
49     {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50 
51 static nxt_python_proto_t  nxt_py_asgi_proto = {
52     .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
53     .ctx_data_free  = nxt_python_asgi_ctx_data_free,
54     .startup        = nxt_python_asgi_startup,
55     .run            = nxt_python_asgi_run,
56     .done           = nxt_python_asgi_done,
57 };
58 
59 #define NXT_UNIT_HASH_WS_PROTOCOL  0xED0A
60 
61 
62 int
63 nxt_python_asgi_check(PyObject *obj)
64 {
65     int           res;
66     PyObject      *func;
67     PyCodeObject  *code;
68 
69     func = nxt_python_asgi_get_func(obj);
70 
71     if (func == NULL) {
72         return 0;
73     }
74 
75     code = (PyCodeObject *) PyFunction_GET_CODE(func);
76 
77     nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
78                          "%d argument(s)",
79                    (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
80                    code->co_argcount);
81 
82     res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
83 
84     Py_DECREF(func);
85 
86     return res;
87 }
88 
89 
90 static PyObject *
91 nxt_python_asgi_get_func(PyObject *obj)
92 {
93     PyObject  *call;
94 
95     if (PyFunction_Check(obj)) {
96         Py_INCREF(obj);
97         return obj;
98     }
99 
100     if (PyMethod_Check(obj)) {
101         obj = PyMethod_GET_FUNCTION(obj);
102 
103         Py_INCREF(obj);
104         return obj;
105     }
106 
107     call = PyObject_GetAttrString(obj, "__call__");
108 
109     if (call == NULL) {
110         return NULL;
111     }
112 
113     if (PyFunction_Check(call)) {
114         return call;
115     }
116 
117     if (PyMethod_Check(call)) {
118         obj = PyMethod_GET_FUNCTION(call);
119 
120         if (PyFunction_Check(obj)) {
121             Py_INCREF(obj);
122 
123         } else {
124             obj = NULL;
125         }
126 
127     } else {
128         obj = NULL;
129     }
130 
131     Py_DECREF(call);
132 
133     return obj;
134 }
135 
136 
137 int
138 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
139 {
140     PyObject      *func;
141     nxt_int_t     i;
142     PyCodeObject  *code;
143 
144     nxt_unit_debug(NULL, "asgi_init");
145 
146     if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
147         nxt_unit_alert(NULL, "Python failed to init string objects");
148         return NXT_UNIT_ERROR;
149     }
150 
151     nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
152     if (nxt_slow_path(nxt_py_port_read == NULL)) {
153         nxt_unit_alert(NULL,
154                        "Python failed to initialize the 'port_read' function");
155         return NXT_UNIT_ERROR;
156     }
157 
158     if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
159         return NXT_UNIT_ERROR;
160     }
161 
162     if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
163         return NXT_UNIT_ERROR;
164     }
165 
166     for (i = 0; i < nxt_py_targets->count; i++) {
167         func = nxt_python_asgi_get_func(nxt_py_targets->target[i].application);
168         if (nxt_slow_path(func == NULL)) {
169             nxt_unit_debug(NULL, "asgi: cannot find function for callable, "
170                                  "unable to check for legacy mode (#%d)", i);
171             continue;
172         }
173 
174         code = (PyCodeObject *) PyFunction_GET_CODE(func);
175 
176         if ((code->co_flags & CO_COROUTINE) == 0) {
177             nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
178                                  "switching to legacy mode");
179             nxt_py_targets->target[i].asgi_legacy = 1;
180         }
181 
182         Py_DECREF(func);
183     }
184 
185     init->callbacks.request_handler = nxt_py_asgi_request_handler;
186     init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
187     init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
188     init->callbacks.close_handler = nxt_py_asgi_close_handler;
189     init->callbacks.quit = nxt_py_asgi_quit;
190     init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
191     init->callbacks.add_port = nxt_py_asgi_add_port;
192     init->callbacks.remove_port = nxt_py_asgi_remove_port;
193 
194     *proto = nxt_py_asgi_proto;
195 
196     return NXT_UNIT_OK;
197 }
198 
199 
200 static int
201 nxt_python_asgi_ctx_data_alloc(void **pdata, int main)
202 {
203     uint32_t                i;
204     PyObject                *asyncio, *loop, *event_loop, *obj;
205     const char              *event_loop_func;
206     nxt_py_asgi_ctx_data_t  *ctx_data;
207 
208     ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
209     if (nxt_slow_path(ctx_data == NULL)) {
210         nxt_unit_alert(NULL, "Failed to allocate context data");
211         return NXT_UNIT_ERROR;
212     }
213 
214     memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
215 
216     nxt_queue_init(&ctx_data->drain_queue);
217 
218     struct {
219         const char  *key;
220         PyObject    **handler;
221 
222     } handlers[] = {
223         { "create_task",        &ctx_data->loop_create_task },
224         { "add_reader",         &ctx_data->loop_add_reader },
225         { "remove_reader",      &ctx_data->loop_remove_reader },
226         { "call_soon",          &ctx_data->loop_call_soon },
227         { "run_until_complete", &ctx_data->loop_run_until_complete },
228         { "create_future",      &ctx_data->loop_create_future },
229     };
230 
231     loop = NULL;
232 
233     asyncio = PyImport_ImportModule("asyncio");
234     if (nxt_slow_path(asyncio == NULL)) {
235         nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
236         nxt_python_print_exception();
237         goto fail;
238     }
239 
240     event_loop_func = main ? "get_event_loop" : "new_event_loop";
241 
242     event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
243                                       event_loop_func);
244     if (nxt_slow_path(event_loop == NULL)) {
245         nxt_unit_alert(NULL,
246                        "Python failed to get '%s' from module 'asyncio'",
247                        event_loop_func);
248         goto fail;
249     }
250 
251     if (nxt_slow_path(PyCallable_Check(event_loop) == 0)) {
252         nxt_unit_alert(NULL,
253                        "'asyncio.%s' is not a callable object",
254                        event_loop_func);
255         goto fail;
256     }
257 
258     loop = PyObject_CallObject(event_loop, NULL);
259     if (nxt_slow_path(loop == NULL)) {
260         nxt_unit_alert(NULL, "Python failed to call 'asyncio.%s'",
261                        event_loop_func);
262         goto fail;
263     }
264 
265     for (i = 0; i < nxt_nitems(handlers); i++) {
266         obj = PyObject_GetAttrString(loop, handlers[i].key);
267         if (nxt_slow_path(obj == NULL)) {
268             nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
269                                  handlers[i].key);
270             goto fail;
271         }
272 
273         *handlers[i].handler = obj;
274 
275         if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
276             nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
277                                  handlers[i].key);
278             goto fail;
279         }
280     }
281 
282     obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
283     if (nxt_slow_path(obj == NULL)) {
284         nxt_unit_alert(NULL, "Python failed to create Future ");
285         nxt_python_print_exception();
286         goto fail;
287     }
288 
289     ctx_data->quit_future = obj;
290 
291     obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
292     if (nxt_slow_path(obj == NULL)) {
293         nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
294         goto fail;
295     }
296 
297     ctx_data->quit_future_set_result = obj;
298 
299     if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
300         nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
301         goto fail;
302     }
303 
304     Py_DECREF(loop);
305     Py_DECREF(asyncio);
306 
307     *pdata = ctx_data;
308 
309     return NXT_UNIT_OK;
310 
311 fail:
312 
313     nxt_python_asgi_ctx_data_free(ctx_data);
314 
315     Py_XDECREF(loop);
316     Py_XDECREF(asyncio);
317 
318     return NXT_UNIT_ERROR;
319 }
320 
321 
322 static void
323 nxt_python_asgi_ctx_data_free(void *data)
324 {
325     nxt_py_asgi_ctx_data_t  *ctx_data;
326 
327     ctx_data = data;
328 
329     Py_XDECREF(ctx_data->loop_run_until_complete);
330     Py_XDECREF(ctx_data->loop_create_future);
331     Py_XDECREF(ctx_data->loop_create_task);
332     Py_XDECREF(ctx_data->loop_call_soon);
333     Py_XDECREF(ctx_data->loop_add_reader);
334     Py_XDECREF(ctx_data->loop_remove_reader);
335     Py_XDECREF(ctx_data->quit_future);
336     Py_XDECREF(ctx_data->quit_future_set_result);
337 
338     nxt_unit_free(NULL, ctx_data);
339 }
340 
341 
342 static int
343 nxt_python_asgi_startup(void *data)
344 {
345     return nxt_py_asgi_lifespan_startup(data);
346 }
347 
348 
349 static int
350 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
351 {
352     PyObject                *res;
353     nxt_py_asgi_ctx_data_t  *ctx_data;
354 
355     ctx_data = ctx->data;
356 
357     res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
358                                        ctx_data->quit_future, NULL);
359     if (nxt_slow_path(res == NULL)) {
360         nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
361         nxt_python_print_exception();
362 
363         return NXT_UNIT_ERROR;
364     }
365 
366     Py_DECREF(res);
367 
368     nxt_py_asgi_lifespan_shutdown(ctx);
369 
370     return NXT_UNIT_OK;
371 }
372 
373 
374 static void
375 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
376 {
377     PyObject                *res, *fd;
378     nxt_py_asgi_ctx_data_t  *ctx_data;
379 
380     if (port == NULL || port->in_fd == -1) {
381         return;
382     }
383 
384     ctx_data = ctx->data;
385 
386     nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
387 
388     fd = PyLong_FromLong(port->in_fd);
389     if (nxt_slow_path(fd == NULL)) {
390         nxt_unit_alert(ctx, "Python failed to create Long object");
391         nxt_python_print_exception();
392 
393         return;
394     }
395 
396     res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
397     if (nxt_slow_path(res == NULL)) {
398         nxt_unit_alert(ctx, "Python failed to remove_reader");
399         nxt_python_print_exception();
400 
401     } else {
402         Py_DECREF(res);
403     }
404 
405     Py_DECREF(fd);
406 }
407 
408 
409 static void
410 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
411 {
412     PyObject                *scope, *res, *task, *receive, *send, *done, *asgi;
413     PyObject                *stage2;
414     nxt_python_target_t     *target;
415     nxt_py_asgi_ctx_data_t  *ctx_data;
416 
417     if (req->request->websocket_handshake) {
418         asgi = nxt_py_asgi_websocket_create(req);
419 
420     } else {
421         asgi = nxt_py_asgi_http_create(req);
422     }
423 
424     if (nxt_slow_path(asgi == NULL)) {
425         nxt_unit_req_alert(req, "Python failed to create asgi object");
426         nxt_unit_request_done(req, NXT_UNIT_ERROR);
427 
428         return;
429     }
430 
431     receive = PyObject_GetAttrString(asgi, "receive");
432     if (nxt_slow_path(receive == NULL)) {
433         nxt_unit_req_alert(req, "Python failed to get 'receive' method");
434         nxt_unit_request_done(req, NXT_UNIT_ERROR);
435 
436         goto release_asgi;
437     }
438 
439     send = PyObject_GetAttrString(asgi, "send");
440     if (nxt_slow_path(receive == NULL)) {
441         nxt_unit_req_alert(req, "Python failed to get 'send' method");
442         nxt_unit_request_done(req, NXT_UNIT_ERROR);
443 
444         goto release_receive;
445     }
446 
447     done = PyObject_GetAttrString(asgi, "_done");
448     if (nxt_slow_path(receive == NULL)) {
449         nxt_unit_req_alert(req, "Python failed to get '_done' method");
450         nxt_unit_request_done(req, NXT_UNIT_ERROR);
451 
452         goto release_send;
453     }
454 
455     scope = nxt_py_asgi_create_http_scope(req);
456     if (nxt_slow_path(scope == NULL)) {
457         nxt_unit_request_done(req, NXT_UNIT_ERROR);
458 
459         goto release_done;
460     }
461 
462     req->data = asgi;
463     target = &nxt_py_targets->target[req->request->app_target];
464 
465     if (!target->asgi_legacy) {
466         nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
467 
468         res = PyObject_CallFunctionObjArgs(target->application,
469                                            scope, receive, send, NULL);
470 
471     } else {
472         nxt_unit_req_debug(req, "Python call legacy application");
473 
474         res = PyObject_CallFunctionObjArgs(target->application, scope, NULL);
475 
476         if (nxt_slow_path(res == NULL)) {
477             nxt_unit_req_error(req, "Python failed to call legacy app stage1");
478             nxt_python_print_exception();
479             nxt_unit_request_done(req, NXT_UNIT_ERROR);
480 
481             goto release_scope;
482         }
483 
484         if (nxt_slow_path(PyCallable_Check(res) == 0)) {
485             nxt_unit_req_error(req,
486                               "Legacy ASGI application returns not a callable");
487             nxt_unit_request_done(req, NXT_UNIT_ERROR);
488 
489             Py_DECREF(res);
490 
491             goto release_scope;
492         }
493 
494         stage2 = res;
495 
496         res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
497 
498         Py_DECREF(stage2);
499     }
500 
501     if (nxt_slow_path(res == NULL)) {
502         nxt_unit_req_error(req, "Python failed to call the application");
503         nxt_python_print_exception();
504         nxt_unit_request_done(req, NXT_UNIT_ERROR);
505 
506         goto release_scope;
507     }
508 
509     if (nxt_slow_path(!PyCoro_CheckExact(res))) {
510         nxt_unit_req_error(req, "Application result type is not a coroutine");
511         nxt_unit_request_done(req, NXT_UNIT_ERROR);
512 
513         Py_DECREF(res);
514 
515         goto release_scope;
516     }
517 
518     ctx_data = req->ctx->data;
519 
520     task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
521     if (nxt_slow_path(task == NULL)) {
522         nxt_unit_req_error(req, "Python failed to call the create_task");
523         nxt_python_print_exception();
524         nxt_unit_request_done(req, NXT_UNIT_ERROR);
525 
526         Py_DECREF(res);
527 
528         goto release_scope;
529     }
530 
531     Py_DECREF(res);
532 
533     res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
534                                      NULL);
535     if (nxt_slow_path(res == NULL)) {
536         nxt_unit_req_error(req,
537                            "Python failed to call 'task.add_done_callback'");
538         nxt_python_print_exception();
539         nxt_unit_request_done(req, NXT_UNIT_ERROR);
540 
541         goto release_task;
542     }
543 
544     Py_DECREF(res);
545 release_task:
546     Py_DECREF(task);
547 release_scope:
548     Py_DECREF(scope);
549 release_done:
550     Py_DECREF(done);
551 release_send:
552     Py_DECREF(send);
553 release_receive:
554     Py_DECREF(receive);
555 release_asgi:
556     Py_DECREF(asgi);
557 }
558 
559 
560 static void
561 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
562 {
563     if (req->request->websocket_handshake) {
564         nxt_py_asgi_websocket_close_handler(req);
565 
566     } else {
567         nxt_py_asgi_http_close_handler(req);
568     }
569 }
570 
571 
572 static PyObject *
573 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
574 {
575     char                *p, *target, *query;
576     uint32_t            target_length, i;
577     PyObject            *scope, *v, *type, *scheme;
578     PyObject            *headers, *header;
579     nxt_unit_field_t    *f;
580     nxt_unit_request_t  *r;
581 
582     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
583 
584 #define SET_ITEM(dict, key, value) \
585     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
586                       == -1))                                                  \
587     {                                                                          \
588         nxt_unit_req_alert(req, "Python failed to set '"                       \
589                                 #dict "." #key "' item");                      \
590         goto fail;                                                             \
591     }
592 
593     v = NULL;
594     headers = NULL;
595 
596     r = req->request;
597 
598     if (r->websocket_handshake) {
599         type = nxt_py_websocket_str;
600         scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
601 
602     } else {
603         type = nxt_py_http_str;
604         scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
605     }
606 
607     scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
608     if (nxt_slow_path(scope == NULL)) {
609         return NULL;
610     }
611 
612     p = nxt_unit_sptr_get(&r->version);
613     SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
614                                               : nxt_py_1_0_str)
615     SET_ITEM(scope, scheme, scheme)
616 
617     v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
618                                    r->method_length);
619     if (nxt_slow_path(v == NULL)) {
620         nxt_unit_req_alert(req, "Python failed to create 'method' string");
621         goto fail;
622     }
623 
624     SET_ITEM(scope, method, v)
625     Py_DECREF(v);
626 
627     v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
628                              "replace");
629     if (nxt_slow_path(v == NULL)) {
630         nxt_unit_req_alert(req, "Python failed to create 'path' string");
631         goto fail;
632     }
633 
634     SET_ITEM(scope, path, v)
635     Py_DECREF(v);
636 
637     target = nxt_unit_sptr_get(&r->target);
638     query = nxt_unit_sptr_get(&r->query);
639 
640     if (r->query.offset != 0) {
641         target_length = query - target - 1;
642 
643     } else {
644         target_length = r->target_length;
645     }
646 
647     v = PyBytes_FromStringAndSize(target, target_length);
648     if (nxt_slow_path(v == NULL)) {
649         nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
650         goto fail;
651     }
652 
653     SET_ITEM(scope, raw_path, v)
654     Py_DECREF(v);
655 
656     v = PyBytes_FromStringAndSize(query, r->query_length);
657     if (nxt_slow_path(v == NULL)) {
658         nxt_unit_req_alert(req, "Python failed to create 'query' string");
659         goto fail;
660     }
661 
662     SET_ITEM(scope, query_string, v)
663     Py_DECREF(v);
664 
665     v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
666     if (nxt_slow_path(v == NULL)) {
667         nxt_unit_req_alert(req, "Python failed to create 'client' pair");
668         goto fail;
669     }
670 
671     SET_ITEM(scope, client, v)
672     Py_DECREF(v);
673 
674     v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
675     if (nxt_slow_path(v == NULL)) {
676         nxt_unit_req_alert(req, "Python failed to create 'server' pair");
677         goto fail;
678     }
679 
680     SET_ITEM(scope, server, v)
681     Py_DECREF(v);
682 
683     v = NULL;
684 
685     headers = PyTuple_New(r->fields_count);
686     if (nxt_slow_path(headers == NULL)) {
687         nxt_unit_req_alert(req, "Python failed to create 'headers' object");
688         goto fail;
689     }
690 
691     for (i = 0; i < r->fields_count; i++) {
692         f = r->fields + i;
693 
694         header = nxt_py_asgi_create_header(f);
695         if (nxt_slow_path(header == NULL)) {
696             nxt_unit_req_alert(req, "Python failed to create 'header' pair");
697             goto fail;
698         }
699 
700         PyTuple_SET_ITEM(headers, i, header);
701 
702         if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
703             && f->name_length == ws_protocol.length
704             && f->value_length > 0
705             && r->websocket_handshake)
706         {
707             v = nxt_py_asgi_create_subprotocols(f);
708             if (nxt_slow_path(v == NULL)) {
709                 nxt_unit_req_alert(req, "Failed to create subprotocols");
710                 goto fail;
711             }
712 
713             SET_ITEM(scope, subprotocols, v);
714             Py_DECREF(v);
715         }
716     }
717 
718     SET_ITEM(scope, headers, headers)
719     Py_DECREF(headers);
720 
721     return scope;
722 
723 fail:
724 
725     Py_XDECREF(v);
726     Py_XDECREF(headers);
727     Py_DECREF(scope);
728 
729     return NULL;
730 
731 #undef SET_ITEM
732 }
733 
734 
735 static PyObject *
736 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
737 {
738     char      *p, *s;
739     PyObject  *pair, *v;
740 
741     pair = PyTuple_New(2);
742     if (nxt_slow_path(pair == NULL)) {
743         return NULL;
744     }
745 
746     p = nxt_unit_sptr_get(sptr);
747     s = memchr(p, ':', len);
748 
749     v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
750     if (nxt_slow_path(v == NULL)) {
751         Py_DECREF(pair);
752 
753         return NULL;
754     }
755 
756     PyTuple_SET_ITEM(pair, 0, v);
757 
758     if (s != NULL) {
759         p += len;
760         v = PyLong_FromString(s + 1, &p, 10);
761 
762     } else {
763         v = PyLong_FromLong(port);
764     }
765 
766     if (nxt_slow_path(v == NULL)) {
767         Py_DECREF(pair);
768 
769         return NULL;
770     }
771 
772     PyTuple_SET_ITEM(pair, 1, v);
773 
774     return pair;
775 }
776 
777 
778 static PyObject *
779 nxt_py_asgi_create_header(nxt_unit_field_t *f)
780 {
781     char      c, *name;
782     uint8_t   pos;
783     PyObject  *header, *v;
784 
785     header = PyTuple_New(2);
786     if (nxt_slow_path(header == NULL)) {
787         return NULL;
788     }
789 
790     name = nxt_unit_sptr_get(&f->name);
791 
792     for (pos = 0; pos < f->name_length; pos++) {
793         c = name[pos];
794         if (c >= 'A' && c <= 'Z') {
795             name[pos] = (c | 0x20);
796         }
797     }
798 
799     v = PyBytes_FromStringAndSize(name, f->name_length);
800     if (nxt_slow_path(v == NULL)) {
801         Py_DECREF(header);
802 
803         return NULL;
804     }
805 
806     PyTuple_SET_ITEM(header, 0, v);
807 
808     v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
809                                   f->value_length);
810     if (nxt_slow_path(v == NULL)) {
811         Py_DECREF(header);
812 
813         return NULL;
814     }
815 
816     PyTuple_SET_ITEM(header, 1, v);
817 
818     return header;
819 }
820 
821 
822 static PyObject *
823 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
824 {
825     char      *v;
826     uint32_t  i, n, start;
827     PyObject  *res, *proto;
828 
829     v = nxt_unit_sptr_get(&f->value);
830     n = 1;
831 
832     for (i = 0; i < f->value_length; i++) {
833         if (v[i] == ',') {
834             n++;
835         }
836     }
837 
838     res = PyTuple_New(n);
839     if (nxt_slow_path(res == NULL)) {
840         return NULL;
841     }
842 
843     n = 0;
844     start = 0;
845 
846     for (i = 0; i < f->value_length; ) {
847         if (v[i] != ',') {
848             i++;
849 
850             continue;
851         }
852 
853         if (i - start > 0) {
854             proto = PyString_FromStringAndSize(v + start, i - start);
855             if (nxt_slow_path(proto == NULL)) {
856                 goto fail;
857             }
858 
859             PyTuple_SET_ITEM(res, n, proto);
860 
861             n++;
862         }
863 
864         do {
865             i++;
866         } while (i < f->value_length && v[i] == ' ');
867 
868         start = i;
869     }
870 
871     if (i - start > 0) {
872         proto = PyString_FromStringAndSize(v + start, i - start);
873         if (nxt_slow_path(proto == NULL)) {
874             goto fail;
875         }
876 
877         PyTuple_SET_ITEM(res, n, proto);
878     }
879 
880     return res;
881 
882 fail:
883 
884     Py_DECREF(res);
885 
886     return NULL;
887 }
888 
889 
890 static int
891 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
892 {
893     int  nb;
894 
895     if (port->in_fd == -1) {
896         return NXT_UNIT_OK;
897     }
898 
899     nb = 1;
900 
901     if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
902         nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
903                        port->in_fd, strerror(errno), errno);
904 
905         return NXT_UNIT_ERROR;
906     }
907 
908     nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
909 
910     return nxt_py_asgi_add_reader(ctx, port);
911 }
912 
913 
914 static int
915 nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
916 {
917     int                     rc;
918     PyObject                *res, *fd, *py_ctx, *py_port;
919     nxt_py_asgi_ctx_data_t  *ctx_data;
920 
921     nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port);
922 
923     ctx_data = ctx->data;
924 
925     fd = PyLong_FromLong(port->in_fd);
926     if (nxt_slow_path(fd == NULL)) {
927         nxt_unit_alert(ctx, "Python failed to create fd");
928         nxt_python_print_exception();
929 
930         return NXT_UNIT_ERROR;
931     }
932 
933     rc = NXT_UNIT_ERROR;
934 
935     py_ctx = PyLong_FromVoidPtr(ctx);
936     if (nxt_slow_path(py_ctx == NULL)) {
937         nxt_unit_alert(ctx, "Python failed to create py_ctx");
938         nxt_python_print_exception();
939 
940         goto clean_fd;
941     }
942 
943     py_port = PyLong_FromVoidPtr(port);
944     if (nxt_slow_path(py_port == NULL)) {
945         nxt_unit_alert(ctx, "Python failed to create py_port");
946         nxt_python_print_exception();
947 
948         goto clean_py_ctx;
949     }
950 
951     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
952                                        fd, nxt_py_port_read,
953                                        py_ctx, py_port, NULL);
954     if (nxt_slow_path(res == NULL)) {
955         nxt_unit_alert(ctx, "Python failed to add_reader");
956         nxt_python_print_exception();
957 
958     } else {
959         Py_DECREF(res);
960 
961         rc = NXT_UNIT_OK;
962     }
963 
964     Py_DECREF(py_port);
965 
966 clean_py_ctx:
967 
968     Py_DECREF(py_ctx);
969 
970 clean_fd:
971 
972     Py_DECREF(fd);
973 
974     return rc;
975 }
976 
977 
978 static void
979 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
980     nxt_unit_port_t *port)
981 {
982     if (port->in_fd == -1 || ctx == NULL) {
983         return;
984     }
985 
986     nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
987 
988     nxt_py_asgi_remove_reader(ctx, port);
989 }
990 
991 
992 static void
993 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
994 {
995     PyObject                *res, *p;
996     nxt_py_asgi_ctx_data_t  *ctx_data;
997 
998     nxt_unit_debug(ctx, "asgi_quit %p", ctx);
999 
1000     ctx_data = ctx->data;
1001 
1002     p = PyLong_FromLong(0);
1003     if (nxt_slow_path(p == NULL)) {
1004         nxt_unit_alert(NULL, "Python failed to create Long");
1005         nxt_python_print_exception();
1006 
1007     } else {
1008         res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
1009                                            p, NULL);
1010         if (nxt_slow_path(res == NULL)) {
1011             nxt_unit_alert(ctx, "Python failed to set_result");
1012             nxt_python_print_exception();
1013 
1014         } else {
1015             Py_DECREF(res);
1016         }
1017 
1018         Py_DECREF(p);
1019     }
1020 }
1021 
1022 
1023 static void
1024 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
1025 {
1026     int                     rc;
1027     nxt_queue_link_t        *lnk;
1028     nxt_py_asgi_ctx_data_t  *ctx_data;
1029 
1030     ctx_data = ctx->data;
1031 
1032     while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
1033         lnk = nxt_queue_first(&ctx_data->drain_queue);
1034 
1035         rc = nxt_py_asgi_http_drain(lnk);
1036         if (rc == NXT_UNIT_AGAIN) {
1037             return;
1038         }
1039 
1040         nxt_queue_remove(lnk);
1041     }
1042 }
1043 
1044 
1045 static PyObject *
1046 nxt_py_asgi_port_read(PyObject *self, PyObject *args)
1047 {
1048     int                     rc;
1049     PyObject                *arg0, *arg1, *res;
1050     Py_ssize_t              n;
1051     nxt_unit_ctx_t          *ctx;
1052     nxt_unit_port_t         *port;
1053     nxt_py_asgi_ctx_data_t  *ctx_data;
1054 
1055     n = PyTuple_GET_SIZE(args);
1056 
1057     if (n != 2) {
1058         nxt_unit_alert(NULL,
1059                        "nxt_py_asgi_port_read: invalid number of arguments %d",
1060                        (int) n);
1061 
1062         return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
1063     }
1064 
1065     arg0 = PyTuple_GET_ITEM(args, 0);
1066     if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) {
1067         return PyErr_Format(PyExc_TypeError,
1068                             "the first argument is not a long");
1069     }
1070 
1071     ctx = PyLong_AsVoidPtr(arg0);
1072 
1073     arg1 = PyTuple_GET_ITEM(args, 1);
1074     if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) {
1075         return PyErr_Format(PyExc_TypeError,
1076                             "the second argument is not a long");
1077     }
1078 
1079     port = PyLong_AsVoidPtr(arg1);
1080 
1081     rc = nxt_unit_process_port_msg(ctx, port);
1082 
1083     nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc);
1084 
1085     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1086         return PyErr_Format(PyExc_RuntimeError,
1087                             "error processing port %d message", port->id.id);
1088     }
1089 
1090     if (rc == NXT_UNIT_OK) {
1091         ctx_data = ctx->data;
1092 
1093         res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon,
1094                                            nxt_py_port_read,
1095                                            arg0, arg1, NULL);
1096         if (nxt_slow_path(res == NULL)) {
1097             nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'");
1098             nxt_python_print_exception();
1099         }
1100 
1101         Py_XDECREF(res);
1102     }
1103 
1104     Py_RETURN_NONE;
1105 }
1106 
1107 
1108 PyObject *
1109 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
1110     void *data)
1111 {
1112     int       i;
1113     PyObject  *iter, *header, *h_iter, *name, *val, *res;
1114 
1115     iter = PyObject_GetIter(headers);
1116     if (nxt_slow_path(iter == NULL)) {
1117         return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1118     }
1119 
1120     for (i = 0; /* void */; i++) {
1121         header = PyIter_Next(iter);
1122         if (header == NULL) {
1123             break;
1124         }
1125 
1126         h_iter = PyObject_GetIter(header);
1127         if (nxt_slow_path(h_iter == NULL)) {
1128             Py_DECREF(header);
1129             Py_DECREF(iter);
1130 
1131             return PyErr_Format(PyExc_TypeError,
1132                                 "'headers' item #%d is not an iterable", i);
1133         }
1134 
1135         name = PyIter_Next(h_iter);
1136         if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1137             Py_XDECREF(name);
1138             Py_DECREF(h_iter);
1139             Py_DECREF(header);
1140             Py_DECREF(iter);
1141 
1142             return PyErr_Format(PyExc_TypeError,
1143                           "'headers' item #%d 'name' is not a byte string", i);
1144         }
1145 
1146         val = PyIter_Next(h_iter);
1147         if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1148             Py_XDECREF(val);
1149             Py_DECREF(h_iter);
1150             Py_DECREF(header);
1151             Py_DECREF(iter);
1152 
1153             return PyErr_Format(PyExc_TypeError,
1154                          "'headers' item #%d 'value' is not a byte string", i);
1155         }
1156 
1157         res = cb(data, i, name, val);
1158 
1159         Py_DECREF(name);
1160         Py_DECREF(val);
1161         Py_DECREF(h_iter);
1162         Py_DECREF(header);
1163 
1164         if (nxt_slow_path(res == NULL)) {
1165             Py_DECREF(iter);
1166 
1167             return NULL;
1168         }
1169 
1170         Py_DECREF(res);
1171     }
1172 
1173     Py_DECREF(iter);
1174 
1175     Py_RETURN_NONE;
1176 }
1177 
1178 
1179 PyObject *
1180 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1181 {
1182     nxt_py_asgi_calc_size_ctx_t  *ctx;
1183 
1184     ctx = data;
1185 
1186     ctx->fields_count++;
1187     ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1188 
1189     Py_RETURN_NONE;
1190 }
1191 
1192 
1193 PyObject *
1194 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1195 {
1196     int                          rc;
1197     char                         *name_str, *val_str;
1198     uint32_t                     name_len, val_len;
1199     nxt_off_t                    content_length;
1200     nxt_unit_request_info_t      *req;
1201     nxt_py_asgi_add_field_ctx_t  *ctx;
1202 
1203     name_str = PyBytes_AS_STRING(name);
1204     name_len = PyBytes_GET_SIZE(name);
1205 
1206     val_str = PyBytes_AS_STRING(val);
1207     val_len = PyBytes_GET_SIZE(val);
1208 
1209     ctx = data;
1210     req = ctx->req;
1211 
1212     rc = nxt_unit_response_add_field(req, name_str, name_len,
1213                                      val_str, val_len);
1214     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1215         return PyErr_Format(PyExc_RuntimeError,
1216                             "failed to add header #%d", i);
1217     }
1218 
1219     if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1220         content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1221         if (nxt_slow_path(content_length < 0)) {
1222             nxt_unit_req_error(req, "failed to parse Content-Length "
1223                                "value %.*s", (int) val_len, val_str);
1224 
1225             return PyErr_Format(PyExc_ValueError,
1226                                 "Failed to parse Content-Length: '%.*s'",
1227                                 (int) val_len, val_str);
1228         }
1229 
1230         ctx->content_length = content_length;
1231     }
1232 
1233     Py_RETURN_NONE;
1234 }
1235 
1236 
1237 PyObject *
1238 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1239     nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1240 {
1241     PyObject  *set_result, *res;
1242 
1243     if (nxt_slow_path(result == NULL)) {
1244         Py_DECREF(future);
1245 
1246         return NULL;
1247     }
1248 
1249     set_result = PyObject_GetAttrString(future, "set_result");
1250     if (nxt_slow_path(set_result == NULL)) {
1251         nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1252 
1253         Py_CLEAR(future);
1254 
1255         goto cleanup_result;
1256     }
1257 
1258     if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1259         nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1260 
1261         Py_CLEAR(future);
1262 
1263         goto cleanup;
1264     }
1265 
1266     res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1267                                        result, NULL);
1268     if (nxt_slow_path(res == NULL)) {
1269         nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1270         nxt_python_print_exception();
1271 
1272         Py_CLEAR(future);
1273     }
1274 
1275     Py_XDECREF(res);
1276 
1277 cleanup:
1278 
1279     Py_DECREF(set_result);
1280 
1281 cleanup_result:
1282 
1283     Py_DECREF(result);
1284 
1285     return future;
1286 }
1287 
1288 
1289 PyObject *
1290 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1291 {
1292     PyObject  *msg;
1293 
1294     msg = PyDict_New();
1295     if (nxt_slow_path(msg == NULL)) {
1296         nxt_unit_req_alert(req, "Python failed to create message dict");
1297         nxt_python_print_exception();
1298 
1299         return PyErr_Format(PyExc_RuntimeError,
1300                             "failed to create message dict");
1301     }
1302 
1303     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1304         nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1305 
1306         Py_DECREF(msg);
1307 
1308         return PyErr_Format(PyExc_RuntimeError,
1309                             "failed to set 'msg.type' item");
1310     }
1311 
1312     return msg;
1313 }
1314 
1315 
1316 PyObject *
1317 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1318     PyObject *spec_version)
1319 {
1320     PyObject  *scope, *asgi;
1321 
1322     scope = PyDict_New();
1323     if (nxt_slow_path(scope == NULL)) {
1324         nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1325         nxt_python_print_exception();
1326 
1327         return PyErr_Format(PyExc_RuntimeError,
1328                             "failed to create 'scope' dict");
1329     }
1330 
1331     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1332         nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1333 
1334         Py_DECREF(scope);
1335 
1336         return PyErr_Format(PyExc_RuntimeError,
1337                             "failed to set 'scope.type' item");
1338     }
1339 
1340     asgi = PyDict_New();
1341     if (nxt_slow_path(asgi == NULL)) {
1342         nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1343         nxt_python_print_exception();
1344 
1345         Py_DECREF(scope);
1346 
1347         return PyErr_Format(PyExc_RuntimeError,
1348                             "failed to create 'asgi' dict");
1349     }
1350 
1351     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1352         nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1353 
1354         Py_DECREF(asgi);
1355         Py_DECREF(scope);
1356 
1357         return PyErr_Format(PyExc_RuntimeError,
1358                             "failed to set 'scope.asgi' item");
1359     }
1360 
1361     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1362                                      nxt_py_3_0_str) == -1))
1363     {
1364         nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1365 
1366         Py_DECREF(asgi);
1367         Py_DECREF(scope);
1368 
1369         return PyErr_Format(PyExc_RuntimeError,
1370                             "failed to set 'asgi.version' item");
1371     }
1372 
1373     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1374                                      spec_version) == -1))
1375     {
1376         nxt_unit_req_alert(req,
1377                            "Python failed to set 'asgi.spec_version' item");
1378 
1379         Py_DECREF(asgi);
1380         Py_DECREF(scope);
1381 
1382         return PyErr_Format(PyExc_RuntimeError,
1383                             "failed to set 'asgi.spec_version' item");
1384     }
1385 
1386     Py_DECREF(asgi);
1387 
1388     return scope;
1389 }
1390 
1391 
1392 void
1393 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1394 {
1395     nxt_py_asgi_ctx_data_t  *ctx_data;
1396 
1397     ctx_data = req->ctx->data;
1398 
1399     nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1400 }
1401 
1402 
1403 void
1404 nxt_py_asgi_dealloc(PyObject *self)
1405 {
1406     PyObject_Del(self);
1407 }
1408 
1409 
1410 PyObject *
1411 nxt_py_asgi_await(PyObject *self)
1412 {
1413     Py_INCREF(self);
1414     return self;
1415 }
1416 
1417 
1418 PyObject *
1419 nxt_py_asgi_iter(PyObject *self)
1420 {
1421     Py_INCREF(self);
1422     return self;
1423 }
1424 
1425 
1426 PyObject *
1427 nxt_py_asgi_next(PyObject *self)
1428 {
1429     return NULL;
1430 }
1431 
1432 
1433 static void
1434 nxt_python_asgi_done(void)
1435 {
1436     nxt_py_asgi_str_done();
1437 
1438     Py_XDECREF(nxt_py_port_read);
1439 }
1440 
1441 #else /* !(NXT_HAVE_ASGI) */
1442 
1443 
1444 int
1445 nxt_python_asgi_check(PyObject *obj)
1446 {
1447     return 0;
1448 }
1449 
1450 
1451 int
1452 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1453 {
1454     nxt_unit_alert(NULL, "ASGI not implemented");
1455     return NXT_UNIT_ERROR;
1456 }
1457 
1458 
1459 #endif /* NXT_HAVE_ASGI */
1460