xref: /unit/src/python/nxt_python_asgi.c (revision 1681:542b5b8c0647)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_response.h>
15 #include <python/nxt_python_asgi.h>
16 #include <python/nxt_python_asgi_str.h>
17 
18 
19 static int nxt_python_asgi_ctx_data_alloc(void **pdata);
20 static void nxt_python_asgi_ctx_data_free(void *data);
21 static int nxt_python_asgi_startup(void *data);
22 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
23 
24 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
25     nxt_unit_port_t *port);
26 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
27 
28 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
29 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
30     uint16_t port);
31 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
32 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
33 
34 static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
35 
36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
37 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
38 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
39 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
40 
41 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
42 static void nxt_python_asgi_done(void);
43 
44 
45 static PyObject           *nxt_py_port_read;
46 static nxt_unit_port_t    *nxt_py_shared_port;
47 
48 static PyMethodDef        nxt_py_port_read_method =
49     {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50 
51 static nxt_python_proto_t  nxt_py_asgi_proto = {
52     .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
53     .ctx_data_free  = nxt_python_asgi_ctx_data_free,
54     .startup        = nxt_python_asgi_startup,
55     .run            = nxt_python_asgi_run,
56     .ready          = nxt_python_asgi_ready,
57     .done           = nxt_python_asgi_done,
58 };
59 
60 #define NXT_UNIT_HASH_WS_PROTOCOL  0xED0A
61 
62 
63 int
64 nxt_python_asgi_check(PyObject *obj)
65 {
66     int           res;
67     PyObject      *call;
68     PyCodeObject  *code;
69 
70     if (PyFunction_Check(obj)) {
71         code = (PyCodeObject *) PyFunction_GET_CODE(obj);
72 
73         return (code->co_flags & CO_COROUTINE) != 0;
74     }
75 
76     if (PyMethod_Check(obj)) {
77         obj = PyMethod_GET_FUNCTION(obj);
78 
79         code = (PyCodeObject *) PyFunction_GET_CODE(obj);
80 
81         return (code->co_flags & CO_COROUTINE) != 0;
82     }
83 
84     call = PyObject_GetAttrString(obj, "__call__");
85 
86     if (call == NULL) {
87         return 0;
88     }
89 
90     if (PyFunction_Check(call)) {
91         code = (PyCodeObject *) PyFunction_GET_CODE(call);
92 
93         res = (code->co_flags & CO_COROUTINE) != 0;
94 
95     } else {
96         if (PyMethod_Check(call)) {
97             obj = PyMethod_GET_FUNCTION(call);
98 
99             code = (PyCodeObject *) PyFunction_GET_CODE(obj);
100 
101             res = (code->co_flags & CO_COROUTINE) != 0;
102 
103         } else {
104             res = 0;
105         }
106     }
107 
108     Py_DECREF(call);
109 
110     return res;
111 }
112 
113 
114 int
115 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
116 {
117     nxt_unit_debug(NULL, "asgi_init");
118 
119     if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
120         nxt_unit_alert(NULL, "Python failed to init string objects");
121         return NXT_UNIT_ERROR;
122     }
123 
124     nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
125     if (nxt_slow_path(nxt_py_port_read == NULL)) {
126         nxt_unit_alert(NULL,
127                        "Python failed to initialize the 'port_read' function");
128         return NXT_UNIT_ERROR;
129     }
130 
131     if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
132         return NXT_UNIT_ERROR;
133     }
134 
135     if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
136         return NXT_UNIT_ERROR;
137     }
138 
139     init->callbacks.request_handler = nxt_py_asgi_request_handler;
140     init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
141     init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
142     init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
143     init->callbacks.quit = nxt_py_asgi_quit;
144     init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
145     init->callbacks.add_port = nxt_py_asgi_add_port;
146     init->callbacks.remove_port = nxt_py_asgi_remove_port;
147 
148     *proto = nxt_py_asgi_proto;
149 
150     return NXT_UNIT_OK;
151 }
152 
153 
154 static int
155 nxt_python_asgi_ctx_data_alloc(void **pdata)
156 {
157     uint32_t                i;
158     PyObject                *asyncio, *loop, *new_event_loop, *obj;
159     nxt_py_asgi_ctx_data_t  *ctx_data;
160 
161     ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
162     if (nxt_slow_path(ctx_data == NULL)) {
163         nxt_unit_alert(NULL, "Failed to allocate context data");
164         return NXT_UNIT_ERROR;
165     }
166 
167     memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
168 
169     nxt_queue_init(&ctx_data->drain_queue);
170 
171     struct {
172         const char  *key;
173         PyObject    **handler;
174 
175     } handlers[] = {
176         { "create_task",        &ctx_data->loop_create_task },
177         { "add_reader",         &ctx_data->loop_add_reader },
178         { "remove_reader",      &ctx_data->loop_remove_reader },
179         { "call_soon",          &ctx_data->loop_call_soon },
180         { "run_until_complete", &ctx_data->loop_run_until_complete },
181         { "create_future",      &ctx_data->loop_create_future },
182     };
183 
184     loop = NULL;
185 
186     asyncio = PyImport_ImportModule("asyncio");
187     if (nxt_slow_path(asyncio == NULL)) {
188         nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
189         nxt_python_print_exception();
190         goto fail;
191     }
192 
193     new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
194                                           "new_event_loop");
195     if (nxt_slow_path(new_event_loop == NULL)) {
196         nxt_unit_alert(NULL,
197                  "Python failed to get 'new_event_loop' from module 'asyncio'");
198         goto fail;
199     }
200 
201     if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
202         nxt_unit_alert(NULL,
203                        "'asyncio.new_event_loop' is not a callable object");
204         goto fail;
205     }
206 
207     loop = PyObject_CallObject(new_event_loop, NULL);
208     if (nxt_slow_path(loop == NULL)) {
209         nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
210         goto fail;
211     }
212 
213     for (i = 0; i < nxt_nitems(handlers); i++) {
214         obj = PyObject_GetAttrString(loop, handlers[i].key);
215         if (nxt_slow_path(obj == NULL)) {
216             nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
217                                  handlers[i].key);
218             goto fail;
219         }
220 
221         *handlers[i].handler = obj;
222 
223         if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
224             nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
225                                  handlers[i].key);
226             goto fail;
227         }
228     }
229 
230     obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
231     if (nxt_slow_path(obj == NULL)) {
232         nxt_unit_alert(NULL, "Python failed to create Future ");
233         nxt_python_print_exception();
234         goto fail;
235     }
236 
237     ctx_data->quit_future = obj;
238 
239     obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
240     if (nxt_slow_path(obj == NULL)) {
241         nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
242         goto fail;
243     }
244 
245     ctx_data->quit_future_set_result = obj;
246 
247     if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
248         nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
249         goto fail;
250     }
251 
252     Py_DECREF(loop);
253     Py_DECREF(asyncio);
254 
255     *pdata = ctx_data;
256 
257     return NXT_UNIT_OK;
258 
259 fail:
260 
261     nxt_python_asgi_ctx_data_free(ctx_data);
262 
263     Py_XDECREF(loop);
264     Py_XDECREF(asyncio);
265 
266     return NXT_UNIT_ERROR;
267 }
268 
269 
270 static void
271 nxt_python_asgi_ctx_data_free(void *data)
272 {
273     nxt_py_asgi_ctx_data_t  *ctx_data;
274 
275     ctx_data = data;
276 
277     Py_XDECREF(ctx_data->loop_run_until_complete);
278     Py_XDECREF(ctx_data->loop_create_future);
279     Py_XDECREF(ctx_data->loop_create_task);
280     Py_XDECREF(ctx_data->loop_call_soon);
281     Py_XDECREF(ctx_data->loop_add_reader);
282     Py_XDECREF(ctx_data->loop_remove_reader);
283     Py_XDECREF(ctx_data->quit_future);
284     Py_XDECREF(ctx_data->quit_future_set_result);
285 
286     nxt_unit_free(NULL, ctx_data);
287 }
288 
289 
290 static int
291 nxt_python_asgi_startup(void *data)
292 {
293     return nxt_py_asgi_lifespan_startup(data);
294 }
295 
296 
297 static int
298 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
299 {
300     PyObject                *res;
301     nxt_py_asgi_ctx_data_t  *ctx_data;
302 
303     ctx_data = ctx->data;
304 
305     res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
306                                        ctx_data->quit_future, NULL);
307     if (nxt_slow_path(res == NULL)) {
308         nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
309         nxt_python_print_exception();
310 
311         return NXT_UNIT_ERROR;
312     }
313 
314     Py_DECREF(res);
315 
316     nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
317     nxt_py_asgi_remove_reader(ctx, ctx_data->port);
318 
319     if (ctx_data->port != NULL) {
320         ctx_data->port->data = NULL;
321         ctx_data->port = NULL;
322     }
323 
324     nxt_py_asgi_lifespan_shutdown(ctx);
325 
326     return NXT_UNIT_OK;
327 }
328 
329 
330 static void
331 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
332 {
333     PyObject                *res;
334     nxt_py_asgi_ctx_data_t  *ctx_data;
335 
336     if (port == NULL || port->in_fd == -1) {
337         return;
338     }
339 
340     ctx_data = ctx->data;
341 
342     nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
343 
344     res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
345                                        PyLong_FromLong(port->in_fd), NULL);
346     if (nxt_slow_path(res == NULL)) {
347         nxt_unit_alert(ctx, "Python failed to remove_reader");
348         nxt_python_print_exception();
349 
350         return;
351     }
352 
353     Py_DECREF(res);
354 }
355 
356 
357 static void
358 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
359 {
360     PyObject                *scope, *res, *task, *receive, *send, *done, *asgi;
361     nxt_py_asgi_ctx_data_t  *ctx_data;
362 
363     if (req->request->websocket_handshake) {
364         asgi = nxt_py_asgi_websocket_create(req);
365 
366     } else {
367         asgi = nxt_py_asgi_http_create(req);
368     }
369 
370     if (nxt_slow_path(asgi == NULL)) {
371         nxt_unit_req_alert(req, "Python failed to create asgi object");
372         nxt_unit_request_done(req, NXT_UNIT_ERROR);
373 
374         return;
375     }
376 
377     receive = PyObject_GetAttrString(asgi, "receive");
378     if (nxt_slow_path(receive == NULL)) {
379         nxt_unit_req_alert(req, "Python failed to get 'receive' method");
380         nxt_unit_request_done(req, NXT_UNIT_ERROR);
381 
382         goto release_asgi;
383     }
384 
385     send = PyObject_GetAttrString(asgi, "send");
386     if (nxt_slow_path(receive == NULL)) {
387         nxt_unit_req_alert(req, "Python failed to get 'send' method");
388         nxt_unit_request_done(req, NXT_UNIT_ERROR);
389 
390         goto release_receive;
391     }
392 
393     done = PyObject_GetAttrString(asgi, "_done");
394     if (nxt_slow_path(receive == NULL)) {
395         nxt_unit_req_alert(req, "Python failed to get '_done' method");
396         nxt_unit_request_done(req, NXT_UNIT_ERROR);
397 
398         goto release_send;
399     }
400 
401     scope = nxt_py_asgi_create_http_scope(req);
402     if (nxt_slow_path(scope == NULL)) {
403         nxt_unit_request_done(req, NXT_UNIT_ERROR);
404 
405         goto release_done;
406     }
407 
408     req->data = asgi;
409 
410     res = PyObject_CallFunctionObjArgs(nxt_py_application,
411                                        scope, receive, send, NULL);
412     if (nxt_slow_path(res == NULL)) {
413         nxt_unit_req_error(req, "Python failed to call the application");
414         nxt_python_print_exception();
415         nxt_unit_request_done(req, NXT_UNIT_ERROR);
416 
417         goto release_scope;
418     }
419 
420     if (nxt_slow_path(!PyCoro_CheckExact(res))) {
421         nxt_unit_req_error(req, "Application result type is not a coroutine");
422         nxt_unit_request_done(req, NXT_UNIT_ERROR);
423 
424         Py_DECREF(res);
425 
426         goto release_scope;
427     }
428 
429     ctx_data = req->ctx->data;
430 
431     task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
432     if (nxt_slow_path(task == NULL)) {
433         nxt_unit_req_error(req, "Python failed to call the create_task");
434         nxt_python_print_exception();
435         nxt_unit_request_done(req, NXT_UNIT_ERROR);
436 
437         Py_DECREF(res);
438 
439         goto release_scope;
440     }
441 
442     Py_DECREF(res);
443 
444     res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
445                                      NULL);
446     if (nxt_slow_path(res == NULL)) {
447         nxt_unit_req_error(req,
448                            "Python failed to call 'task.add_done_callback'");
449         nxt_python_print_exception();
450         nxt_unit_request_done(req, NXT_UNIT_ERROR);
451 
452         goto release_task;
453     }
454 
455     Py_DECREF(res);
456 release_task:
457     Py_DECREF(task);
458 release_scope:
459     Py_DECREF(scope);
460 release_done:
461     Py_DECREF(done);
462 release_send:
463     Py_DECREF(send);
464 release_receive:
465     Py_DECREF(receive);
466 release_asgi:
467     Py_DECREF(asgi);
468 }
469 
470 
471 static PyObject *
472 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
473 {
474     char                *p, *target, *query;
475     uint32_t            target_length, i;
476     PyObject            *scope, *v, *type, *scheme;
477     PyObject            *headers, *header;
478     nxt_unit_field_t    *f;
479     nxt_unit_request_t  *r;
480 
481     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
482 
483 #define SET_ITEM(dict, key, value) \
484     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
485                       == -1))                                                  \
486     {                                                                          \
487         nxt_unit_req_alert(req, "Python failed to set '"                       \
488                                 #dict "." #key "' item");                      \
489         goto fail;                                                             \
490     }
491 
492     v = NULL;
493     headers = NULL;
494 
495     r = req->request;
496 
497     if (r->websocket_handshake) {
498         type = nxt_py_websocket_str;
499         scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
500 
501     } else {
502         type = nxt_py_http_str;
503         scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
504     }
505 
506     scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
507     if (nxt_slow_path(scope == NULL)) {
508         return NULL;
509     }
510 
511     p = nxt_unit_sptr_get(&r->version);
512     SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
513                                               : nxt_py_1_0_str)
514     SET_ITEM(scope, scheme, scheme)
515 
516     v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
517                                    r->method_length);
518     if (nxt_slow_path(v == NULL)) {
519         nxt_unit_req_alert(req, "Python failed to create 'method' string");
520         goto fail;
521     }
522 
523     SET_ITEM(scope, method, v)
524     Py_DECREF(v);
525 
526     v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
527                              "replace");
528     if (nxt_slow_path(v == NULL)) {
529         nxt_unit_req_alert(req, "Python failed to create 'path' string");
530         goto fail;
531     }
532 
533     SET_ITEM(scope, path, v)
534     Py_DECREF(v);
535 
536     target = nxt_unit_sptr_get(&r->target);
537     query = nxt_unit_sptr_get(&r->query);
538 
539     if (r->query.offset != 0) {
540         target_length = query - target - 1;
541 
542     } else {
543         target_length = r->target_length;
544     }
545 
546     v = PyBytes_FromStringAndSize(target, target_length);
547     if (nxt_slow_path(v == NULL)) {
548         nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
549         goto fail;
550     }
551 
552     SET_ITEM(scope, raw_path, v)
553     Py_DECREF(v);
554 
555     v = PyBytes_FromStringAndSize(query, r->query_length);
556     if (nxt_slow_path(v == NULL)) {
557         nxt_unit_req_alert(req, "Python failed to create 'query' string");
558         goto fail;
559     }
560 
561     SET_ITEM(scope, query_string, v)
562     Py_DECREF(v);
563 
564     v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
565     if (nxt_slow_path(v == NULL)) {
566         nxt_unit_req_alert(req, "Python failed to create 'client' pair");
567         goto fail;
568     }
569 
570     SET_ITEM(scope, client, v)
571     Py_DECREF(v);
572 
573     v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
574     if (nxt_slow_path(v == NULL)) {
575         nxt_unit_req_alert(req, "Python failed to create 'server' pair");
576         goto fail;
577     }
578 
579     SET_ITEM(scope, server, v)
580     Py_DECREF(v);
581 
582     v = NULL;
583 
584     headers = PyTuple_New(r->fields_count);
585     if (nxt_slow_path(headers == NULL)) {
586         nxt_unit_req_alert(req, "Python failed to create 'headers' object");
587         goto fail;
588     }
589 
590     for (i = 0; i < r->fields_count; i++) {
591         f = r->fields + i;
592 
593         header = nxt_py_asgi_create_header(f);
594         if (nxt_slow_path(header == NULL)) {
595             nxt_unit_req_alert(req, "Python failed to create 'header' pair");
596             goto fail;
597         }
598 
599         PyTuple_SET_ITEM(headers, i, header);
600 
601         if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
602             && f->name_length == ws_protocol.length
603             && f->value_length > 0
604             && r->websocket_handshake)
605         {
606             v = nxt_py_asgi_create_subprotocols(f);
607             if (nxt_slow_path(v == NULL)) {
608                 nxt_unit_req_alert(req, "Failed to create subprotocols");
609                 goto fail;
610             }
611 
612             SET_ITEM(scope, subprotocols, v);
613             Py_DECREF(v);
614         }
615     }
616 
617     SET_ITEM(scope, headers, headers)
618     Py_DECREF(headers);
619 
620     return scope;
621 
622 fail:
623 
624     Py_XDECREF(v);
625     Py_XDECREF(headers);
626     Py_DECREF(scope);
627 
628     return NULL;
629 
630 #undef SET_ITEM
631 }
632 
633 
634 static PyObject *
635 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
636 {
637     char      *p, *s;
638     PyObject  *pair, *v;
639 
640     pair = PyTuple_New(2);
641     if (nxt_slow_path(pair == NULL)) {
642         return NULL;
643     }
644 
645     p = nxt_unit_sptr_get(sptr);
646     s = memchr(p, ':', len);
647 
648     v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
649     if (nxt_slow_path(v == NULL)) {
650         Py_DECREF(pair);
651 
652         return NULL;
653     }
654 
655     PyTuple_SET_ITEM(pair, 0, v);
656 
657     if (s != NULL) {
658         p += len;
659         v = PyLong_FromString(s + 1, &p, 10);
660 
661     } else {
662         v = PyLong_FromLong(port);
663     }
664 
665     if (nxt_slow_path(v == NULL)) {
666         Py_DECREF(pair);
667 
668         return NULL;
669     }
670 
671     PyTuple_SET_ITEM(pair, 1, v);
672 
673     return pair;
674 }
675 
676 
677 static PyObject *
678 nxt_py_asgi_create_header(nxt_unit_field_t *f)
679 {
680     char      c, *name;
681     uint8_t   pos;
682     PyObject  *header, *v;
683 
684     header = PyTuple_New(2);
685     if (nxt_slow_path(header == NULL)) {
686         return NULL;
687     }
688 
689     name = nxt_unit_sptr_get(&f->name);
690 
691     for (pos = 0; pos < f->name_length; pos++) {
692         c = name[pos];
693         if (c >= 'A' && c <= 'Z') {
694             name[pos] = (c | 0x20);
695         }
696     }
697 
698     v = PyBytes_FromStringAndSize(name, f->name_length);
699     if (nxt_slow_path(v == NULL)) {
700         Py_DECREF(header);
701 
702         return NULL;
703     }
704 
705     PyTuple_SET_ITEM(header, 0, v);
706 
707     v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
708                                   f->value_length);
709     if (nxt_slow_path(v == NULL)) {
710         Py_DECREF(header);
711 
712         return NULL;
713     }
714 
715     PyTuple_SET_ITEM(header, 1, v);
716 
717     return header;
718 }
719 
720 
721 static PyObject *
722 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
723 {
724     char      *v;
725     uint32_t  i, n, start;
726     PyObject  *res, *proto;
727 
728     v = nxt_unit_sptr_get(&f->value);
729     n = 1;
730 
731     for (i = 0; i < f->value_length; i++) {
732         if (v[i] == ',') {
733             n++;
734         }
735     }
736 
737     res = PyTuple_New(n);
738     if (nxt_slow_path(res == NULL)) {
739         return NULL;
740     }
741 
742     n = 0;
743     start = 0;
744 
745     for (i = 0; i < f->value_length; ) {
746         if (v[i] != ',') {
747             i++;
748 
749             continue;
750         }
751 
752         if (i - start > 0) {
753             proto = PyString_FromStringAndSize(v + start, i - start);
754             if (nxt_slow_path(proto == NULL)) {
755                 goto fail;
756             }
757 
758             PyTuple_SET_ITEM(res, n, proto);
759 
760             n++;
761         }
762 
763         do {
764             i++;
765         } while (i < f->value_length && v[i] == ' ');
766 
767         start = i;
768     }
769 
770     if (i - start > 0) {
771         proto = PyString_FromStringAndSize(v + start, i - start);
772         if (nxt_slow_path(proto == NULL)) {
773             goto fail;
774         }
775 
776         PyTuple_SET_ITEM(res, n, proto);
777     }
778 
779     return res;
780 
781 fail:
782 
783     Py_DECREF(res);
784 
785     return NULL;
786 }
787 
788 
789 static int
790 nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
791 {
792     PyObject                *res;
793     nxt_unit_port_t         *port;
794     nxt_py_asgi_ctx_data_t  *ctx_data;
795 
796     if (nxt_slow_path(nxt_py_shared_port == NULL)) {
797         return NXT_UNIT_ERROR;
798     }
799 
800     port = nxt_py_shared_port;
801 
802     nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
803 
804     ctx_data = ctx->data;
805 
806     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
807                                        PyLong_FromLong(port->in_fd),
808                                        nxt_py_port_read,
809                                        PyLong_FromVoidPtr(ctx),
810                                        PyLong_FromVoidPtr(port), NULL);
811     if (nxt_slow_path(res == NULL)) {
812         nxt_unit_alert(ctx, "Python failed to add_reader");
813         nxt_python_print_exception();
814 
815         return NXT_UNIT_ERROR;
816     }
817 
818     Py_DECREF(res);
819 
820     return NXT_UNIT_OK;
821 }
822 
823 
824 static int
825 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
826 {
827     int                     nb;
828     PyObject                *res;
829     nxt_py_asgi_ctx_data_t  *ctx_data;
830 
831     if (port->in_fd == -1) {
832         return NXT_UNIT_OK;
833     }
834 
835     nb = 1;
836 
837     if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
838         nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
839                        port->in_fd, strerror(errno), errno);
840 
841         return NXT_UNIT_ERROR;
842     }
843 
844     nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
845 
846     if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
847         nxt_py_shared_port = port;
848 
849         return NXT_UNIT_OK;
850     }
851 
852     ctx_data = ctx->data;
853 
854     ctx_data->port = port;
855     port->data = ctx_data;
856 
857     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
858                                        PyLong_FromLong(port->in_fd),
859                                        nxt_py_port_read,
860                                        PyLong_FromVoidPtr(ctx),
861                                        PyLong_FromVoidPtr(port), NULL);
862     if (nxt_slow_path(res == NULL)) {
863         nxt_unit_alert(ctx, "Python failed to add_reader");
864         nxt_python_print_exception();
865 
866         return NXT_UNIT_ERROR;
867     }
868 
869     Py_DECREF(res);
870 
871     return NXT_UNIT_OK;
872 }
873 
874 
875 static void
876 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
877 {
878     if (port->in_fd == -1) {
879         return;
880     }
881 
882     nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
883 
884     if (nxt_py_shared_port == port) {
885         nxt_py_shared_port = NULL;
886     }
887 }
888 
889 
890 static void
891 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
892 {
893     PyObject                *res;
894     nxt_py_asgi_ctx_data_t  *ctx_data;
895 
896     nxt_unit_debug(ctx, "asgi_quit %p", ctx);
897 
898     ctx_data = ctx->data;
899 
900     if (nxt_py_shared_port != NULL) {
901         res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
902                               PyLong_FromLong(nxt_py_shared_port->in_fd), NULL);
903         if (nxt_slow_path(res == NULL)) {
904             nxt_unit_alert(NULL, "Python failed to remove_reader");
905             nxt_python_print_exception();
906 
907         } else {
908             Py_DECREF(res);
909         }
910     }
911 
912     res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
913                                        PyLong_FromLong(0), NULL);
914     if (nxt_slow_path(res == NULL)) {
915         nxt_unit_alert(ctx, "Python failed to set_result");
916         nxt_python_print_exception();
917 
918     } else {
919         Py_DECREF(res);
920     }
921 }
922 
923 
924 static void
925 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
926 {
927     int                     rc;
928     nxt_queue_link_t        *lnk;
929     nxt_py_asgi_ctx_data_t  *ctx_data;
930 
931     ctx_data = ctx->data;
932 
933     while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
934         lnk = nxt_queue_first(&ctx_data->drain_queue);
935 
936         rc = nxt_py_asgi_http_drain(lnk);
937         if (rc == NXT_UNIT_AGAIN) {
938             return;
939         }
940 
941         nxt_queue_remove(lnk);
942     }
943 }
944 
945 
946 static PyObject *
947 nxt_py_asgi_port_read(PyObject *self, PyObject *args)
948 {
949     int              rc;
950     PyObject         *arg;
951     Py_ssize_t       n;
952     nxt_unit_ctx_t   *ctx;
953     nxt_unit_port_t  *port;
954 
955     n = PyTuple_GET_SIZE(args);
956 
957     if (n != 2) {
958         nxt_unit_alert(NULL,
959                        "nxt_py_asgi_port_read: invalid number of arguments %d",
960                        (int) n);
961 
962         return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
963     }
964 
965     arg = PyTuple_GET_ITEM(args, 0);
966     if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
967         return PyErr_Format(PyExc_TypeError,
968                             "the first argument is not a long");
969     }
970 
971     ctx = PyLong_AsVoidPtr(arg);
972 
973     arg = PyTuple_GET_ITEM(args, 1);
974     if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
975         return PyErr_Format(PyExc_TypeError,
976                             "the second argument is not a long");
977     }
978 
979     port = PyLong_AsVoidPtr(arg);
980 
981     nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
982 
983     rc = nxt_unit_process_port_msg(ctx, port);
984 
985     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
986         return PyErr_Format(PyExc_RuntimeError,
987                             "error processing port %d message", port->id.id);
988     }
989 
990     Py_RETURN_NONE;
991 }
992 
993 
994 PyObject *
995 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
996     void *data)
997 {
998     int       i;
999     PyObject  *iter, *header, *h_iter, *name, *val, *res;
1000 
1001     iter = PyObject_GetIter(headers);
1002     if (nxt_slow_path(iter == NULL)) {
1003         return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1004     }
1005 
1006     for (i = 0; /* void */; i++) {
1007         header = PyIter_Next(iter);
1008         if (header == NULL) {
1009             break;
1010         }
1011 
1012         h_iter = PyObject_GetIter(header);
1013         if (nxt_slow_path(h_iter == NULL)) {
1014             Py_DECREF(header);
1015             Py_DECREF(iter);
1016 
1017             return PyErr_Format(PyExc_TypeError,
1018                                 "'headers' item #%d is not an iterable", i);
1019         }
1020 
1021         name = PyIter_Next(h_iter);
1022         if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1023             Py_XDECREF(name);
1024             Py_DECREF(h_iter);
1025             Py_DECREF(header);
1026             Py_DECREF(iter);
1027 
1028             return PyErr_Format(PyExc_TypeError,
1029                           "'headers' item #%d 'name' is not a byte string", i);
1030         }
1031 
1032         val = PyIter_Next(h_iter);
1033         if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1034             Py_XDECREF(val);
1035             Py_DECREF(h_iter);
1036             Py_DECREF(header);
1037             Py_DECREF(iter);
1038 
1039             return PyErr_Format(PyExc_TypeError,
1040                          "'headers' item #%d 'value' is not a byte string", i);
1041         }
1042 
1043         res = cb(data, i, name, val);
1044 
1045         Py_DECREF(name);
1046         Py_DECREF(val);
1047         Py_DECREF(h_iter);
1048         Py_DECREF(header);
1049 
1050         if (nxt_slow_path(res == NULL)) {
1051             Py_DECREF(iter);
1052 
1053             return NULL;
1054         }
1055 
1056         Py_DECREF(res);
1057     }
1058 
1059     Py_DECREF(iter);
1060 
1061     Py_RETURN_NONE;
1062 }
1063 
1064 
1065 PyObject *
1066 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1067 {
1068     nxt_py_asgi_calc_size_ctx_t  *ctx;
1069 
1070     ctx = data;
1071 
1072     ctx->fields_count++;
1073     ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1074 
1075     Py_RETURN_NONE;
1076 }
1077 
1078 
1079 PyObject *
1080 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1081 {
1082     int                          rc;
1083     char                         *name_str, *val_str;
1084     uint32_t                     name_len, val_len;
1085     nxt_off_t                    content_length;
1086     nxt_unit_request_info_t      *req;
1087     nxt_py_asgi_add_field_ctx_t  *ctx;
1088 
1089     name_str = PyBytes_AS_STRING(name);
1090     name_len = PyBytes_GET_SIZE(name);
1091 
1092     val_str = PyBytes_AS_STRING(val);
1093     val_len = PyBytes_GET_SIZE(val);
1094 
1095     ctx = data;
1096     req = ctx->req;
1097 
1098     rc = nxt_unit_response_add_field(req, name_str, name_len,
1099                                      val_str, val_len);
1100     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1101         return PyErr_Format(PyExc_RuntimeError,
1102                             "failed to add header #%d", i);
1103     }
1104 
1105     if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1106         content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1107         if (nxt_slow_path(content_length < 0)) {
1108             nxt_unit_req_error(req, "failed to parse Content-Length "
1109                                "value %.*s", (int) val_len, val_str);
1110 
1111             return PyErr_Format(PyExc_ValueError,
1112                                 "Failed to parse Content-Length: '%.*s'",
1113                                 (int) val_len, val_str);
1114         }
1115 
1116         ctx->content_length = content_length;
1117     }
1118 
1119     Py_RETURN_NONE;
1120 }
1121 
1122 
1123 PyObject *
1124 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1125     nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1126 {
1127     PyObject  *set_result, *res;
1128 
1129     if (nxt_slow_path(result == NULL)) {
1130         Py_DECREF(future);
1131 
1132         return NULL;
1133     }
1134 
1135     set_result = PyObject_GetAttrString(future, "set_result");
1136     if (nxt_slow_path(set_result == NULL)) {
1137         nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1138 
1139         Py_CLEAR(future);
1140 
1141         goto cleanup_result;
1142     }
1143 
1144     if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1145         nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1146 
1147         Py_CLEAR(future);
1148 
1149         goto cleanup;
1150     }
1151 
1152     res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1153                                        result, NULL);
1154     if (nxt_slow_path(res == NULL)) {
1155         nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1156         nxt_python_print_exception();
1157 
1158         Py_CLEAR(future);
1159     }
1160 
1161     Py_XDECREF(res);
1162 
1163 cleanup:
1164 
1165     Py_DECREF(set_result);
1166 
1167 cleanup_result:
1168 
1169     Py_DECREF(result);
1170 
1171     return future;
1172 }
1173 
1174 
1175 PyObject *
1176 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1177 {
1178     PyObject  *msg;
1179 
1180     msg = PyDict_New();
1181     if (nxt_slow_path(msg == NULL)) {
1182         nxt_unit_req_alert(req, "Python failed to create message dict");
1183         nxt_python_print_exception();
1184 
1185         return PyErr_Format(PyExc_RuntimeError,
1186                             "failed to create message dict");
1187     }
1188 
1189     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1190         nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1191 
1192         Py_DECREF(msg);
1193 
1194         return PyErr_Format(PyExc_RuntimeError,
1195                             "failed to set 'msg.type' item");
1196     }
1197 
1198     return msg;
1199 }
1200 
1201 
1202 PyObject *
1203 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1204     PyObject *spec_version)
1205 {
1206     PyObject  *scope, *asgi;
1207 
1208     scope = PyDict_New();
1209     if (nxt_slow_path(scope == NULL)) {
1210         nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1211         nxt_python_print_exception();
1212 
1213         return PyErr_Format(PyExc_RuntimeError,
1214                             "failed to create 'scope' dict");
1215     }
1216 
1217     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1218         nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1219 
1220         Py_DECREF(scope);
1221 
1222         return PyErr_Format(PyExc_RuntimeError,
1223                             "failed to set 'scope.type' item");
1224     }
1225 
1226     asgi = PyDict_New();
1227     if (nxt_slow_path(asgi == NULL)) {
1228         nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1229         nxt_python_print_exception();
1230 
1231         Py_DECREF(scope);
1232 
1233         return PyErr_Format(PyExc_RuntimeError,
1234                             "failed to create 'asgi' dict");
1235     }
1236 
1237     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1238         nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1239 
1240         Py_DECREF(asgi);
1241         Py_DECREF(scope);
1242 
1243         return PyErr_Format(PyExc_RuntimeError,
1244                             "failed to set 'scope.asgi' item");
1245     }
1246 
1247     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1248                                      nxt_py_3_0_str) == -1))
1249     {
1250         nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1251 
1252         Py_DECREF(asgi);
1253         Py_DECREF(scope);
1254 
1255         return PyErr_Format(PyExc_RuntimeError,
1256                             "failed to set 'asgi.version' item");
1257     }
1258 
1259     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1260                                      spec_version) == -1))
1261     {
1262         nxt_unit_req_alert(req,
1263                            "Python failed to set 'asgi.spec_version' item");
1264 
1265         Py_DECREF(asgi);
1266         Py_DECREF(scope);
1267 
1268         return PyErr_Format(PyExc_RuntimeError,
1269                             "failed to set 'asgi.spec_version' item");
1270     }
1271 
1272     Py_DECREF(asgi);
1273 
1274     return scope;
1275 }
1276 
1277 
1278 void
1279 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1280 {
1281     nxt_py_asgi_ctx_data_t  *ctx_data;
1282 
1283     ctx_data = req->ctx->data;
1284 
1285     nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1286 }
1287 
1288 
1289 void
1290 nxt_py_asgi_dealloc(PyObject *self)
1291 {
1292     PyObject_Del(self);
1293 }
1294 
1295 
1296 PyObject *
1297 nxt_py_asgi_await(PyObject *self)
1298 {
1299     Py_INCREF(self);
1300     return self;
1301 }
1302 
1303 
1304 PyObject *
1305 nxt_py_asgi_iter(PyObject *self)
1306 {
1307     Py_INCREF(self);
1308     return self;
1309 }
1310 
1311 
1312 PyObject *
1313 nxt_py_asgi_next(PyObject *self)
1314 {
1315     return NULL;
1316 }
1317 
1318 
1319 static void
1320 nxt_python_asgi_done(void)
1321 {
1322     nxt_py_asgi_str_done();
1323 
1324     Py_XDECREF(nxt_py_port_read);
1325 }
1326 
1327 #else /* !(NXT_HAVE_ASGI) */
1328 
1329 
1330 int
1331 nxt_python_asgi_check(PyObject *obj)
1332 {
1333     return 0;
1334 }
1335 
1336 
1337 int
1338 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1339 {
1340     nxt_unit_alert(NULL, "ASGI not implemented");
1341     return NXT_UNIT_ERROR;
1342 }
1343 
1344 
1345 #endif /* NXT_HAVE_ASGI */
1346