xref: /unit/src/python/nxt_python_asgi.c (revision 1682:2de7799a8749)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_response.h>
15 #include <python/nxt_python_asgi.h>
16 #include <python/nxt_python_asgi_str.h>
17 
18 
19 static int nxt_python_asgi_ctx_data_alloc(void **pdata);
20 static void nxt_python_asgi_ctx_data_free(void *data);
21 static int nxt_python_asgi_startup(void *data);
22 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
23 
24 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
25     nxt_unit_port_t *port);
26 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
27 
28 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
29 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
30     uint16_t port);
31 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
32 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
33 
34 static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
35 
36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
37 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
38 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
39 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
40 
41 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
42 static void nxt_python_asgi_done(void);
43 
44 
45 static PyObject           *nxt_py_port_read;
46 static nxt_unit_port_t    *nxt_py_shared_port;
47 
48 static PyMethodDef        nxt_py_port_read_method =
49     {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50 
51 static nxt_python_proto_t  nxt_py_asgi_proto = {
52     .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
53     .ctx_data_free  = nxt_python_asgi_ctx_data_free,
54     .startup        = nxt_python_asgi_startup,
55     .run            = nxt_python_asgi_run,
56     .ready          = nxt_python_asgi_ready,
57     .done           = nxt_python_asgi_done,
58 };
59 
60 #define NXT_UNIT_HASH_WS_PROTOCOL  0xED0A
61 
62 
63 int
64 nxt_python_asgi_check(PyObject *obj)
65 {
66     int           res;
67     PyObject      *call;
68     PyCodeObject  *code;
69 
70     if (PyFunction_Check(obj)) {
71         code = (PyCodeObject *) PyFunction_GET_CODE(obj);
72 
73         return (code->co_flags & CO_COROUTINE) != 0;
74     }
75 
76     if (PyMethod_Check(obj)) {
77         obj = PyMethod_GET_FUNCTION(obj);
78 
79         code = (PyCodeObject *) PyFunction_GET_CODE(obj);
80 
81         return (code->co_flags & CO_COROUTINE) != 0;
82     }
83 
84     call = PyObject_GetAttrString(obj, "__call__");
85 
86     if (call == NULL) {
87         return 0;
88     }
89 
90     if (PyFunction_Check(call)) {
91         code = (PyCodeObject *) PyFunction_GET_CODE(call);
92 
93         res = (code->co_flags & CO_COROUTINE) != 0;
94 
95     } else {
96         if (PyMethod_Check(call)) {
97             obj = PyMethod_GET_FUNCTION(call);
98 
99             code = (PyCodeObject *) PyFunction_GET_CODE(obj);
100 
101             res = (code->co_flags & CO_COROUTINE) != 0;
102 
103         } else {
104             res = 0;
105         }
106     }
107 
108     Py_DECREF(call);
109 
110     return res;
111 }
112 
113 
114 int
115 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
116 {
117     nxt_unit_debug(NULL, "asgi_init");
118 
119     if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
120         nxt_unit_alert(NULL, "Python failed to init string objects");
121         return NXT_UNIT_ERROR;
122     }
123 
124     nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
125     if (nxt_slow_path(nxt_py_port_read == NULL)) {
126         nxt_unit_alert(NULL,
127                        "Python failed to initialize the 'port_read' function");
128         return NXT_UNIT_ERROR;
129     }
130 
131     if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
132         return NXT_UNIT_ERROR;
133     }
134 
135     if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
136         return NXT_UNIT_ERROR;
137     }
138 
139     init->callbacks.request_handler = nxt_py_asgi_request_handler;
140     init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
141     init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
142     init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
143     init->callbacks.quit = nxt_py_asgi_quit;
144     init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
145     init->callbacks.add_port = nxt_py_asgi_add_port;
146     init->callbacks.remove_port = nxt_py_asgi_remove_port;
147 
148     *proto = nxt_py_asgi_proto;
149 
150     return NXT_UNIT_OK;
151 }
152 
153 
154 static int
155 nxt_python_asgi_ctx_data_alloc(void **pdata)
156 {
157     uint32_t                i;
158     PyObject                *asyncio, *loop, *new_event_loop, *obj;
159     nxt_py_asgi_ctx_data_t  *ctx_data;
160 
161     ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
162     if (nxt_slow_path(ctx_data == NULL)) {
163         nxt_unit_alert(NULL, "Failed to allocate context data");
164         return NXT_UNIT_ERROR;
165     }
166 
167     memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
168 
169     nxt_queue_init(&ctx_data->drain_queue);
170 
171     struct {
172         const char  *key;
173         PyObject    **handler;
174 
175     } handlers[] = {
176         { "create_task",        &ctx_data->loop_create_task },
177         { "add_reader",         &ctx_data->loop_add_reader },
178         { "remove_reader",      &ctx_data->loop_remove_reader },
179         { "call_soon",          &ctx_data->loop_call_soon },
180         { "run_until_complete", &ctx_data->loop_run_until_complete },
181         { "create_future",      &ctx_data->loop_create_future },
182     };
183 
184     loop = NULL;
185 
186     asyncio = PyImport_ImportModule("asyncio");
187     if (nxt_slow_path(asyncio == NULL)) {
188         nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
189         nxt_python_print_exception();
190         goto fail;
191     }
192 
193     new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
194                                           "new_event_loop");
195     if (nxt_slow_path(new_event_loop == NULL)) {
196         nxt_unit_alert(NULL,
197                  "Python failed to get 'new_event_loop' from module 'asyncio'");
198         goto fail;
199     }
200 
201     if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
202         nxt_unit_alert(NULL,
203                        "'asyncio.new_event_loop' is not a callable object");
204         goto fail;
205     }
206 
207     loop = PyObject_CallObject(new_event_loop, NULL);
208     if (nxt_slow_path(loop == NULL)) {
209         nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
210         goto fail;
211     }
212 
213     for (i = 0; i < nxt_nitems(handlers); i++) {
214         obj = PyObject_GetAttrString(loop, handlers[i].key);
215         if (nxt_slow_path(obj == NULL)) {
216             nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
217                                  handlers[i].key);
218             goto fail;
219         }
220 
221         *handlers[i].handler = obj;
222 
223         if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
224             nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
225                                  handlers[i].key);
226             goto fail;
227         }
228     }
229 
230     obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
231     if (nxt_slow_path(obj == NULL)) {
232         nxt_unit_alert(NULL, "Python failed to create Future ");
233         nxt_python_print_exception();
234         goto fail;
235     }
236 
237     ctx_data->quit_future = obj;
238 
239     obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
240     if (nxt_slow_path(obj == NULL)) {
241         nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
242         goto fail;
243     }
244 
245     ctx_data->quit_future_set_result = obj;
246 
247     if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
248         nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
249         goto fail;
250     }
251 
252     Py_DECREF(loop);
253     Py_DECREF(asyncio);
254 
255     *pdata = ctx_data;
256 
257     return NXT_UNIT_OK;
258 
259 fail:
260 
261     nxt_python_asgi_ctx_data_free(ctx_data);
262 
263     Py_XDECREF(loop);
264     Py_XDECREF(asyncio);
265 
266     return NXT_UNIT_ERROR;
267 }
268 
269 
270 static void
271 nxt_python_asgi_ctx_data_free(void *data)
272 {
273     nxt_py_asgi_ctx_data_t  *ctx_data;
274 
275     ctx_data = data;
276 
277     Py_XDECREF(ctx_data->loop_run_until_complete);
278     Py_XDECREF(ctx_data->loop_create_future);
279     Py_XDECREF(ctx_data->loop_create_task);
280     Py_XDECREF(ctx_data->loop_call_soon);
281     Py_XDECREF(ctx_data->loop_add_reader);
282     Py_XDECREF(ctx_data->loop_remove_reader);
283     Py_XDECREF(ctx_data->quit_future);
284     Py_XDECREF(ctx_data->quit_future_set_result);
285 
286     nxt_unit_free(NULL, ctx_data);
287 }
288 
289 
290 static int
291 nxt_python_asgi_startup(void *data)
292 {
293     return nxt_py_asgi_lifespan_startup(data);
294 }
295 
296 
297 static int
298 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
299 {
300     PyObject                *res;
301     nxt_py_asgi_ctx_data_t  *ctx_data;
302 
303     ctx_data = ctx->data;
304 
305     res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
306                                        ctx_data->quit_future, NULL);
307     if (nxt_slow_path(res == NULL)) {
308         nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
309         nxt_python_print_exception();
310 
311         return NXT_UNIT_ERROR;
312     }
313 
314     Py_DECREF(res);
315 
316     nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
317     nxt_py_asgi_remove_reader(ctx, ctx_data->port);
318 
319     if (ctx_data->port != NULL) {
320         ctx_data->port->data = NULL;
321         ctx_data->port = NULL;
322     }
323 
324     nxt_py_asgi_lifespan_shutdown(ctx);
325 
326     return NXT_UNIT_OK;
327 }
328 
329 
330 static void
331 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
332 {
333     PyObject                *res, *fd;
334     nxt_py_asgi_ctx_data_t  *ctx_data;
335 
336     if (port == NULL || port->in_fd == -1) {
337         return;
338     }
339 
340     ctx_data = ctx->data;
341 
342     nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
343 
344     fd = PyLong_FromLong(port->in_fd);
345     if (nxt_slow_path(fd == NULL)) {
346         nxt_unit_alert(ctx, "Python failed to create Long object");
347         nxt_python_print_exception();
348 
349         return;
350     }
351 
352     res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
353     if (nxt_slow_path(res == NULL)) {
354         nxt_unit_alert(ctx, "Python failed to remove_reader");
355         nxt_python_print_exception();
356 
357     } else {
358         Py_DECREF(res);
359     }
360 
361     Py_DECREF(fd);
362 }
363 
364 
365 static void
366 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
367 {
368     PyObject                *scope, *res, *task, *receive, *send, *done, *asgi;
369     nxt_py_asgi_ctx_data_t  *ctx_data;
370 
371     if (req->request->websocket_handshake) {
372         asgi = nxt_py_asgi_websocket_create(req);
373 
374     } else {
375         asgi = nxt_py_asgi_http_create(req);
376     }
377 
378     if (nxt_slow_path(asgi == NULL)) {
379         nxt_unit_req_alert(req, "Python failed to create asgi object");
380         nxt_unit_request_done(req, NXT_UNIT_ERROR);
381 
382         return;
383     }
384 
385     receive = PyObject_GetAttrString(asgi, "receive");
386     if (nxt_slow_path(receive == NULL)) {
387         nxt_unit_req_alert(req, "Python failed to get 'receive' method");
388         nxt_unit_request_done(req, NXT_UNIT_ERROR);
389 
390         goto release_asgi;
391     }
392 
393     send = PyObject_GetAttrString(asgi, "send");
394     if (nxt_slow_path(receive == NULL)) {
395         nxt_unit_req_alert(req, "Python failed to get 'send' method");
396         nxt_unit_request_done(req, NXT_UNIT_ERROR);
397 
398         goto release_receive;
399     }
400 
401     done = PyObject_GetAttrString(asgi, "_done");
402     if (nxt_slow_path(receive == NULL)) {
403         nxt_unit_req_alert(req, "Python failed to get '_done' method");
404         nxt_unit_request_done(req, NXT_UNIT_ERROR);
405 
406         goto release_send;
407     }
408 
409     scope = nxt_py_asgi_create_http_scope(req);
410     if (nxt_slow_path(scope == NULL)) {
411         nxt_unit_request_done(req, NXT_UNIT_ERROR);
412 
413         goto release_done;
414     }
415 
416     req->data = asgi;
417 
418     res = PyObject_CallFunctionObjArgs(nxt_py_application,
419                                        scope, receive, send, NULL);
420     if (nxt_slow_path(res == NULL)) {
421         nxt_unit_req_error(req, "Python failed to call the application");
422         nxt_python_print_exception();
423         nxt_unit_request_done(req, NXT_UNIT_ERROR);
424 
425         goto release_scope;
426     }
427 
428     if (nxt_slow_path(!PyCoro_CheckExact(res))) {
429         nxt_unit_req_error(req, "Application result type is not a coroutine");
430         nxt_unit_request_done(req, NXT_UNIT_ERROR);
431 
432         Py_DECREF(res);
433 
434         goto release_scope;
435     }
436 
437     ctx_data = req->ctx->data;
438 
439     task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
440     if (nxt_slow_path(task == NULL)) {
441         nxt_unit_req_error(req, "Python failed to call the create_task");
442         nxt_python_print_exception();
443         nxt_unit_request_done(req, NXT_UNIT_ERROR);
444 
445         Py_DECREF(res);
446 
447         goto release_scope;
448     }
449 
450     Py_DECREF(res);
451 
452     res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
453                                      NULL);
454     if (nxt_slow_path(res == NULL)) {
455         nxt_unit_req_error(req,
456                            "Python failed to call 'task.add_done_callback'");
457         nxt_python_print_exception();
458         nxt_unit_request_done(req, NXT_UNIT_ERROR);
459 
460         goto release_task;
461     }
462 
463     Py_DECREF(res);
464 release_task:
465     Py_DECREF(task);
466 release_scope:
467     Py_DECREF(scope);
468 release_done:
469     Py_DECREF(done);
470 release_send:
471     Py_DECREF(send);
472 release_receive:
473     Py_DECREF(receive);
474 release_asgi:
475     Py_DECREF(asgi);
476 }
477 
478 
479 static PyObject *
480 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
481 {
482     char                *p, *target, *query;
483     uint32_t            target_length, i;
484     PyObject            *scope, *v, *type, *scheme;
485     PyObject            *headers, *header;
486     nxt_unit_field_t    *f;
487     nxt_unit_request_t  *r;
488 
489     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
490 
491 #define SET_ITEM(dict, key, value) \
492     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
493                       == -1))                                                  \
494     {                                                                          \
495         nxt_unit_req_alert(req, "Python failed to set '"                       \
496                                 #dict "." #key "' item");                      \
497         goto fail;                                                             \
498     }
499 
500     v = NULL;
501     headers = NULL;
502 
503     r = req->request;
504 
505     if (r->websocket_handshake) {
506         type = nxt_py_websocket_str;
507         scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
508 
509     } else {
510         type = nxt_py_http_str;
511         scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
512     }
513 
514     scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
515     if (nxt_slow_path(scope == NULL)) {
516         return NULL;
517     }
518 
519     p = nxt_unit_sptr_get(&r->version);
520     SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
521                                               : nxt_py_1_0_str)
522     SET_ITEM(scope, scheme, scheme)
523 
524     v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
525                                    r->method_length);
526     if (nxt_slow_path(v == NULL)) {
527         nxt_unit_req_alert(req, "Python failed to create 'method' string");
528         goto fail;
529     }
530 
531     SET_ITEM(scope, method, v)
532     Py_DECREF(v);
533 
534     v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
535                              "replace");
536     if (nxt_slow_path(v == NULL)) {
537         nxt_unit_req_alert(req, "Python failed to create 'path' string");
538         goto fail;
539     }
540 
541     SET_ITEM(scope, path, v)
542     Py_DECREF(v);
543 
544     target = nxt_unit_sptr_get(&r->target);
545     query = nxt_unit_sptr_get(&r->query);
546 
547     if (r->query.offset != 0) {
548         target_length = query - target - 1;
549 
550     } else {
551         target_length = r->target_length;
552     }
553 
554     v = PyBytes_FromStringAndSize(target, target_length);
555     if (nxt_slow_path(v == NULL)) {
556         nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
557         goto fail;
558     }
559 
560     SET_ITEM(scope, raw_path, v)
561     Py_DECREF(v);
562 
563     v = PyBytes_FromStringAndSize(query, r->query_length);
564     if (nxt_slow_path(v == NULL)) {
565         nxt_unit_req_alert(req, "Python failed to create 'query' string");
566         goto fail;
567     }
568 
569     SET_ITEM(scope, query_string, v)
570     Py_DECREF(v);
571 
572     v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
573     if (nxt_slow_path(v == NULL)) {
574         nxt_unit_req_alert(req, "Python failed to create 'client' pair");
575         goto fail;
576     }
577 
578     SET_ITEM(scope, client, v)
579     Py_DECREF(v);
580 
581     v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
582     if (nxt_slow_path(v == NULL)) {
583         nxt_unit_req_alert(req, "Python failed to create 'server' pair");
584         goto fail;
585     }
586 
587     SET_ITEM(scope, server, v)
588     Py_DECREF(v);
589 
590     v = NULL;
591 
592     headers = PyTuple_New(r->fields_count);
593     if (nxt_slow_path(headers == NULL)) {
594         nxt_unit_req_alert(req, "Python failed to create 'headers' object");
595         goto fail;
596     }
597 
598     for (i = 0; i < r->fields_count; i++) {
599         f = r->fields + i;
600 
601         header = nxt_py_asgi_create_header(f);
602         if (nxt_slow_path(header == NULL)) {
603             nxt_unit_req_alert(req, "Python failed to create 'header' pair");
604             goto fail;
605         }
606 
607         PyTuple_SET_ITEM(headers, i, header);
608 
609         if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
610             && f->name_length == ws_protocol.length
611             && f->value_length > 0
612             && r->websocket_handshake)
613         {
614             v = nxt_py_asgi_create_subprotocols(f);
615             if (nxt_slow_path(v == NULL)) {
616                 nxt_unit_req_alert(req, "Failed to create subprotocols");
617                 goto fail;
618             }
619 
620             SET_ITEM(scope, subprotocols, v);
621             Py_DECREF(v);
622         }
623     }
624 
625     SET_ITEM(scope, headers, headers)
626     Py_DECREF(headers);
627 
628     return scope;
629 
630 fail:
631 
632     Py_XDECREF(v);
633     Py_XDECREF(headers);
634     Py_DECREF(scope);
635 
636     return NULL;
637 
638 #undef SET_ITEM
639 }
640 
641 
642 static PyObject *
643 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
644 {
645     char      *p, *s;
646     PyObject  *pair, *v;
647 
648     pair = PyTuple_New(2);
649     if (nxt_slow_path(pair == NULL)) {
650         return NULL;
651     }
652 
653     p = nxt_unit_sptr_get(sptr);
654     s = memchr(p, ':', len);
655 
656     v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
657     if (nxt_slow_path(v == NULL)) {
658         Py_DECREF(pair);
659 
660         return NULL;
661     }
662 
663     PyTuple_SET_ITEM(pair, 0, v);
664 
665     if (s != NULL) {
666         p += len;
667         v = PyLong_FromString(s + 1, &p, 10);
668 
669     } else {
670         v = PyLong_FromLong(port);
671     }
672 
673     if (nxt_slow_path(v == NULL)) {
674         Py_DECREF(pair);
675 
676         return NULL;
677     }
678 
679     PyTuple_SET_ITEM(pair, 1, v);
680 
681     return pair;
682 }
683 
684 
685 static PyObject *
686 nxt_py_asgi_create_header(nxt_unit_field_t *f)
687 {
688     char      c, *name;
689     uint8_t   pos;
690     PyObject  *header, *v;
691 
692     header = PyTuple_New(2);
693     if (nxt_slow_path(header == NULL)) {
694         return NULL;
695     }
696 
697     name = nxt_unit_sptr_get(&f->name);
698 
699     for (pos = 0; pos < f->name_length; pos++) {
700         c = name[pos];
701         if (c >= 'A' && c <= 'Z') {
702             name[pos] = (c | 0x20);
703         }
704     }
705 
706     v = PyBytes_FromStringAndSize(name, f->name_length);
707     if (nxt_slow_path(v == NULL)) {
708         Py_DECREF(header);
709 
710         return NULL;
711     }
712 
713     PyTuple_SET_ITEM(header, 0, v);
714 
715     v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
716                                   f->value_length);
717     if (nxt_slow_path(v == NULL)) {
718         Py_DECREF(header);
719 
720         return NULL;
721     }
722 
723     PyTuple_SET_ITEM(header, 1, v);
724 
725     return header;
726 }
727 
728 
729 static PyObject *
730 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
731 {
732     char      *v;
733     uint32_t  i, n, start;
734     PyObject  *res, *proto;
735 
736     v = nxt_unit_sptr_get(&f->value);
737     n = 1;
738 
739     for (i = 0; i < f->value_length; i++) {
740         if (v[i] == ',') {
741             n++;
742         }
743     }
744 
745     res = PyTuple_New(n);
746     if (nxt_slow_path(res == NULL)) {
747         return NULL;
748     }
749 
750     n = 0;
751     start = 0;
752 
753     for (i = 0; i < f->value_length; ) {
754         if (v[i] != ',') {
755             i++;
756 
757             continue;
758         }
759 
760         if (i - start > 0) {
761             proto = PyString_FromStringAndSize(v + start, i - start);
762             if (nxt_slow_path(proto == NULL)) {
763                 goto fail;
764             }
765 
766             PyTuple_SET_ITEM(res, n, proto);
767 
768             n++;
769         }
770 
771         do {
772             i++;
773         } while (i < f->value_length && v[i] == ' ');
774 
775         start = i;
776     }
777 
778     if (i - start > 0) {
779         proto = PyString_FromStringAndSize(v + start, i - start);
780         if (nxt_slow_path(proto == NULL)) {
781             goto fail;
782         }
783 
784         PyTuple_SET_ITEM(res, n, proto);
785     }
786 
787     return res;
788 
789 fail:
790 
791     Py_DECREF(res);
792 
793     return NULL;
794 }
795 
796 
797 static int
798 nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
799 {
800     int                     rc;
801     PyObject                *res, *fd, *py_ctx, *py_port;
802     nxt_unit_port_t         *port;
803     nxt_py_asgi_ctx_data_t  *ctx_data;
804 
805     if (nxt_slow_path(nxt_py_shared_port == NULL)) {
806         return NXT_UNIT_ERROR;
807     }
808 
809     port = nxt_py_shared_port;
810 
811     nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
812 
813     ctx_data = ctx->data;
814 
815     rc = NXT_UNIT_ERROR;
816 
817     fd = PyLong_FromLong(port->in_fd);
818     if (nxt_slow_path(fd == NULL)) {
819         nxt_unit_alert(ctx, "Python failed to create fd");
820         nxt_python_print_exception();
821 
822         return rc;
823     }
824 
825     py_ctx = PyLong_FromVoidPtr(ctx);
826     if (nxt_slow_path(py_ctx == NULL)) {
827         nxt_unit_alert(ctx, "Python failed to create py_ctx");
828         nxt_python_print_exception();
829 
830         goto clean_fd;
831     }
832 
833     py_port = PyLong_FromVoidPtr(port);
834     if (nxt_slow_path(py_port == NULL)) {
835         nxt_unit_alert(ctx, "Python failed to create py_port");
836         nxt_python_print_exception();
837 
838         goto clean_py_ctx;
839     }
840 
841     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
842                                        fd, nxt_py_port_read,
843                                        py_ctx, py_port, NULL);
844     if (nxt_slow_path(res == NULL)) {
845         nxt_unit_alert(ctx, "Python failed to add_reader");
846         nxt_python_print_exception();
847 
848     } else {
849         Py_DECREF(res);
850 
851         rc = NXT_UNIT_OK;
852     }
853 
854     Py_DECREF(py_port);
855 
856 clean_py_ctx:
857 
858     Py_DECREF(py_ctx);
859 
860 clean_fd:
861 
862     Py_DECREF(fd);
863 
864     return rc;
865 }
866 
867 
868 static int
869 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
870 {
871     int                     nb, rc;
872     PyObject                *res, *fd, *py_ctx, *py_port;
873     nxt_py_asgi_ctx_data_t  *ctx_data;
874 
875     if (port->in_fd == -1) {
876         return NXT_UNIT_OK;
877     }
878 
879     nb = 1;
880 
881     if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
882         nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
883                        port->in_fd, strerror(errno), errno);
884 
885         return NXT_UNIT_ERROR;
886     }
887 
888     nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
889 
890     if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
891         nxt_py_shared_port = port;
892 
893         return NXT_UNIT_OK;
894     }
895 
896     ctx_data = ctx->data;
897 
898     ctx_data->port = port;
899     port->data = ctx_data;
900 
901     rc = NXT_UNIT_ERROR;
902 
903     fd = PyLong_FromLong(port->in_fd);
904     if (nxt_slow_path(fd == NULL)) {
905         nxt_unit_alert(ctx, "Python failed to create fd");
906         nxt_python_print_exception();
907 
908         return rc;
909     }
910 
911     py_ctx = PyLong_FromVoidPtr(ctx);
912     if (nxt_slow_path(py_ctx == NULL)) {
913         nxt_unit_alert(ctx, "Python failed to create py_ctx");
914         nxt_python_print_exception();
915 
916         goto clean_fd;
917     }
918 
919     py_port = PyLong_FromVoidPtr(port);
920     if (nxt_slow_path(py_port == NULL)) {
921         nxt_unit_alert(ctx, "Python failed to create py_port");
922         nxt_python_print_exception();
923 
924         goto clean_py_ctx;
925     }
926 
927     res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
928                                        fd, nxt_py_port_read,
929                                        py_ctx, py_port, NULL);
930     if (nxt_slow_path(res == NULL)) {
931         nxt_unit_alert(ctx, "Python failed to add_reader");
932         nxt_python_print_exception();
933 
934     } else {
935         Py_DECREF(res);
936 
937         rc = NXT_UNIT_OK;
938     }
939 
940     Py_DECREF(py_port);
941 
942 clean_py_ctx:
943 
944     Py_DECREF(py_ctx);
945 
946 clean_fd:
947 
948     Py_DECREF(fd);
949 
950     return rc;
951 }
952 
953 
954 static void
955 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
956 {
957     if (port->in_fd == -1) {
958         return;
959     }
960 
961     nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
962 
963     if (nxt_py_shared_port == port) {
964         nxt_py_shared_port = NULL;
965     }
966 }
967 
968 
969 static void
970 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
971 {
972     PyObject                *res, *p;
973     nxt_py_asgi_ctx_data_t  *ctx_data;
974 
975     nxt_unit_debug(ctx, "asgi_quit %p", ctx);
976 
977     ctx_data = ctx->data;
978 
979     if (nxt_py_shared_port != NULL) {
980         p = PyLong_FromLong(nxt_py_shared_port->in_fd);
981         if (nxt_slow_path(p == NULL)) {
982             nxt_unit_alert(NULL, "Python failed to create Long");
983             nxt_python_print_exception();
984 
985         } else {
986             res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
987                                                p, NULL);
988             if (nxt_slow_path(res == NULL)) {
989                 nxt_unit_alert(NULL, "Python failed to remove_reader");
990                 nxt_python_print_exception();
991 
992             } else {
993                 Py_DECREF(res);
994             }
995 
996             Py_DECREF(p);
997         }
998     }
999 
1000     p = PyLong_FromLong(0);
1001     if (nxt_slow_path(p == NULL)) {
1002         nxt_unit_alert(NULL, "Python failed to create Long");
1003         nxt_python_print_exception();
1004 
1005     } else {
1006         res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
1007                                            p, NULL);
1008         if (nxt_slow_path(res == NULL)) {
1009             nxt_unit_alert(ctx, "Python failed to set_result");
1010             nxt_python_print_exception();
1011 
1012         } else {
1013             Py_DECREF(res);
1014         }
1015 
1016         Py_DECREF(p);
1017     }
1018 }
1019 
1020 
1021 static void
1022 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
1023 {
1024     int                     rc;
1025     nxt_queue_link_t        *lnk;
1026     nxt_py_asgi_ctx_data_t  *ctx_data;
1027 
1028     ctx_data = ctx->data;
1029 
1030     while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
1031         lnk = nxt_queue_first(&ctx_data->drain_queue);
1032 
1033         rc = nxt_py_asgi_http_drain(lnk);
1034         if (rc == NXT_UNIT_AGAIN) {
1035             return;
1036         }
1037 
1038         nxt_queue_remove(lnk);
1039     }
1040 }
1041 
1042 
1043 static PyObject *
1044 nxt_py_asgi_port_read(PyObject *self, PyObject *args)
1045 {
1046     int              rc;
1047     PyObject         *arg;
1048     Py_ssize_t       n;
1049     nxt_unit_ctx_t   *ctx;
1050     nxt_unit_port_t  *port;
1051 
1052     n = PyTuple_GET_SIZE(args);
1053 
1054     if (n != 2) {
1055         nxt_unit_alert(NULL,
1056                        "nxt_py_asgi_port_read: invalid number of arguments %d",
1057                        (int) n);
1058 
1059         return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
1060     }
1061 
1062     arg = PyTuple_GET_ITEM(args, 0);
1063     if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
1064         return PyErr_Format(PyExc_TypeError,
1065                             "the first argument is not a long");
1066     }
1067 
1068     ctx = PyLong_AsVoidPtr(arg);
1069 
1070     arg = PyTuple_GET_ITEM(args, 1);
1071     if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
1072         return PyErr_Format(PyExc_TypeError,
1073                             "the second argument is not a long");
1074     }
1075 
1076     port = PyLong_AsVoidPtr(arg);
1077 
1078     nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
1079 
1080     rc = nxt_unit_process_port_msg(ctx, port);
1081 
1082     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1083         return PyErr_Format(PyExc_RuntimeError,
1084                             "error processing port %d message", port->id.id);
1085     }
1086 
1087     Py_RETURN_NONE;
1088 }
1089 
1090 
1091 PyObject *
1092 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
1093     void *data)
1094 {
1095     int       i;
1096     PyObject  *iter, *header, *h_iter, *name, *val, *res;
1097 
1098     iter = PyObject_GetIter(headers);
1099     if (nxt_slow_path(iter == NULL)) {
1100         return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1101     }
1102 
1103     for (i = 0; /* void */; i++) {
1104         header = PyIter_Next(iter);
1105         if (header == NULL) {
1106             break;
1107         }
1108 
1109         h_iter = PyObject_GetIter(header);
1110         if (nxt_slow_path(h_iter == NULL)) {
1111             Py_DECREF(header);
1112             Py_DECREF(iter);
1113 
1114             return PyErr_Format(PyExc_TypeError,
1115                                 "'headers' item #%d is not an iterable", i);
1116         }
1117 
1118         name = PyIter_Next(h_iter);
1119         if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1120             Py_XDECREF(name);
1121             Py_DECREF(h_iter);
1122             Py_DECREF(header);
1123             Py_DECREF(iter);
1124 
1125             return PyErr_Format(PyExc_TypeError,
1126                           "'headers' item #%d 'name' is not a byte string", i);
1127         }
1128 
1129         val = PyIter_Next(h_iter);
1130         if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1131             Py_XDECREF(val);
1132             Py_DECREF(h_iter);
1133             Py_DECREF(header);
1134             Py_DECREF(iter);
1135 
1136             return PyErr_Format(PyExc_TypeError,
1137                          "'headers' item #%d 'value' is not a byte string", i);
1138         }
1139 
1140         res = cb(data, i, name, val);
1141 
1142         Py_DECREF(name);
1143         Py_DECREF(val);
1144         Py_DECREF(h_iter);
1145         Py_DECREF(header);
1146 
1147         if (nxt_slow_path(res == NULL)) {
1148             Py_DECREF(iter);
1149 
1150             return NULL;
1151         }
1152 
1153         Py_DECREF(res);
1154     }
1155 
1156     Py_DECREF(iter);
1157 
1158     Py_RETURN_NONE;
1159 }
1160 
1161 
1162 PyObject *
1163 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1164 {
1165     nxt_py_asgi_calc_size_ctx_t  *ctx;
1166 
1167     ctx = data;
1168 
1169     ctx->fields_count++;
1170     ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1171 
1172     Py_RETURN_NONE;
1173 }
1174 
1175 
1176 PyObject *
1177 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1178 {
1179     int                          rc;
1180     char                         *name_str, *val_str;
1181     uint32_t                     name_len, val_len;
1182     nxt_off_t                    content_length;
1183     nxt_unit_request_info_t      *req;
1184     nxt_py_asgi_add_field_ctx_t  *ctx;
1185 
1186     name_str = PyBytes_AS_STRING(name);
1187     name_len = PyBytes_GET_SIZE(name);
1188 
1189     val_str = PyBytes_AS_STRING(val);
1190     val_len = PyBytes_GET_SIZE(val);
1191 
1192     ctx = data;
1193     req = ctx->req;
1194 
1195     rc = nxt_unit_response_add_field(req, name_str, name_len,
1196                                      val_str, val_len);
1197     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1198         return PyErr_Format(PyExc_RuntimeError,
1199                             "failed to add header #%d", i);
1200     }
1201 
1202     if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1203         content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1204         if (nxt_slow_path(content_length < 0)) {
1205             nxt_unit_req_error(req, "failed to parse Content-Length "
1206                                "value %.*s", (int) val_len, val_str);
1207 
1208             return PyErr_Format(PyExc_ValueError,
1209                                 "Failed to parse Content-Length: '%.*s'",
1210                                 (int) val_len, val_str);
1211         }
1212 
1213         ctx->content_length = content_length;
1214     }
1215 
1216     Py_RETURN_NONE;
1217 }
1218 
1219 
1220 PyObject *
1221 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1222     nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1223 {
1224     PyObject  *set_result, *res;
1225 
1226     if (nxt_slow_path(result == NULL)) {
1227         Py_DECREF(future);
1228 
1229         return NULL;
1230     }
1231 
1232     set_result = PyObject_GetAttrString(future, "set_result");
1233     if (nxt_slow_path(set_result == NULL)) {
1234         nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1235 
1236         Py_CLEAR(future);
1237 
1238         goto cleanup_result;
1239     }
1240 
1241     if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1242         nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1243 
1244         Py_CLEAR(future);
1245 
1246         goto cleanup;
1247     }
1248 
1249     res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1250                                        result, NULL);
1251     if (nxt_slow_path(res == NULL)) {
1252         nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1253         nxt_python_print_exception();
1254 
1255         Py_CLEAR(future);
1256     }
1257 
1258     Py_XDECREF(res);
1259 
1260 cleanup:
1261 
1262     Py_DECREF(set_result);
1263 
1264 cleanup_result:
1265 
1266     Py_DECREF(result);
1267 
1268     return future;
1269 }
1270 
1271 
1272 PyObject *
1273 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1274 {
1275     PyObject  *msg;
1276 
1277     msg = PyDict_New();
1278     if (nxt_slow_path(msg == NULL)) {
1279         nxt_unit_req_alert(req, "Python failed to create message dict");
1280         nxt_python_print_exception();
1281 
1282         return PyErr_Format(PyExc_RuntimeError,
1283                             "failed to create message dict");
1284     }
1285 
1286     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1287         nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1288 
1289         Py_DECREF(msg);
1290 
1291         return PyErr_Format(PyExc_RuntimeError,
1292                             "failed to set 'msg.type' item");
1293     }
1294 
1295     return msg;
1296 }
1297 
1298 
1299 PyObject *
1300 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1301     PyObject *spec_version)
1302 {
1303     PyObject  *scope, *asgi;
1304 
1305     scope = PyDict_New();
1306     if (nxt_slow_path(scope == NULL)) {
1307         nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1308         nxt_python_print_exception();
1309 
1310         return PyErr_Format(PyExc_RuntimeError,
1311                             "failed to create 'scope' dict");
1312     }
1313 
1314     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1315         nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1316 
1317         Py_DECREF(scope);
1318 
1319         return PyErr_Format(PyExc_RuntimeError,
1320                             "failed to set 'scope.type' item");
1321     }
1322 
1323     asgi = PyDict_New();
1324     if (nxt_slow_path(asgi == NULL)) {
1325         nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1326         nxt_python_print_exception();
1327 
1328         Py_DECREF(scope);
1329 
1330         return PyErr_Format(PyExc_RuntimeError,
1331                             "failed to create 'asgi' dict");
1332     }
1333 
1334     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1335         nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1336 
1337         Py_DECREF(asgi);
1338         Py_DECREF(scope);
1339 
1340         return PyErr_Format(PyExc_RuntimeError,
1341                             "failed to set 'scope.asgi' item");
1342     }
1343 
1344     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1345                                      nxt_py_3_0_str) == -1))
1346     {
1347         nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1348 
1349         Py_DECREF(asgi);
1350         Py_DECREF(scope);
1351 
1352         return PyErr_Format(PyExc_RuntimeError,
1353                             "failed to set 'asgi.version' item");
1354     }
1355 
1356     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1357                                      spec_version) == -1))
1358     {
1359         nxt_unit_req_alert(req,
1360                            "Python failed to set 'asgi.spec_version' item");
1361 
1362         Py_DECREF(asgi);
1363         Py_DECREF(scope);
1364 
1365         return PyErr_Format(PyExc_RuntimeError,
1366                             "failed to set 'asgi.spec_version' item");
1367     }
1368 
1369     Py_DECREF(asgi);
1370 
1371     return scope;
1372 }
1373 
1374 
1375 void
1376 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1377 {
1378     nxt_py_asgi_ctx_data_t  *ctx_data;
1379 
1380     ctx_data = req->ctx->data;
1381 
1382     nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1383 }
1384 
1385 
1386 void
1387 nxt_py_asgi_dealloc(PyObject *self)
1388 {
1389     PyObject_Del(self);
1390 }
1391 
1392 
1393 PyObject *
1394 nxt_py_asgi_await(PyObject *self)
1395 {
1396     Py_INCREF(self);
1397     return self;
1398 }
1399 
1400 
1401 PyObject *
1402 nxt_py_asgi_iter(PyObject *self)
1403 {
1404     Py_INCREF(self);
1405     return self;
1406 }
1407 
1408 
1409 PyObject *
1410 nxt_py_asgi_next(PyObject *self)
1411 {
1412     return NULL;
1413 }
1414 
1415 
1416 static void
1417 nxt_python_asgi_done(void)
1418 {
1419     nxt_py_asgi_str_done();
1420 
1421     Py_XDECREF(nxt_py_port_read);
1422 }
1423 
1424 #else /* !(NXT_HAVE_ASGI) */
1425 
1426 
1427 int
1428 nxt_python_asgi_check(PyObject *obj)
1429 {
1430     return 0;
1431 }
1432 
1433 
1434 int
1435 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1436 {
1437     nxt_unit_alert(NULL, "ASGI not implemented");
1438     return NXT_UNIT_ERROR;
1439 }
1440 
1441 
1442 #endif /* NXT_HAVE_ASGI */
1443