xref: /unit/src/python/nxt_python_asgi.c (revision 1980:43553aa72111)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_response.h>
15 #include <python/nxt_python_asgi.h>
16 #include <python/nxt_python_asgi_str.h>
17 
18 
19 static PyObject *nxt_python_asgi_get_func(PyObject *obj);
20 static int nxt_python_asgi_ctx_data_alloc(void **pdata, int main);
21 static void nxt_python_asgi_ctx_data_free(void *data);
22 static int nxt_python_asgi_startup(void *data);
23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
24 
25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
26     nxt_unit_port_t *port);
27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
28 static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
29 
30 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
31 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
32     uint16_t port);
33 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
34 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
35 
36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
37 static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
38 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
39     nxt_unit_port_t *port);
40 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
41 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
42 
43 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
44 static void nxt_python_asgi_done(void);
45 
46 static PyObject           *nxt_py_port_read;
47 
48 static PyMethodDef        nxt_py_port_read_method =
49     {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50 
51 static nxt_python_proto_t  nxt_py_asgi_proto = {
52     .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
53     .ctx_data_free  = nxt_python_asgi_ctx_data_free,
54     .startup        = nxt_python_asgi_startup,
55     .run            = nxt_python_asgi_run,
56     .done           = nxt_python_asgi_done,
57 };
58 
59 #define NXT_UNIT_HASH_WS_PROTOCOL  0xED0A
60 
61 
62 int
63 nxt_python_asgi_check(PyObject *obj)
64 {
65     int           res;
66     PyObject      *func;
67     PyCodeObject  *code;
68 
69     func = nxt_python_asgi_get_func(obj);
70 
71     if (func == NULL) {
72         return 0;
73     }
74 
75     code = (PyCodeObject *) PyFunction_GET_CODE(func);
76 
77     nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
78                          "%d argument(s)",
79                    (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
80                    code->co_argcount);
81 
82     res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
83 
84     Py_DECREF(func);
85 
86     return res;
87 }
88 
89 
90 static PyObject *
91 nxt_python_asgi_get_func(PyObject *obj)
92 {
93     PyObject  *call;
94 
95     if (PyFunction_Check(obj)) {
96         Py_INCREF(obj);
97         return obj;
98     }
99 
100     if (PyMethod_Check(obj)) {
101         obj = PyMethod_GET_FUNCTION(obj);
102 
103         Py_INCREF(obj);
104         return obj;
105     }
106 
107     call = PyObject_GetAttrString(obj, "__call__");
108 
109     if (call == NULL) {
110         return NULL;
111     }
112 
113     if (PyFunction_Check(call)) {
114         return call;
115     }
116 
117     if (PyMethod_Check(call)) {
118         obj = PyMethod_GET_FUNCTION(call);
119 
120         Py_INCREF(obj);
121         Py_DECREF(call);
122 
123         return obj;
124     }
125 
126     Py_DECREF(call);
127 
128     return NULL;
129 }
130 
131 
132 int
133 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
134 {
135     PyObject      *func;
136     nxt_int_t     i;
137     PyCodeObject  *code;
138 
139     nxt_unit_debug(NULL, "asgi_init");
140 
141     if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
142         nxt_unit_alert(NULL, "Python failed to init string objects");
143         return NXT_UNIT_ERROR;
144     }
145 
146     nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
147     if (nxt_slow_path(nxt_py_port_read == NULL)) {
148         nxt_unit_alert(NULL,
149                        "Python failed to initialize the 'port_read' function");
150         return NXT_UNIT_ERROR;
151     }
152 
153     if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
154         return NXT_UNIT_ERROR;
155     }
156 
157     if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
158         return NXT_UNIT_ERROR;
159     }
160 
161     for (i = 0; i < nxt_py_targets->count; i++) {
162         func = nxt_python_asgi_get_func(nxt_py_targets->target[i].application);
163         if (nxt_slow_path(func == NULL)) {
164             nxt_unit_alert(NULL, "Python cannot find function for callable");
165             return NXT_UNIT_ERROR;
166         }
167 
168         code = (PyCodeObject *) PyFunction_GET_CODE(func);
169 
170         if ((code->co_flags & CO_COROUTINE) == 0) {
171             nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
172                                  "switching to legacy mode");
173             nxt_py_targets->target[i].asgi_legacy = 1;
174         }
175 
176         Py_DECREF(func);
177     }
178 
179     init->callbacks.request_handler = nxt_py_asgi_request_handler;
180     init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
181     init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
182     init->callbacks.close_handler = nxt_py_asgi_close_handler;
183     init->callbacks.quit = nxt_py_asgi_quit;
184     init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
185     init->callbacks.add_port = nxt_py_asgi_add_port;
186     init->callbacks.remove_port = nxt_py_asgi_remove_port;
187 
188     *proto = nxt_py_asgi_proto;
189 
190     return NXT_UNIT_OK;
191 }
192 
193 
194 static int
195 nxt_python_asgi_ctx_data_alloc(void **pdata, int main)
196 {
197     uint32_t                i;
198     PyObject                *asyncio, *loop, *event_loop, *obj;
199     const char              *event_loop_func;
200     nxt_py_asgi_ctx_data_t  *ctx_data;
201 
202     ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
203     if (nxt_slow_path(ctx_data == NULL)) {
204         nxt_unit_alert(NULL, "Failed to allocate context data");
205         return NXT_UNIT_ERROR;
206     }
207 
208     memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
209 
210     nxt_queue_init(&ctx_data->drain_queue);
211 
212     struct {
213         const char  *key;
214         PyObject    **handler;
215 
216     } handlers[] = {
217         { "create_task",        &ctx_data->loop_create_task },
218         { "add_reader",         &ctx_data->loop_add_reader },
219         { "remove_reader",      &ctx_data->loop_remove_reader },
220         { "call_soon",          &ctx_data->loop_call_soon },
221         { "run_until_complete", &ctx_data->loop_run_until_complete },
222         { "create_future",      &ctx_data->loop_create_future },
223     };
224 
225     loop = NULL;
226 
227     asyncio = PyImport_ImportModule("asyncio");
228     if (nxt_slow_path(asyncio == NULL)) {
229         nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
230         nxt_python_print_exception();
231         goto fail;
232     }
233 
234     event_loop_func = main ? "get_event_loop" : "new_event_loop";
235 
236     event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
237                                       event_loop_func);
238     if (nxt_slow_path(event_loop == NULL)) {
239         nxt_unit_alert(NULL,
240                        "Python failed to get '%s' from module 'asyncio'",
241                        event_loop_func);
242         goto fail;
243     }
244 
245     if (nxt_slow_path(PyCallable_Check(event_loop) == 0)) {
246         nxt_unit_alert(NULL,
247                        "'asyncio.%s' is not a callable object",
248                        event_loop_func);
249         goto fail;
250     }
251 
252     loop = PyObject_CallObject(event_loop, NULL);
253     if (nxt_slow_path(loop == NULL)) {
254         nxt_unit_alert(NULL, "Python failed to call 'asyncio.%s'",
255                        event_loop_func);
256         goto fail;
257     }
258 
259     for (i = 0; i < nxt_nitems(handlers); i++) {
260         obj = PyObject_GetAttrString(loop, handlers[i].key);
261         if (nxt_slow_path(obj == NULL)) {
262             nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
263                                  handlers[i].key);
264             goto fail;
265         }
266 
267         *handlers[i].handler = obj;
268 
269         if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
270             nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
271                                  handlers[i].key);
272             goto fail;
273         }
274     }
275 
276     obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
277     if (nxt_slow_path(obj == NULL)) {
278         nxt_unit_alert(NULL, "Python failed to create Future ");
279         nxt_python_print_exception();
280         goto fail;
281     }
282 
283     ctx_data->quit_future = obj;
284 
285     obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
286     if (nxt_slow_path(obj == NULL)) {
287         nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
288         goto fail;
289     }
290 
291     ctx_data->quit_future_set_result = obj;
292 
293     if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
294         nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
295         goto fail;
296     }
297 
298     Py_DECREF(loop);
299     Py_DECREF(asyncio);
300 
301     *pdata = ctx_data;
302 
303     return NXT_UNIT_OK;
304 
305 fail:
306 
307     nxt_python_asgi_ctx_data_free(ctx_data);
308 
309     Py_XDECREF(loop);
310     Py_XDECREF(asyncio);
311 
312     return NXT_UNIT_ERROR;
313 }
314 
315 
316 static void
317 nxt_python_asgi_ctx_data_free(void *data)
318 {
319     nxt_py_asgi_ctx_data_t  *ctx_data;
320 
321     ctx_data = data;
322 
323     Py_XDECREF(ctx_data->loop_run_until_complete);
324     Py_XDECREF(ctx_data->loop_create_future);
325     Py_XDECREF(ctx_data->loop_create_task);
326     Py_XDECREF(ctx_data->loop_call_soon);
327     Py_XDECREF(ctx_data->loop_add_reader);
328     Py_XDECREF(ctx_data->loop_remove_reader);
329     Py_XDECREF(ctx_data->quit_future);
330     Py_XDECREF(ctx_data->quit_future_set_result);
331 
332     nxt_unit_free(NULL, ctx_data);
333 }
334 
335 
336 static int
337 nxt_python_asgi_startup(void *data)
338 {
339     return nxt_py_asgi_lifespan_startup(data);
340 }
341 
342 
343 static int
344 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
345 {
346     PyObject                *res;
347     nxt_py_asgi_ctx_data_t  *ctx_data;
348 
349     ctx_data = ctx->data;
350 
351     res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
352                                        ctx_data->quit_future, NULL);
353     if (nxt_slow_path(res == NULL)) {
354         nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
355         nxt_python_print_exception();
356 
357         return NXT_UNIT_ERROR;
358     }
359 
360     Py_DECREF(res);
361 
362     nxt_py_asgi_lifespan_shutdown(ctx);
363 
364     return NXT_UNIT_OK;
365 }
366 
367 
368 static void
369 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
370 {
371     PyObject                *res, *fd;
372     nxt_py_asgi_ctx_data_t  *ctx_data;
373 
374     if (port == NULL || port->in_fd == -1) {
375         return;
376     }
377 
378     ctx_data = ctx->data;
379 
380     nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
381 
382     fd = PyLong_FromLong(port->in_fd);
383     if (nxt_slow_path(fd == NULL)) {
384         nxt_unit_alert(ctx, "Python failed to create Long object");
385         nxt_python_print_exception();
386 
387         return;
388     }
389 
390     res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
391     if (nxt_slow_path(res == NULL)) {
392         nxt_unit_alert(ctx, "Python failed to remove_reader");
393         nxt_python_print_exception();
394 
395     } else {
396         Py_DECREF(res);
397     }
398 
399     Py_DECREF(fd);
400 }
401 
402 
403 static void
404 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
405 {
406     PyObject                *scope, *res, *task, *receive, *send, *done, *asgi;
407     PyObject                *stage2;
408     nxt_python_target_t     *target;
409     nxt_py_asgi_ctx_data_t  *ctx_data;
410 
411     if (req->request->websocket_handshake) {
412         asgi = nxt_py_asgi_websocket_create(req);
413 
414     } else {
415         asgi = nxt_py_asgi_http_create(req);
416     }
417 
418     if (nxt_slow_path(asgi == NULL)) {
419         nxt_unit_req_alert(req, "Python failed to create asgi object");
420         nxt_unit_request_done(req, NXT_UNIT_ERROR);
421 
422         return;
423     }
424 
425     receive = PyObject_GetAttrString(asgi, "receive");
426     if (nxt_slow_path(receive == NULL)) {
427         nxt_unit_req_alert(req, "Python failed to get 'receive' method");
428         nxt_unit_request_done(req, NXT_UNIT_ERROR);
429 
430         goto release_asgi;
431     }
432 
433     send = PyObject_GetAttrString(asgi, "send");
434     if (nxt_slow_path(receive == NULL)) {
435         nxt_unit_req_alert(req, "Python failed to get 'send' method");
436         nxt_unit_request_done(req, NXT_UNIT_ERROR);
437 
438         goto release_receive;
439     }
440 
441     done = PyObject_GetAttrString(asgi, "_done");
442     if (nxt_slow_path(receive == NULL)) {
443         nxt_unit_req_alert(req, "Python failed to get '_done' method");
444         nxt_unit_request_done(req, NXT_UNIT_ERROR);
445 
446         goto release_send;
447     }
448 
449     scope = nxt_py_asgi_create_http_scope(req);
450     if (nxt_slow_path(scope == NULL)) {
451         nxt_unit_request_done(req, NXT_UNIT_ERROR);
452 
453         goto release_done;
454     }
455 
456     req->data = asgi;
457     target = &nxt_py_targets->target[req->request->app_target];
458 
459     if (!target->asgi_legacy) {
460         nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
461 
462         res = PyObject_CallFunctionObjArgs(target->application,
463                                            scope, receive, send, NULL);
464 
465     } else {
466         nxt_unit_req_debug(req, "Python call legacy application");
467 
468         res = PyObject_CallFunctionObjArgs(target->application, scope, NULL);
469 
470         if (nxt_slow_path(res == NULL)) {
471             nxt_unit_req_error(req, "Python failed to call legacy app stage1");
472             nxt_python_print_exception();
473             nxt_unit_request_done(req, NXT_UNIT_ERROR);
474 
475             goto release_scope;
476         }
477 
478         if (nxt_slow_path(PyCallable_Check(res) == 0)) {
479             nxt_unit_req_error(req,
480                               "Legacy ASGI application returns not a callable");
481             nxt_unit_request_done(req, NXT_UNIT_ERROR);
482 
483             Py_DECREF(res);
484 
485             goto release_scope;
486         }
487 
488         stage2 = res;
489 
490         res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
491 
492         Py_DECREF(stage2);
493     }
494 
495     if (nxt_slow_path(res == NULL)) {
496         nxt_unit_req_error(req, "Python failed to call the application");
497         nxt_python_print_exception();
498         nxt_unit_request_done(req, NXT_UNIT_ERROR);
499 
500         goto release_scope;
501     }
502 
503     if (nxt_slow_path(!PyCoro_CheckExact(res))) {
504         nxt_unit_req_error(req, "Application result type is not a coroutine");
505         nxt_unit_request_done(req, NXT_UNIT_ERROR);
506 
507         Py_DECREF(res);
508 
509         goto release_scope;
510     }
511 
512     ctx_data = req->ctx->data;
513 
514     task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
515     if (nxt_slow_path(task == NULL)) {
516         nxt_unit_req_error(req, "Python failed to call the create_task");
517         nxt_python_print_exception();
518         nxt_unit_request_done(req, NXT_UNIT_ERROR);
519 
520         Py_DECREF(res);
521 
522         goto release_scope;
523     }
524 
525     Py_DECREF(res);
526 
527     res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
528                                      NULL);
529     if (nxt_slow_path(res == NULL)) {
530         nxt_unit_req_error(req,
531                            "Python failed to call 'task.add_done_callback'");
532         nxt_python_print_exception();
533         nxt_unit_request_done(req, NXT_UNIT_ERROR);
534 
535         goto release_task;
536     }
537 
538     Py_DECREF(res);
539 release_task:
540     Py_DECREF(task);
541 release_scope:
542     Py_DECREF(scope);
543 release_done:
544     Py_DECREF(done);
545 release_send:
546     Py_DECREF(send);
547 release_receive:
548     Py_DECREF(receive);
549 release_asgi:
550     Py_DECREF(asgi);
551 }
552 
553 
554 static void
555 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
556 {
557     if (req->request->websocket_handshake) {
558         nxt_py_asgi_websocket_close_handler(req);
559 
560     } else {
561         nxt_py_asgi_http_close_handler(req);
562     }
563 }
564 
565 
566 static PyObject *
567 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
568 {
569     char                *p, *target, *query;
570     uint32_t            target_length, i;
571     PyObject            *scope, *v, *type, *scheme;
572     PyObject            *headers, *header;
573     nxt_unit_field_t    *f;
574     nxt_unit_request_t  *r;
575 
576     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
577 
578 #define SET_ITEM(dict, key, value) \
579     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
580                       == -1))                                                  \
581     {                                                                          \
582         nxt_unit_req_alert(req, "Python failed to set '"                       \
583                                 #dict "." #key "' item");                      \
584         goto fail;                                                             \
585     }
586 
587     v = NULL;
588     headers = NULL;
589 
590     r = req->request;
591 
592     if (r->websocket_handshake) {
593         type = nxt_py_websocket_str;
594         scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
595 
596     } else {
597         type = nxt_py_http_str;
598         scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
599     }
600 
601     scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
602     if (nxt_slow_path(scope == NULL)) {
603         return NULL;
604     }
605 
606     p = nxt_unit_sptr_get(&r->version);
607     SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
608                                               : nxt_py_1_0_str)
609     SET_ITEM(scope, scheme, scheme)
610 
611     v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
612                                    r->method_length);
613     if (nxt_slow_path(v == NULL)) {
614         nxt_unit_req_alert(req, "Python failed to create 'method' string");
615         goto fail;
616     }
617 
618     SET_ITEM(scope, method, v)
619     Py_DECREF(v);
620 
621     v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
622                              "replace");
623     if (nxt_slow_path(v == NULL)) {
624         nxt_unit_req_alert(req, "Python failed to create 'path' string");
625         goto fail;
626     }
627 
628     SET_ITEM(scope, path, v)
629     Py_DECREF(v);
630 
631     target = nxt_unit_sptr_get(&r->target);
632     query = nxt_unit_sptr_get(&r->query);
633 
634     if (r->query.offset != 0) {
635         target_length = query - target - 1;
636 
637     } else {
638         target_length = r->target_length;
639     }
640 
641     v = PyBytes_FromStringAndSize(target, target_length);
642     if (nxt_slow_path(v == NULL)) {
643         nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
644         goto fail;
645     }
646 
647     SET_ITEM(scope, raw_path, v)
648     Py_DECREF(v);
649 
650     v = PyBytes_FromStringAndSize(query, r->query_length);
651     if (nxt_slow_path(v == NULL)) {
652         nxt_unit_req_alert(req, "Python failed to create 'query' string");
653         goto fail;
654     }
655 
656     SET_ITEM(scope, query_string, v)
657     Py_DECREF(v);
658 
659     v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
660     if (nxt_slow_path(v == NULL)) {
661         nxt_unit_req_alert(req, "Python failed to create 'client' pair");
662         goto fail;
663     }
664 
665     SET_ITEM(scope, client, v)
666     Py_DECREF(v);
667 
668     v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
669     if (nxt_slow_path(v == NULL)) {
670         nxt_unit_req_alert(req, "Python failed to create 'server' pair");
671         goto fail;
672     }
673 
674     SET_ITEM(scope, server, v)
675     Py_DECREF(v);
676 
677     v = NULL;
678 
679     headers = PyTuple_New(r->fields_count);
680     if (nxt_slow_path(headers == NULL)) {
681         nxt_unit_req_alert(req, "Python failed to create 'headers' object");
682         goto fail;
683     }
684 
685     for (i = 0; i < r->fields_count; i++) {
686         f = r->fields + i;
687 
688         header = nxt_py_asgi_create_header(f);
689         if (nxt_slow_path(header == NULL)) {
690             nxt_unit_req_alert(req, "Python failed to create 'header' pair");
691             goto fail;
692         }
693 
694         PyTuple_SET_ITEM(headers, i, header);
695 
696         if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
697             && f->name_length == ws_protocol.length
698             && f->value_length > 0
699             && r->websocket_handshake)
700         {
701             v = nxt_py_asgi_create_subprotocols(f);
702             if (nxt_slow_path(v == NULL)) {
703                 nxt_unit_req_alert(req, "Failed to create subprotocols");
704                 goto fail;
705             }
706 
707             SET_ITEM(scope, subprotocols, v);
708             Py_DECREF(v);
709         }
710     }
711 
712     SET_ITEM(scope, headers, headers)
713     Py_DECREF(headers);
714 
715     return scope;
716 
717 fail:
718 
719     Py_XDECREF(v);
720     Py_XDECREF(headers);
721     Py_DECREF(scope);
722 
723     return NULL;
724 
725 #undef SET_ITEM
726 }
727 
728 
729 static PyObject *
730 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
731 {
732     char      *p, *s;
733     PyObject  *pair, *v;
734 
735     pair = PyTuple_New(2);
736     if (nxt_slow_path(pair == NULL)) {
737         return NULL;
738     }
739 
740     p = nxt_unit_sptr_get(sptr);
741     s = memchr(p, ':', len);
742 
743     v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
744     if (nxt_slow_path(v == NULL)) {
745         Py_DECREF(pair);
746 
747         return NULL;
748     }
749 
750     PyTuple_SET_ITEM(pair, 0, v);
751 
752     if (s != NULL) {
753         p += len;
754         v = PyLong_FromString(s + 1, &p, 10);
755 
756     } else {
757         v = PyLong_FromLong(port);
758     }
759 
760     if (nxt_slow_path(v == NULL)) {
761         Py_DECREF(pair);
762 
763         return NULL;
764     }
765 
766     PyTuple_SET_ITEM(pair, 1, v);
767 
768     return pair;
769 }
770 
771 
772 static PyObject *
773 nxt_py_asgi_create_header(nxt_unit_field_t *f)
774 {
775     char      c, *name;
776     uint8_t   pos;
777     PyObject  *header, *v;
778 
779     header = PyTuple_New(2);
780     if (nxt_slow_path(header == NULL)) {
781         return NULL;
782     }
783 
784     name = nxt_unit_sptr_get(&f->name);
785 
786     for (pos = 0; pos < f->name_length; pos++) {
787         c = name[pos];
788         if (c >= 'A' && c <= 'Z') {
789             name[pos] = (c | 0x20);
790         }
791     }
792 
793     v = PyBytes_FromStringAndSize(name, f->name_length);
794     if (nxt_slow_path(v == NULL)) {
795         Py_DECREF(header);
796 
797         return NULL;
798     }
799 
800     PyTuple_SET_ITEM(header, 0, v);
801 
802     v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
803                                   f->value_length);
804     if (nxt_slow_path(v == NULL)) {
805         Py_DECREF(header);
806 
807         return NULL;
808     }
809 
810     PyTuple_SET_ITEM(header, 1, v);
811 
812     return header;
813 }
814 
815 
816 static PyObject *
817 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
818 {
819     char      *v;
820     uint32_t  i, n, start;
821     PyObject  *res, *proto;
822 
823     v = nxt_unit_sptr_get(&f->value);
824     n = 1;
825 
826     for (i = 0; i < f->value_length; i++) {
827         if (v[i] == ',') {
828             n++;
829         }
830     }
831 
832     res = PyTuple_New(n);
833     if (nxt_slow_path(res == NULL)) {
834         return NULL;
835     }
836 
837     n = 0;
838     start = 0;
839 
840     for (i = 0; i < f->value_length; ) {
841         if (v[i] != ',') {
842             i++;
843 
844             continue;
845         }
846 
847         if (i - start > 0) {
848             proto = PyString_FromStringAndSize(v + start, i - start);
849             if (nxt_slow_path(proto == NULL)) {
850                 goto fail;
851             }
852 
853             PyTuple_SET_ITEM(res, n, proto);
854 
855             n++;
856         }
857 
858         do {
859             i++;
860         } while (i < f->value_length && v[i] == ' ');
861 
862         start = i;
863     }
864 
865     if (i - start > 0) {
866         proto = PyString_FromStringAndSize(v + start, i - start);
867         if (nxt_slow_path(proto == NULL)) {
868             goto fail;
869         }
870 
871         PyTuple_SET_ITEM(res, n, proto);
872     }
873 
874     return res;
875 
876 fail:
877 
878     Py_DECREF(res);
879 
880     return NULL;
881 }
882 
883 
884 static int
885 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
886 {
887     int  nb;
888 
889     if (port->in_fd == -1) {
890         return NXT_UNIT_OK;
891     }
892 
893     nb = 1;
894 
895     if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
896         nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
897                        port->in_fd, strerror(errno), errno);
898 
899         return NXT_UNIT_ERROR;
900     }
901 
902     nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
903 
904     return nxt_py_asgi_add_reader(ctx, port);
905 }
906 
907 
908 static int
909 nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
910 {
911     int                     rc;
912     PyObject                *res, *fd, *py_ctx, *py_port;
913     nxt_py_asgi_ctx_data_t  *ctx_data;
914 
915     nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port);
916 
917     ctx_data = ctx->data;
918 
919     fd = PyLong_FromLong(port->in_fd);
920     if (nxt_slow_path(fd == NULL)) {
921         nxt_unit_alert(ctx, "Python failed to create fd");
922         nxt_python_print_exception();
923 
924         return NXT_UNIT_ERROR;
925     }
926 
927     rc = NXT_UNIT_ERROR;
928 
929     py_ctx = PyLong_FromVoidPtr(ctx);
930     if (nxt_slow_path(py_ctx == NULL)) {
931         nxt_unit_alert(ctx, "Python failed to create py_ctx");
932         nxt_python_print_exception();
933 
934         goto clean_fd;
935     }
936 
937     py_port = PyLong_FromVoidPtr(port);
938     if (nxt_slow_path(py_port == NULL)) {
939         nxt_unit_alert(ctx, "Python failed to create py_port");
940         nxt_python_print_exception();
941 
942         goto clean_py_ctx;
943     }
944 
945     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
946                                        fd, nxt_py_port_read,
947                                        py_ctx, py_port, NULL);
948     if (nxt_slow_path(res == NULL)) {
949         nxt_unit_alert(ctx, "Python failed to add_reader");
950         nxt_python_print_exception();
951 
952     } else {
953         Py_DECREF(res);
954 
955         rc = NXT_UNIT_OK;
956     }
957 
958     Py_DECREF(py_port);
959 
960 clean_py_ctx:
961 
962     Py_DECREF(py_ctx);
963 
964 clean_fd:
965 
966     Py_DECREF(fd);
967 
968     return rc;
969 }
970 
971 
972 static void
973 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
974     nxt_unit_port_t *port)
975 {
976     if (port->in_fd == -1 || ctx == NULL) {
977         return;
978     }
979 
980     nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
981 
982     nxt_py_asgi_remove_reader(ctx, port);
983 }
984 
985 
986 static void
987 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
988 {
989     PyObject                *res, *p;
990     nxt_py_asgi_ctx_data_t  *ctx_data;
991 
992     nxt_unit_debug(ctx, "asgi_quit %p", ctx);
993 
994     ctx_data = ctx->data;
995 
996     p = PyLong_FromLong(0);
997     if (nxt_slow_path(p == NULL)) {
998         nxt_unit_alert(NULL, "Python failed to create Long");
999         nxt_python_print_exception();
1000 
1001     } else {
1002         res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
1003                                            p, NULL);
1004         if (nxt_slow_path(res == NULL)) {
1005             nxt_unit_alert(ctx, "Python failed to set_result");
1006             nxt_python_print_exception();
1007 
1008         } else {
1009             Py_DECREF(res);
1010         }
1011 
1012         Py_DECREF(p);
1013     }
1014 }
1015 
1016 
1017 static void
1018 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
1019 {
1020     int                     rc;
1021     nxt_queue_link_t        *lnk;
1022     nxt_py_asgi_ctx_data_t  *ctx_data;
1023 
1024     ctx_data = ctx->data;
1025 
1026     while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
1027         lnk = nxt_queue_first(&ctx_data->drain_queue);
1028 
1029         rc = nxt_py_asgi_http_drain(lnk);
1030         if (rc == NXT_UNIT_AGAIN) {
1031             return;
1032         }
1033 
1034         nxt_queue_remove(lnk);
1035     }
1036 }
1037 
1038 
1039 static PyObject *
1040 nxt_py_asgi_port_read(PyObject *self, PyObject *args)
1041 {
1042     int                     rc;
1043     PyObject                *arg0, *arg1, *res;
1044     Py_ssize_t              n;
1045     nxt_unit_ctx_t          *ctx;
1046     nxt_unit_port_t         *port;
1047     nxt_py_asgi_ctx_data_t  *ctx_data;
1048 
1049     n = PyTuple_GET_SIZE(args);
1050 
1051     if (n != 2) {
1052         nxt_unit_alert(NULL,
1053                        "nxt_py_asgi_port_read: invalid number of arguments %d",
1054                        (int) n);
1055 
1056         return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
1057     }
1058 
1059     arg0 = PyTuple_GET_ITEM(args, 0);
1060     if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) {
1061         return PyErr_Format(PyExc_TypeError,
1062                             "the first argument is not a long");
1063     }
1064 
1065     ctx = PyLong_AsVoidPtr(arg0);
1066 
1067     arg1 = PyTuple_GET_ITEM(args, 1);
1068     if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) {
1069         return PyErr_Format(PyExc_TypeError,
1070                             "the second argument is not a long");
1071     }
1072 
1073     port = PyLong_AsVoidPtr(arg1);
1074 
1075     rc = nxt_unit_process_port_msg(ctx, port);
1076 
1077     nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc);
1078 
1079     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1080         return PyErr_Format(PyExc_RuntimeError,
1081                             "error processing port %d message", port->id.id);
1082     }
1083 
1084     if (rc == NXT_UNIT_OK) {
1085         ctx_data = ctx->data;
1086 
1087         res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon,
1088                                            nxt_py_port_read,
1089                                            arg0, arg1, NULL);
1090         if (nxt_slow_path(res == NULL)) {
1091             nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'");
1092             nxt_python_print_exception();
1093         }
1094 
1095         Py_XDECREF(res);
1096     }
1097 
1098     Py_RETURN_NONE;
1099 }
1100 
1101 
1102 PyObject *
1103 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
1104     void *data)
1105 {
1106     int       i;
1107     PyObject  *iter, *header, *h_iter, *name, *val, *res;
1108 
1109     iter = PyObject_GetIter(headers);
1110     if (nxt_slow_path(iter == NULL)) {
1111         return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1112     }
1113 
1114     for (i = 0; /* void */; i++) {
1115         header = PyIter_Next(iter);
1116         if (header == NULL) {
1117             break;
1118         }
1119 
1120         h_iter = PyObject_GetIter(header);
1121         if (nxt_slow_path(h_iter == NULL)) {
1122             Py_DECREF(header);
1123             Py_DECREF(iter);
1124 
1125             return PyErr_Format(PyExc_TypeError,
1126                                 "'headers' item #%d is not an iterable", i);
1127         }
1128 
1129         name = PyIter_Next(h_iter);
1130         if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1131             Py_XDECREF(name);
1132             Py_DECREF(h_iter);
1133             Py_DECREF(header);
1134             Py_DECREF(iter);
1135 
1136             return PyErr_Format(PyExc_TypeError,
1137                           "'headers' item #%d 'name' is not a byte string", i);
1138         }
1139 
1140         val = PyIter_Next(h_iter);
1141         if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1142             Py_XDECREF(val);
1143             Py_DECREF(h_iter);
1144             Py_DECREF(header);
1145             Py_DECREF(iter);
1146 
1147             return PyErr_Format(PyExc_TypeError,
1148                          "'headers' item #%d 'value' is not a byte string", i);
1149         }
1150 
1151         res = cb(data, i, name, val);
1152 
1153         Py_DECREF(name);
1154         Py_DECREF(val);
1155         Py_DECREF(h_iter);
1156         Py_DECREF(header);
1157 
1158         if (nxt_slow_path(res == NULL)) {
1159             Py_DECREF(iter);
1160 
1161             return NULL;
1162         }
1163 
1164         Py_DECREF(res);
1165     }
1166 
1167     Py_DECREF(iter);
1168 
1169     Py_RETURN_NONE;
1170 }
1171 
1172 
1173 PyObject *
1174 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1175 {
1176     nxt_py_asgi_calc_size_ctx_t  *ctx;
1177 
1178     ctx = data;
1179 
1180     ctx->fields_count++;
1181     ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1182 
1183     Py_RETURN_NONE;
1184 }
1185 
1186 
1187 PyObject *
1188 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1189 {
1190     int                          rc;
1191     char                         *name_str, *val_str;
1192     uint32_t                     name_len, val_len;
1193     nxt_off_t                    content_length;
1194     nxt_unit_request_info_t      *req;
1195     nxt_py_asgi_add_field_ctx_t  *ctx;
1196 
1197     name_str = PyBytes_AS_STRING(name);
1198     name_len = PyBytes_GET_SIZE(name);
1199 
1200     val_str = PyBytes_AS_STRING(val);
1201     val_len = PyBytes_GET_SIZE(val);
1202 
1203     ctx = data;
1204     req = ctx->req;
1205 
1206     rc = nxt_unit_response_add_field(req, name_str, name_len,
1207                                      val_str, val_len);
1208     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1209         return PyErr_Format(PyExc_RuntimeError,
1210                             "failed to add header #%d", i);
1211     }
1212 
1213     if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1214         content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1215         if (nxt_slow_path(content_length < 0)) {
1216             nxt_unit_req_error(req, "failed to parse Content-Length "
1217                                "value %.*s", (int) val_len, val_str);
1218 
1219             return PyErr_Format(PyExc_ValueError,
1220                                 "Failed to parse Content-Length: '%.*s'",
1221                                 (int) val_len, val_str);
1222         }
1223 
1224         ctx->content_length = content_length;
1225     }
1226 
1227     Py_RETURN_NONE;
1228 }
1229 
1230 
1231 PyObject *
1232 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1233     nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1234 {
1235     PyObject  *set_result, *res;
1236 
1237     if (nxt_slow_path(result == NULL)) {
1238         Py_DECREF(future);
1239 
1240         return NULL;
1241     }
1242 
1243     set_result = PyObject_GetAttrString(future, "set_result");
1244     if (nxt_slow_path(set_result == NULL)) {
1245         nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1246 
1247         Py_CLEAR(future);
1248 
1249         goto cleanup_result;
1250     }
1251 
1252     if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1253         nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1254 
1255         Py_CLEAR(future);
1256 
1257         goto cleanup;
1258     }
1259 
1260     res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1261                                        result, NULL);
1262     if (nxt_slow_path(res == NULL)) {
1263         nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1264         nxt_python_print_exception();
1265 
1266         Py_CLEAR(future);
1267     }
1268 
1269     Py_XDECREF(res);
1270 
1271 cleanup:
1272 
1273     Py_DECREF(set_result);
1274 
1275 cleanup_result:
1276 
1277     Py_DECREF(result);
1278 
1279     return future;
1280 }
1281 
1282 
1283 PyObject *
1284 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1285 {
1286     PyObject  *msg;
1287 
1288     msg = PyDict_New();
1289     if (nxt_slow_path(msg == NULL)) {
1290         nxt_unit_req_alert(req, "Python failed to create message dict");
1291         nxt_python_print_exception();
1292 
1293         return PyErr_Format(PyExc_RuntimeError,
1294                             "failed to create message dict");
1295     }
1296 
1297     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1298         nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1299 
1300         Py_DECREF(msg);
1301 
1302         return PyErr_Format(PyExc_RuntimeError,
1303                             "failed to set 'msg.type' item");
1304     }
1305 
1306     return msg;
1307 }
1308 
1309 
1310 PyObject *
1311 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1312     PyObject *spec_version)
1313 {
1314     PyObject  *scope, *asgi;
1315 
1316     scope = PyDict_New();
1317     if (nxt_slow_path(scope == NULL)) {
1318         nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1319         nxt_python_print_exception();
1320 
1321         return PyErr_Format(PyExc_RuntimeError,
1322                             "failed to create 'scope' dict");
1323     }
1324 
1325     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1326         nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1327 
1328         Py_DECREF(scope);
1329 
1330         return PyErr_Format(PyExc_RuntimeError,
1331                             "failed to set 'scope.type' item");
1332     }
1333 
1334     asgi = PyDict_New();
1335     if (nxt_slow_path(asgi == NULL)) {
1336         nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1337         nxt_python_print_exception();
1338 
1339         Py_DECREF(scope);
1340 
1341         return PyErr_Format(PyExc_RuntimeError,
1342                             "failed to create 'asgi' dict");
1343     }
1344 
1345     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1346         nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1347 
1348         Py_DECREF(asgi);
1349         Py_DECREF(scope);
1350 
1351         return PyErr_Format(PyExc_RuntimeError,
1352                             "failed to set 'scope.asgi' item");
1353     }
1354 
1355     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1356                                      nxt_py_3_0_str) == -1))
1357     {
1358         nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1359 
1360         Py_DECREF(asgi);
1361         Py_DECREF(scope);
1362 
1363         return PyErr_Format(PyExc_RuntimeError,
1364                             "failed to set 'asgi.version' item");
1365     }
1366 
1367     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1368                                      spec_version) == -1))
1369     {
1370         nxt_unit_req_alert(req,
1371                            "Python failed to set 'asgi.spec_version' item");
1372 
1373         Py_DECREF(asgi);
1374         Py_DECREF(scope);
1375 
1376         return PyErr_Format(PyExc_RuntimeError,
1377                             "failed to set 'asgi.spec_version' item");
1378     }
1379 
1380     Py_DECREF(asgi);
1381 
1382     return scope;
1383 }
1384 
1385 
1386 void
1387 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1388 {
1389     nxt_py_asgi_ctx_data_t  *ctx_data;
1390 
1391     ctx_data = req->ctx->data;
1392 
1393     nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1394 }
1395 
1396 
1397 void
1398 nxt_py_asgi_dealloc(PyObject *self)
1399 {
1400     PyObject_Del(self);
1401 }
1402 
1403 
1404 PyObject *
1405 nxt_py_asgi_await(PyObject *self)
1406 {
1407     Py_INCREF(self);
1408     return self;
1409 }
1410 
1411 
1412 PyObject *
1413 nxt_py_asgi_iter(PyObject *self)
1414 {
1415     Py_INCREF(self);
1416     return self;
1417 }
1418 
1419 
1420 PyObject *
1421 nxt_py_asgi_next(PyObject *self)
1422 {
1423     return NULL;
1424 }
1425 
1426 
1427 static void
1428 nxt_python_asgi_done(void)
1429 {
1430     nxt_py_asgi_str_done();
1431 
1432     Py_XDECREF(nxt_py_port_read);
1433 }
1434 
1435 #else /* !(NXT_HAVE_ASGI) */
1436 
1437 
1438 int
1439 nxt_python_asgi_check(PyObject *obj)
1440 {
1441     return 0;
1442 }
1443 
1444 
1445 int
1446 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1447 {
1448     nxt_unit_alert(NULL, "ASGI not implemented");
1449     return NXT_UNIT_ERROR;
1450 }
1451 
1452 
1453 #endif /* NXT_HAVE_ASGI */
1454