xref: /unit/src/python/nxt_python_asgi.c (revision 1624:e46b1b422545)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_response.h>
15 #include <python/nxt_python_asgi.h>
16 #include <python/nxt_python_asgi_str.h>
17 
18 
19 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
20 
21 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
22 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
23     uint16_t port);
24 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
25 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
26 
27 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
28 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
29 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
30 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
31 
32 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
33 
34 
35 PyObject  *nxt_py_loop_run_until_complete;
36 PyObject  *nxt_py_loop_create_future;
37 PyObject  *nxt_py_loop_create_task;
38 
39 nxt_queue_t  nxt_py_asgi_drain_queue;
40 
41 static PyObject  *nxt_py_loop_call_soon;
42 static PyObject  *nxt_py_quit_future;
43 static PyObject  *nxt_py_quit_future_set_result;
44 static PyObject  *nxt_py_loop_add_reader;
45 static PyObject  *nxt_py_loop_remove_reader;
46 static PyObject  *nxt_py_port_read;
47 
48 static PyMethodDef        nxt_py_port_read_method =
49     {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50 
51 #define NXT_UNIT_HASH_WS_PROTOCOL  0xED0A
52 
53 
54 int
55 nxt_python_asgi_check(PyObject *obj)
56 {
57     int           res;
58     PyObject      *call;
59     PyCodeObject  *code;
60 
61     if (PyFunction_Check(obj)) {
62         code = (PyCodeObject *) PyFunction_GET_CODE(obj);
63 
64         return (code->co_flags & CO_COROUTINE) != 0;
65     }
66 
67     if (PyMethod_Check(obj)) {
68         obj = PyMethod_GET_FUNCTION(obj);
69 
70         code = (PyCodeObject *) PyFunction_GET_CODE(obj);
71 
72         return (code->co_flags & CO_COROUTINE) != 0;
73     }
74 
75     call = PyObject_GetAttrString(obj, "__call__");
76 
77     if (call == NULL) {
78         return 0;
79     }
80 
81     if (PyFunction_Check(call)) {
82         code = (PyCodeObject *) PyFunction_GET_CODE(call);
83 
84         res = (code->co_flags & CO_COROUTINE) != 0;
85 
86     } else {
87         if (PyMethod_Check(call)) {
88             obj = PyMethod_GET_FUNCTION(call);
89 
90             code = (PyCodeObject *) PyFunction_GET_CODE(obj);
91 
92             res = (code->co_flags & CO_COROUTINE) != 0;
93 
94         } else {
95             res = 0;
96         }
97     }
98 
99     Py_DECREF(call);
100 
101     return res;
102 }
103 
104 
105 nxt_int_t
106 nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
107 {
108     PyObject   *asyncio, *loop, *get_event_loop;
109     nxt_int_t  rc;
110 
111     nxt_debug(task, "asgi_init");
112 
113     if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) {
114         nxt_alert(task, "Python failed to init string objects");
115         return NXT_ERROR;
116     }
117 
118     asyncio = PyImport_ImportModule("asyncio");
119     if (nxt_slow_path(asyncio == NULL)) {
120         nxt_alert(task, "Python failed to import module 'asyncio'");
121         nxt_python_print_exception();
122         return NXT_ERROR;
123     }
124 
125     loop = NULL;
126     get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
127                                           "get_event_loop");
128     if (nxt_slow_path(get_event_loop == NULL)) {
129         nxt_alert(task,
130                  "Python failed to get 'get_event_loop' from module 'asyncio'");
131         goto fail;
132     }
133 
134     if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) {
135         nxt_alert(task, "'asyncio.get_event_loop' is not a callable object");
136         goto fail;
137     }
138 
139     loop = PyObject_CallObject(get_event_loop, NULL);
140     if (nxt_slow_path(loop == NULL)) {
141         nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'");
142         goto fail;
143     }
144 
145     nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task");
146     if (nxt_slow_path(nxt_py_loop_create_task == NULL)) {
147         nxt_alert(task, "Python failed to get 'loop.create_task'");
148         goto fail;
149     }
150 
151     if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) {
152         nxt_alert(task, "'loop.create_task' is not a callable object");
153         goto fail;
154     }
155 
156     nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader");
157     if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) {
158         nxt_alert(task, "Python failed to get 'loop.add_reader'");
159         goto fail;
160     }
161 
162     if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) {
163         nxt_alert(task, "'loop.add_reader' is not a callable object");
164         goto fail;
165     }
166 
167     nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader");
168     if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) {
169         nxt_alert(task, "Python failed to get 'loop.remove_reader'");
170         goto fail;
171     }
172 
173     if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) {
174         nxt_alert(task, "'loop.remove_reader' is not a callable object");
175         goto fail;
176     }
177 
178     nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon");
179     if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) {
180         nxt_alert(task, "Python failed to get 'loop.call_soon'");
181         goto fail;
182     }
183 
184     if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) {
185         nxt_alert(task, "'loop.call_soon' is not a callable object");
186         goto fail;
187     }
188 
189     nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop,
190                                                           "run_until_complete");
191     if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) {
192         nxt_alert(task, "Python failed to get 'loop.run_until_complete'");
193         goto fail;
194     }
195 
196     if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) {
197         nxt_alert(task, "'loop.run_until_complete' is not a callable object");
198         goto fail;
199     }
200 
201     nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future");
202     if (nxt_slow_path(nxt_py_loop_create_future == NULL)) {
203         nxt_alert(task, "Python failed to get 'loop.create_future'");
204         goto fail;
205     }
206 
207     if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) {
208         nxt_alert(task, "'loop.create_future' is not a callable object");
209         goto fail;
210     }
211 
212     nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
213     if (nxt_slow_path(nxt_py_quit_future == NULL)) {
214         nxt_alert(task, "Python failed to create Future ");
215         nxt_python_print_exception();
216         goto fail;
217     }
218 
219     nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future,
220                                                            "set_result");
221     if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) {
222         nxt_alert(task, "Python failed to get 'future.set_result'");
223         goto fail;
224     }
225 
226     if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) {
227         nxt_alert(task, "'future.set_result' is not a callable object");
228         goto fail;
229     }
230 
231     nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
232     if (nxt_slow_path(nxt_py_port_read == NULL)) {
233         nxt_alert(task, "Python failed to initialize the 'port_read' function");
234         goto fail;
235     }
236 
237     nxt_queue_init(&nxt_py_asgi_drain_queue);
238 
239     if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) {
240         goto fail;
241     }
242 
243     if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) {
244         goto fail;
245     }
246 
247     rc = nxt_py_asgi_lifespan_startup(task);
248     if (nxt_slow_path(rc == NXT_ERROR)) {
249         goto fail;
250     }
251 
252     init->callbacks.request_handler = nxt_py_asgi_request_handler;
253     init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
254     init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
255     init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
256     init->callbacks.quit = nxt_py_asgi_quit;
257     init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
258     init->callbacks.add_port = nxt_py_asgi_add_port;
259     init->callbacks.remove_port = nxt_py_asgi_remove_port;
260 
261     Py_DECREF(loop);
262     Py_DECREF(asyncio);
263 
264     return NXT_OK;
265 
266 fail:
267 
268     Py_XDECREF(loop);
269     Py_DECREF(asyncio);
270 
271     return NXT_ERROR;
272 }
273 
274 
275 nxt_int_t
276 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
277 {
278     PyObject  *res;
279 
280     res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
281                                        nxt_py_quit_future, NULL);
282     if (nxt_slow_path(res == NULL)) {
283         nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
284         nxt_python_print_exception();
285 
286         return NXT_ERROR;
287     }
288 
289     Py_DECREF(res);
290 
291     nxt_py_asgi_lifespan_shutdown();
292 
293     return NXT_OK;
294 }
295 
296 
297 static void
298 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
299 {
300     PyObject  *scope, *res, *task, *receive, *send, *done, *asgi;
301 
302     if (req->request->websocket_handshake) {
303         asgi = nxt_py_asgi_websocket_create(req);
304 
305     } else {
306         asgi = nxt_py_asgi_http_create(req);
307     }
308 
309     if (nxt_slow_path(asgi == NULL)) {
310         nxt_unit_req_alert(req, "Python failed to create asgi object");
311         nxt_unit_request_done(req, NXT_UNIT_ERROR);
312 
313         return;
314     }
315 
316     receive = PyObject_GetAttrString(asgi, "receive");
317     if (nxt_slow_path(receive == NULL)) {
318         nxt_unit_req_alert(req, "Python failed to get 'receive' method");
319         nxt_unit_request_done(req, NXT_UNIT_ERROR);
320 
321         goto release_asgi;
322     }
323 
324     send = PyObject_GetAttrString(asgi, "send");
325     if (nxt_slow_path(receive == NULL)) {
326         nxt_unit_req_alert(req, "Python failed to get 'send' method");
327         nxt_unit_request_done(req, NXT_UNIT_ERROR);
328 
329         goto release_receive;
330     }
331 
332     done = PyObject_GetAttrString(asgi, "_done");
333     if (nxt_slow_path(receive == NULL)) {
334         nxt_unit_req_alert(req, "Python failed to get '_done' method");
335         nxt_unit_request_done(req, NXT_UNIT_ERROR);
336 
337         goto release_send;
338     }
339 
340     scope = nxt_py_asgi_create_http_scope(req);
341     if (nxt_slow_path(scope == NULL)) {
342         nxt_unit_request_done(req, NXT_UNIT_ERROR);
343 
344         goto release_done;
345     }
346 
347     req->data = asgi;
348 
349     res = PyObject_CallFunctionObjArgs(nxt_py_application,
350                                        scope, receive, send, NULL);
351     if (nxt_slow_path(res == NULL)) {
352         nxt_unit_req_error(req, "Python failed to call the application");
353         nxt_python_print_exception();
354         nxt_unit_request_done(req, NXT_UNIT_ERROR);
355 
356         goto release_scope;
357     }
358 
359     if (nxt_slow_path(!PyCoro_CheckExact(res))) {
360         nxt_unit_req_error(req, "Application result type is not a coroutine");
361         nxt_unit_request_done(req, NXT_UNIT_ERROR);
362 
363         Py_DECREF(res);
364 
365         goto release_scope;
366     }
367 
368     task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
369     if (nxt_slow_path(task == NULL)) {
370         nxt_unit_req_error(req, "Python failed to call the create_task");
371         nxt_python_print_exception();
372         nxt_unit_request_done(req, NXT_UNIT_ERROR);
373 
374         Py_DECREF(res);
375 
376         goto release_scope;
377     }
378 
379     Py_DECREF(res);
380 
381     res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
382                                      NULL);
383     if (nxt_slow_path(res == NULL)) {
384         nxt_unit_req_error(req,
385                            "Python failed to call 'task.add_done_callback'");
386         nxt_python_print_exception();
387         nxt_unit_request_done(req, NXT_UNIT_ERROR);
388 
389         goto release_task;
390     }
391 
392     Py_DECREF(res);
393 release_task:
394     Py_DECREF(task);
395 release_scope:
396     Py_DECREF(scope);
397 release_done:
398     Py_DECREF(done);
399 release_send:
400     Py_DECREF(send);
401 release_receive:
402     Py_DECREF(receive);
403 release_asgi:
404     Py_DECREF(asgi);
405 }
406 
407 
408 static PyObject *
409 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
410 {
411     char                *p, *target, *query;
412     uint32_t            target_length, i;
413     PyObject            *scope, *v, *type, *scheme;
414     PyObject            *headers, *header;
415     nxt_unit_field_t    *f;
416     nxt_unit_request_t  *r;
417 
418     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
419 
420 #define SET_ITEM(dict, key, value) \
421     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
422                       == -1))                                                  \
423     {                                                                          \
424         nxt_unit_req_alert(req, "Python failed to set '"                       \
425                                 #dict "." #key "' item");                      \
426         goto fail;                                                             \
427     }
428 
429     v = NULL;
430     headers = NULL;
431 
432     r = req->request;
433 
434     if (r->websocket_handshake) {
435         type = nxt_py_websocket_str;
436         scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
437 
438     } else {
439         type = nxt_py_http_str;
440         scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
441     }
442 
443     scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
444     if (nxt_slow_path(scope == NULL)) {
445         return NULL;
446     }
447 
448     p = nxt_unit_sptr_get(&r->version);
449     SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
450                                               : nxt_py_1_0_str)
451     SET_ITEM(scope, scheme, scheme)
452 
453     v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
454                                    r->method_length);
455     if (nxt_slow_path(v == NULL)) {
456         nxt_unit_req_alert(req, "Python failed to create 'method' string");
457         goto fail;
458     }
459 
460     SET_ITEM(scope, method, v)
461     Py_DECREF(v);
462 
463     v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
464                              "replace");
465     if (nxt_slow_path(v == NULL)) {
466         nxt_unit_req_alert(req, "Python failed to create 'path' string");
467         goto fail;
468     }
469 
470     SET_ITEM(scope, path, v)
471     Py_DECREF(v);
472 
473     target = nxt_unit_sptr_get(&r->target);
474     query = nxt_unit_sptr_get(&r->query);
475 
476     if (r->query.offset != 0) {
477         target_length = query - target - 1;
478 
479     } else {
480         target_length = r->target_length;
481     }
482 
483     v = PyBytes_FromStringAndSize(target, target_length);
484     if (nxt_slow_path(v == NULL)) {
485         nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
486         goto fail;
487     }
488 
489     SET_ITEM(scope, raw_path, v)
490     Py_DECREF(v);
491 
492     v = PyBytes_FromStringAndSize(query, r->query_length);
493     if (nxt_slow_path(v == NULL)) {
494         nxt_unit_req_alert(req, "Python failed to create 'query' string");
495         goto fail;
496     }
497 
498     SET_ITEM(scope, query_string, v)
499     Py_DECREF(v);
500 
501     v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
502     if (nxt_slow_path(v == NULL)) {
503         nxt_unit_req_alert(req, "Python failed to create 'client' pair");
504         goto fail;
505     }
506 
507     SET_ITEM(scope, client, v)
508     Py_DECREF(v);
509 
510     v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
511     if (nxt_slow_path(v == NULL)) {
512         nxt_unit_req_alert(req, "Python failed to create 'server' pair");
513         goto fail;
514     }
515 
516     SET_ITEM(scope, server, v)
517     Py_DECREF(v);
518 
519     v = NULL;
520 
521     headers = PyTuple_New(r->fields_count);
522     if (nxt_slow_path(headers == NULL)) {
523         nxt_unit_req_alert(req, "Python failed to create 'headers' object");
524         goto fail;
525     }
526 
527     for (i = 0; i < r->fields_count; i++) {
528         f = r->fields + i;
529 
530         header = nxt_py_asgi_create_header(f);
531         if (nxt_slow_path(header == NULL)) {
532             nxt_unit_req_alert(req, "Python failed to create 'header' pair");
533             goto fail;
534         }
535 
536         PyTuple_SET_ITEM(headers, i, header);
537 
538         if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
539             && f->name_length == ws_protocol.length
540             && f->value_length > 0
541             && r->websocket_handshake)
542         {
543             v = nxt_py_asgi_create_subprotocols(f);
544             if (nxt_slow_path(v == NULL)) {
545                 nxt_unit_req_alert(req, "Failed to create subprotocols");
546                 goto fail;
547             }
548 
549             SET_ITEM(scope, subprotocols, v);
550             Py_DECREF(v);
551         }
552     }
553 
554     SET_ITEM(scope, headers, headers)
555     Py_DECREF(headers);
556 
557     return scope;
558 
559 fail:
560 
561     Py_XDECREF(v);
562     Py_XDECREF(headers);
563     Py_DECREF(scope);
564 
565     return NULL;
566 
567 #undef SET_ITEM
568 }
569 
570 
571 static PyObject *
572 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
573 {
574     char      *p, *s;
575     PyObject  *pair, *v;
576 
577     pair = PyTuple_New(2);
578     if (nxt_slow_path(pair == NULL)) {
579         return NULL;
580     }
581 
582     p = nxt_unit_sptr_get(sptr);
583     s = memchr(p, ':', len);
584 
585     v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
586     if (nxt_slow_path(v == NULL)) {
587         Py_DECREF(pair);
588 
589         return NULL;
590     }
591 
592     PyTuple_SET_ITEM(pair, 0, v);
593 
594     if (s != NULL) {
595         p += len;
596         v = PyLong_FromString(s + 1, &p, 10);
597 
598     } else {
599         v = PyLong_FromLong(port);
600     }
601 
602     if (nxt_slow_path(v == NULL)) {
603         Py_DECREF(pair);
604 
605         return NULL;
606     }
607 
608     PyTuple_SET_ITEM(pair, 1, v);
609 
610     return pair;
611 }
612 
613 
614 static PyObject *
615 nxt_py_asgi_create_header(nxt_unit_field_t *f)
616 {
617     char      c, *name;
618     uint8_t   pos;
619     PyObject  *header, *v;
620 
621     header = PyTuple_New(2);
622     if (nxt_slow_path(header == NULL)) {
623         return NULL;
624     }
625 
626     name = nxt_unit_sptr_get(&f->name);
627 
628     for (pos = 0; pos < f->name_length; pos++) {
629         c = name[pos];
630         if (c >= 'A' && c <= 'Z') {
631             name[pos] = (c | 0x20);
632         }
633     }
634 
635     v = PyBytes_FromStringAndSize(name, f->name_length);
636     if (nxt_slow_path(v == NULL)) {
637         Py_DECREF(header);
638 
639         return NULL;
640     }
641 
642     PyTuple_SET_ITEM(header, 0, v);
643 
644     v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
645                                   f->value_length);
646     if (nxt_slow_path(v == NULL)) {
647         Py_DECREF(header);
648 
649         return NULL;
650     }
651 
652     PyTuple_SET_ITEM(header, 1, v);
653 
654     return header;
655 }
656 
657 
658 static PyObject *
659 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
660 {
661     char      *v;
662     uint32_t  i, n, start;
663     PyObject  *res, *proto;
664 
665     v = nxt_unit_sptr_get(&f->value);
666     n = 1;
667 
668     for (i = 0; i < f->value_length; i++) {
669         if (v[i] == ',') {
670             n++;
671         }
672     }
673 
674     res = PyTuple_New(n);
675     if (nxt_slow_path(res == NULL)) {
676         return NULL;
677     }
678 
679     n = 0;
680     start = 0;
681 
682     for (i = 0; i < f->value_length; ) {
683         if (v[i] != ',') {
684             i++;
685 
686             continue;
687         }
688 
689         if (i - start > 0) {
690             proto = PyString_FromStringAndSize(v + start, i - start);
691             if (nxt_slow_path(proto == NULL)) {
692                 goto fail;
693             }
694 
695             PyTuple_SET_ITEM(res, n, proto);
696 
697             n++;
698         }
699 
700         do {
701             i++;
702         } while (i < f->value_length && v[i] == ' ');
703 
704         start = i;
705     }
706 
707     if (i - start > 0) {
708         proto = PyString_FromStringAndSize(v + start, i - start);
709         if (nxt_slow_path(proto == NULL)) {
710             goto fail;
711         }
712 
713         PyTuple_SET_ITEM(res, n, proto);
714     }
715 
716     return res;
717 
718 fail:
719 
720     Py_DECREF(res);
721 
722     return NULL;
723 }
724 
725 
726 static int
727 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
728 {
729     int       nb;
730     PyObject  *res;
731 
732     if (port->in_fd == -1) {
733         return NXT_UNIT_OK;
734     }
735 
736     nb = 1;
737 
738     if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
739         nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
740                        port->in_fd, strerror(errno), errno);
741 
742         return NXT_UNIT_ERROR;
743     }
744 
745     nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
746 
747     res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader,
748                                        PyLong_FromLong(port->in_fd),
749                                        nxt_py_port_read,
750                                        PyLong_FromVoidPtr(ctx),
751                                        PyLong_FromVoidPtr(port), NULL);
752     if (nxt_slow_path(res == NULL)) {
753         nxt_unit_alert(ctx, "Python failed to add_reader");
754 
755         return NXT_UNIT_ERROR;
756     }
757 
758     Py_DECREF(res);
759 
760     return NXT_UNIT_OK;
761 }
762 
763 
764 static void
765 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
766 {
767     PyObject  *res;
768 
769     nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
770 
771     if (port->in_fd == -1) {
772         return;
773     }
774 
775     res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader,
776                                        PyLong_FromLong(port->in_fd), NULL);
777     if (nxt_slow_path(res == NULL)) {
778         nxt_unit_alert(NULL, "Python failed to remove_reader");
779     }
780 
781     Py_DECREF(res);
782 }
783 
784 
785 static void
786 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
787 {
788     PyObject  *res;
789 
790     nxt_unit_debug(ctx, "asgi_quit %p", ctx);
791 
792     res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result,
793                                        PyLong_FromLong(0), NULL);
794     if (nxt_slow_path(res == NULL)) {
795         nxt_unit_alert(ctx, "Python failed to set_result");
796     }
797 
798     Py_DECREF(res);
799 }
800 
801 
802 static void
803 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
804 {
805     int               rc;
806     nxt_queue_link_t  *lnk;
807 
808     while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) {
809         lnk = nxt_queue_first(&nxt_py_asgi_drain_queue);
810 
811         rc = nxt_py_asgi_http_drain(lnk);
812         if (rc == NXT_UNIT_AGAIN) {
813             break;
814         }
815 
816         nxt_queue_remove(lnk);
817     }
818 }
819 
820 
821 static PyObject *
822 nxt_py_asgi_port_read(PyObject *self, PyObject *args)
823 {
824     int              rc;
825     PyObject         *arg;
826     Py_ssize_t       n;
827     nxt_unit_ctx_t   *ctx;
828     nxt_unit_port_t  *port;
829 
830     n = PyTuple_GET_SIZE(args);
831 
832     if (n != 2) {
833         nxt_unit_alert(NULL,
834                        "nxt_py_asgi_port_read: invalid number of arguments %d",
835                        (int) n);
836 
837         return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
838     }
839 
840     arg = PyTuple_GET_ITEM(args, 0);
841     if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
842         return PyErr_Format(PyExc_TypeError,
843                             "the first argument is not a long");
844     }
845 
846     ctx = PyLong_AsVoidPtr(arg);
847 
848     arg = PyTuple_GET_ITEM(args, 1);
849     if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
850         return PyErr_Format(PyExc_TypeError,
851                             "the second argument is not a long");
852     }
853 
854     port = PyLong_AsVoidPtr(arg);
855 
856     nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
857 
858     rc = nxt_unit_process_port_msg(ctx, port);
859 
860     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
861         return PyErr_Format(PyExc_RuntimeError,
862                             "error processing port message");
863     }
864 
865     Py_RETURN_NONE;
866 }
867 
868 
869 PyObject *
870 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
871     void *data)
872 {
873     int       i;
874     PyObject  *iter, *header, *h_iter, *name, *val, *res;
875 
876     iter = PyObject_GetIter(headers);
877     if (nxt_slow_path(iter == NULL)) {
878         return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
879     }
880 
881     for (i = 0; /* void */; i++) {
882         header = PyIter_Next(iter);
883         if (header == NULL) {
884             break;
885         }
886 
887         h_iter = PyObject_GetIter(header);
888         if (nxt_slow_path(h_iter == NULL)) {
889             Py_DECREF(header);
890             Py_DECREF(iter);
891 
892             return PyErr_Format(PyExc_TypeError,
893                                 "'headers' item #%d is not an iterable", i);
894         }
895 
896         name = PyIter_Next(h_iter);
897         if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
898             Py_XDECREF(name);
899             Py_DECREF(h_iter);
900             Py_DECREF(header);
901             Py_DECREF(iter);
902 
903             return PyErr_Format(PyExc_TypeError,
904                           "'headers' item #%d 'name' is not a byte string", i);
905         }
906 
907         val = PyIter_Next(h_iter);
908         if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
909             Py_XDECREF(val);
910             Py_DECREF(h_iter);
911             Py_DECREF(header);
912             Py_DECREF(iter);
913 
914             return PyErr_Format(PyExc_TypeError,
915                          "'headers' item #%d 'value' is not a byte string", i);
916         }
917 
918         res = cb(data, i, name, val);
919 
920         Py_DECREF(name);
921         Py_DECREF(val);
922         Py_DECREF(h_iter);
923         Py_DECREF(header);
924 
925         if (nxt_slow_path(res == NULL)) {
926             Py_DECREF(iter);
927 
928             return NULL;
929         }
930 
931         Py_DECREF(res);
932     }
933 
934     Py_DECREF(iter);
935 
936     Py_RETURN_NONE;
937 }
938 
939 
940 PyObject *
941 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
942 {
943     nxt_py_asgi_calc_size_ctx_t  *ctx;
944 
945     ctx = data;
946 
947     ctx->fields_count++;
948     ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
949 
950     Py_RETURN_NONE;
951 }
952 
953 
954 PyObject *
955 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
956 {
957     int                          rc;
958     char                         *name_str, *val_str;
959     uint32_t                     name_len, val_len;
960     nxt_off_t                    content_length;
961     nxt_unit_request_info_t      *req;
962     nxt_py_asgi_add_field_ctx_t  *ctx;
963 
964     name_str = PyBytes_AS_STRING(name);
965     name_len = PyBytes_GET_SIZE(name);
966 
967     val_str = PyBytes_AS_STRING(val);
968     val_len = PyBytes_GET_SIZE(val);
969 
970     ctx = data;
971     req = ctx->req;
972 
973     rc = nxt_unit_response_add_field(req, name_str, name_len,
974                                      val_str, val_len);
975     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
976         return PyErr_Format(PyExc_RuntimeError,
977                             "failed to add header #%d", i);
978     }
979 
980     if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
981         content_length = nxt_off_t_parse((u_char *) val_str, val_len);
982         if (nxt_slow_path(content_length < 0)) {
983             nxt_unit_req_error(req, "failed to parse Content-Length "
984                                "value %.*s", (int) val_len, val_str);
985 
986             return PyErr_Format(PyExc_ValueError,
987                                 "Failed to parse Content-Length: '%.*s'",
988                                 (int) val_len, val_str);
989         }
990 
991         ctx->content_length = content_length;
992     }
993 
994     Py_RETURN_NONE;
995 }
996 
997 
998 PyObject *
999 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
1000     PyObject *result)
1001 {
1002     PyObject  *set_result, *res;
1003 
1004     if (nxt_slow_path(result == NULL)) {
1005         Py_DECREF(future);
1006 
1007         return NULL;
1008     }
1009 
1010     set_result = PyObject_GetAttrString(future, "set_result");
1011     if (nxt_slow_path(set_result == NULL)) {
1012         nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1013 
1014         Py_CLEAR(future);
1015 
1016         goto cleanup;
1017     }
1018 
1019     if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1020         nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1021 
1022         Py_CLEAR(future);
1023 
1024         goto cleanup;
1025     }
1026 
1027     res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result,
1028                                        result, NULL);
1029     if (nxt_slow_path(res == NULL)) {
1030         nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1031         nxt_python_print_exception();
1032 
1033         Py_CLEAR(future);
1034     }
1035 
1036     Py_XDECREF(res);
1037 
1038 cleanup:
1039 
1040     Py_DECREF(set_result);
1041     Py_DECREF(result);
1042 
1043     return future;
1044 }
1045 
1046 
1047 PyObject *
1048 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1049 {
1050     PyObject  *msg;
1051 
1052     msg = PyDict_New();
1053     if (nxt_slow_path(msg == NULL)) {
1054         nxt_unit_req_alert(req, "Python failed to create message dict");
1055         nxt_python_print_exception();
1056 
1057         return PyErr_Format(PyExc_RuntimeError,
1058                             "failed to create message dict");
1059     }
1060 
1061     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1062         nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1063 
1064         Py_DECREF(msg);
1065 
1066         return PyErr_Format(PyExc_RuntimeError,
1067                             "failed to set 'msg.type' item");
1068     }
1069 
1070     return msg;
1071 }
1072 
1073 
1074 PyObject *
1075 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1076     PyObject *spec_version)
1077 {
1078     PyObject  *scope, *asgi;
1079 
1080     scope = PyDict_New();
1081     if (nxt_slow_path(scope == NULL)) {
1082         nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1083         nxt_python_print_exception();
1084 
1085         return PyErr_Format(PyExc_RuntimeError,
1086                             "failed to create 'scope' dict");
1087     }
1088 
1089     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1090         nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1091 
1092         Py_DECREF(scope);
1093 
1094         return PyErr_Format(PyExc_RuntimeError,
1095                             "failed to set 'scope.type' item");
1096     }
1097 
1098     asgi = PyDict_New();
1099     if (nxt_slow_path(asgi == NULL)) {
1100         nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1101         nxt_python_print_exception();
1102 
1103         Py_DECREF(scope);
1104 
1105         return PyErr_Format(PyExc_RuntimeError,
1106                             "failed to create 'asgi' dict");
1107     }
1108 
1109     if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1110         nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1111 
1112         Py_DECREF(asgi);
1113         Py_DECREF(scope);
1114 
1115         return PyErr_Format(PyExc_RuntimeError,
1116                             "failed to set 'scope.asgi' item");
1117     }
1118 
1119     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1120                                      nxt_py_3_0_str) == -1))
1121     {
1122         nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1123 
1124         Py_DECREF(asgi);
1125         Py_DECREF(scope);
1126 
1127         return PyErr_Format(PyExc_RuntimeError,
1128                             "failed to set 'asgi.version' item");
1129     }
1130 
1131     if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1132                                      spec_version) == -1))
1133     {
1134         nxt_unit_req_alert(req,
1135                            "Python failed to set 'asgi.spec_version' item");
1136 
1137         Py_DECREF(asgi);
1138         Py_DECREF(scope);
1139 
1140         return PyErr_Format(PyExc_RuntimeError,
1141                             "failed to set 'asgi.spec_version' item");
1142     }
1143 
1144     Py_DECREF(asgi);
1145 
1146     return scope;
1147 }
1148 
1149 
1150 void
1151 nxt_py_asgi_dealloc(PyObject *self)
1152 {
1153     PyObject_Del(self);
1154 }
1155 
1156 
1157 PyObject *
1158 nxt_py_asgi_await(PyObject *self)
1159 {
1160     Py_INCREF(self);
1161     return self;
1162 }
1163 
1164 
1165 PyObject *
1166 nxt_py_asgi_iter(PyObject *self)
1167 {
1168     Py_INCREF(self);
1169     return self;
1170 }
1171 
1172 
1173 PyObject *
1174 nxt_py_asgi_next(PyObject *self)
1175 {
1176     return NULL;
1177 }
1178 
1179 
1180 void
1181 nxt_python_asgi_done(void)
1182 {
1183     nxt_py_asgi_str_done();
1184 
1185     Py_XDECREF(nxt_py_quit_future);
1186     Py_XDECREF(nxt_py_quit_future_set_result);
1187     Py_XDECREF(nxt_py_loop_run_until_complete);
1188     Py_XDECREF(nxt_py_loop_create_future);
1189     Py_XDECREF(nxt_py_loop_create_task);
1190     Py_XDECREF(nxt_py_loop_call_soon);
1191     Py_XDECREF(nxt_py_loop_add_reader);
1192     Py_XDECREF(nxt_py_loop_remove_reader);
1193     Py_XDECREF(nxt_py_port_read);
1194 }
1195 
1196 #else /* !(NXT_HAVE_ASGI) */
1197 
1198 
1199 int
1200 nxt_python_asgi_check(PyObject *obj)
1201 {
1202     return 0;
1203 }
1204 
1205 
1206 nxt_int_t
1207 nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
1208 {
1209     nxt_alert(task, "ASGI not implemented");
1210     return NXT_ERROR;
1211 }
1212 
1213 
1214 nxt_int_t
1215 nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
1216 {
1217     nxt_unit_alert(ctx, "ASGI not implemented");
1218     return NXT_ERROR;
1219 }
1220 
1221 
1222 void
1223 nxt_python_asgi_done(void)
1224 {
1225 }
1226 
1227 #endif /* NXT_HAVE_ASGI */
1228