nxt_python_asgi.c (1715:95874fd97501) nxt_python_asgi.c (1767:582a004c73f8)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7#include <python/nxt_python.h>
8
9#if (NXT_HAVE_ASGI)
10
11#include <nxt_main.h>
12#include <nxt_unit.h>
13#include <nxt_unit_request.h>
14#include <nxt_unit_response.h>
15#include <python/nxt_python_asgi.h>
16#include <python/nxt_python_asgi_str.h>
17
18
19static PyObject *nxt_python_asgi_get_func(PyObject *obj);
20static int nxt_python_asgi_ctx_data_alloc(void **pdata);
21static void nxt_python_asgi_ctx_data_free(void *data);
22static int nxt_python_asgi_startup(void *data);
23static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
24
25static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
26 nxt_unit_port_t *port);
27static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
28static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
29
30static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
31static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
32 uint16_t port);
33static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
34static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
35
36static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
37
38static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
39static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
40static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
41static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
42
43static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
44static void nxt_python_asgi_done(void);
45
46
47int nxt_py_asgi_legacy;
48static PyObject *nxt_py_port_read;
49static nxt_unit_port_t *nxt_py_shared_port;
50
51static PyMethodDef nxt_py_port_read_method =
52 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
53
54static nxt_python_proto_t nxt_py_asgi_proto = {
55 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
56 .ctx_data_free = nxt_python_asgi_ctx_data_free,
57 .startup = nxt_python_asgi_startup,
58 .run = nxt_python_asgi_run,
59 .ready = nxt_python_asgi_ready,
60 .done = nxt_python_asgi_done,
61};
62
63#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
64
65
66int
67nxt_python_asgi_check(PyObject *obj)
68{
69 int res;
70 PyObject *func;
71 PyCodeObject *code;
72
73 func = nxt_python_asgi_get_func(obj);
74
75 if (func == NULL) {
76 return 0;
77 }
78
79 code = (PyCodeObject *) PyFunction_GET_CODE(func);
80
81 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
82 "%d argument(s)",
83 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
84 code->co_argcount);
85
86 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
87
88 Py_DECREF(func);
89
90 return res;
91}
92
93
94static PyObject *
95nxt_python_asgi_get_func(PyObject *obj)
96{
97 PyObject *call;
98
99 if (PyFunction_Check(obj)) {
100 Py_INCREF(obj);
101 return obj;
102 }
103
104 if (PyMethod_Check(obj)) {
105 obj = PyMethod_GET_FUNCTION(obj);
106
107 Py_INCREF(obj);
108 return obj;
109 }
110
111 call = PyObject_GetAttrString(obj, "__call__");
112
113 if (call == NULL) {
114 return NULL;
115 }
116
117 if (PyFunction_Check(call)) {
118 return call;
119 }
120
121 if (PyMethod_Check(call)) {
122 obj = PyMethod_GET_FUNCTION(call);
123
124 Py_INCREF(obj);
125 Py_DECREF(call);
126
127 return obj;
128 }
129
130 Py_DECREF(call);
131
132 return NULL;
133}
134
135
136int
137nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
138{
139 PyObject *func;
140 PyCodeObject *code;
141
142 nxt_unit_debug(NULL, "asgi_init");
143
144 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
145 nxt_unit_alert(NULL, "Python failed to init string objects");
146 return NXT_UNIT_ERROR;
147 }
148
149 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
150 if (nxt_slow_path(nxt_py_port_read == NULL)) {
151 nxt_unit_alert(NULL,
152 "Python failed to initialize the 'port_read' function");
153 return NXT_UNIT_ERROR;
154 }
155
156 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
157 return NXT_UNIT_ERROR;
158 }
159
160 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
161 return NXT_UNIT_ERROR;
162 }
163
164 func = nxt_python_asgi_get_func(nxt_py_application);
165 if (nxt_slow_path(func == NULL)) {
166 nxt_unit_alert(NULL, "Python cannot find function for callable");
167 return NXT_UNIT_ERROR;
168 }
169
170 code = (PyCodeObject *) PyFunction_GET_CODE(func);
171
172 if ((code->co_flags & CO_COROUTINE) == 0) {
173 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
174 "switching to legacy mode");
175 nxt_py_asgi_legacy = 1;
176 }
177
178 Py_DECREF(func);
179
180 init->callbacks.request_handler = nxt_py_asgi_request_handler;
181 init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
182 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
183 init->callbacks.close_handler = nxt_py_asgi_close_handler;
184 init->callbacks.quit = nxt_py_asgi_quit;
185 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
186 init->callbacks.add_port = nxt_py_asgi_add_port;
187 init->callbacks.remove_port = nxt_py_asgi_remove_port;
188
189 *proto = nxt_py_asgi_proto;
190
191 return NXT_UNIT_OK;
192}
193
194
195static int
196nxt_python_asgi_ctx_data_alloc(void **pdata)
197{
198 uint32_t i;
199 PyObject *asyncio, *loop, *new_event_loop, *obj;
200 nxt_py_asgi_ctx_data_t *ctx_data;
201
202 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
203 if (nxt_slow_path(ctx_data == NULL)) {
204 nxt_unit_alert(NULL, "Failed to allocate context data");
205 return NXT_UNIT_ERROR;
206 }
207
208 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
209
210 nxt_queue_init(&ctx_data->drain_queue);
211
212 struct {
213 const char *key;
214 PyObject **handler;
215
216 } handlers[] = {
217 { "create_task", &ctx_data->loop_create_task },
218 { "add_reader", &ctx_data->loop_add_reader },
219 { "remove_reader", &ctx_data->loop_remove_reader },
220 { "call_soon", &ctx_data->loop_call_soon },
221 { "run_until_complete", &ctx_data->loop_run_until_complete },
222 { "create_future", &ctx_data->loop_create_future },
223 };
224
225 loop = NULL;
226
227 asyncio = PyImport_ImportModule("asyncio");
228 if (nxt_slow_path(asyncio == NULL)) {
229 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
230 nxt_python_print_exception();
231 goto fail;
232 }
233
234 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
235 "new_event_loop");
236 if (nxt_slow_path(new_event_loop == NULL)) {
237 nxt_unit_alert(NULL,
238 "Python failed to get 'new_event_loop' from module 'asyncio'");
239 goto fail;
240 }
241
242 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
243 nxt_unit_alert(NULL,
244 "'asyncio.new_event_loop' is not a callable object");
245 goto fail;
246 }
247
248 loop = PyObject_CallObject(new_event_loop, NULL);
249 if (nxt_slow_path(loop == NULL)) {
250 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
251 goto fail;
252 }
253
254 for (i = 0; i < nxt_nitems(handlers); i++) {
255 obj = PyObject_GetAttrString(loop, handlers[i].key);
256 if (nxt_slow_path(obj == NULL)) {
257 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
258 handlers[i].key);
259 goto fail;
260 }
261
262 *handlers[i].handler = obj;
263
264 if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
265 nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
266 handlers[i].key);
267 goto fail;
268 }
269 }
270
271 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
272 if (nxt_slow_path(obj == NULL)) {
273 nxt_unit_alert(NULL, "Python failed to create Future ");
274 nxt_python_print_exception();
275 goto fail;
276 }
277
278 ctx_data->quit_future = obj;
279
280 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
281 if (nxt_slow_path(obj == NULL)) {
282 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
283 goto fail;
284 }
285
286 ctx_data->quit_future_set_result = obj;
287
288 if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
289 nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
290 goto fail;
291 }
292
293 Py_DECREF(loop);
294 Py_DECREF(asyncio);
295
296 *pdata = ctx_data;
297
298 return NXT_UNIT_OK;
299
300fail:
301
302 nxt_python_asgi_ctx_data_free(ctx_data);
303
304 Py_XDECREF(loop);
305 Py_XDECREF(asyncio);
306
307 return NXT_UNIT_ERROR;
308}
309
310
311static void
312nxt_python_asgi_ctx_data_free(void *data)
313{
314 nxt_py_asgi_ctx_data_t *ctx_data;
315
316 ctx_data = data;
317
318 Py_XDECREF(ctx_data->loop_run_until_complete);
319 Py_XDECREF(ctx_data->loop_create_future);
320 Py_XDECREF(ctx_data->loop_create_task);
321 Py_XDECREF(ctx_data->loop_call_soon);
322 Py_XDECREF(ctx_data->loop_add_reader);
323 Py_XDECREF(ctx_data->loop_remove_reader);
324 Py_XDECREF(ctx_data->quit_future);
325 Py_XDECREF(ctx_data->quit_future_set_result);
326
327 nxt_unit_free(NULL, ctx_data);
328}
329
330
331static int
332nxt_python_asgi_startup(void *data)
333{
334 return nxt_py_asgi_lifespan_startup(data);
335}
336
337
338static int
339nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
340{
341 PyObject *res;
342 nxt_py_asgi_ctx_data_t *ctx_data;
343
344 ctx_data = ctx->data;
345
346 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
347 ctx_data->quit_future, NULL);
348 if (nxt_slow_path(res == NULL)) {
349 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
350 nxt_python_print_exception();
351
352 return NXT_UNIT_ERROR;
353 }
354
355 Py_DECREF(res);
356
357 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
358 nxt_py_asgi_remove_reader(ctx, ctx_data->port);
359
360 if (ctx_data->port != NULL) {
361 ctx_data->port->data = NULL;
362 ctx_data->port = NULL;
363 }
364
365 nxt_py_asgi_lifespan_shutdown(ctx);
366
367 return NXT_UNIT_OK;
368}
369
370
371static void
372nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
373{
374 PyObject *res, *fd;
375 nxt_py_asgi_ctx_data_t *ctx_data;
376
377 if (port == NULL || port->in_fd == -1) {
378 return;
379 }
380
381 ctx_data = ctx->data;
382
383 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
384
385 fd = PyLong_FromLong(port->in_fd);
386 if (nxt_slow_path(fd == NULL)) {
387 nxt_unit_alert(ctx, "Python failed to create Long object");
388 nxt_python_print_exception();
389
390 return;
391 }
392
393 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
394 if (nxt_slow_path(res == NULL)) {
395 nxt_unit_alert(ctx, "Python failed to remove_reader");
396 nxt_python_print_exception();
397
398 } else {
399 Py_DECREF(res);
400 }
401
402 Py_DECREF(fd);
403}
404
405
406static void
407nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
408{
409 PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
410 PyObject *stage2;
411 nxt_py_asgi_ctx_data_t *ctx_data;
412
413 if (req->request->websocket_handshake) {
414 asgi = nxt_py_asgi_websocket_create(req);
415
416 } else {
417 asgi = nxt_py_asgi_http_create(req);
418 }
419
420 if (nxt_slow_path(asgi == NULL)) {
421 nxt_unit_req_alert(req, "Python failed to create asgi object");
422 nxt_unit_request_done(req, NXT_UNIT_ERROR);
423
424 return;
425 }
426
427 receive = PyObject_GetAttrString(asgi, "receive");
428 if (nxt_slow_path(receive == NULL)) {
429 nxt_unit_req_alert(req, "Python failed to get 'receive' method");
430 nxt_unit_request_done(req, NXT_UNIT_ERROR);
431
432 goto release_asgi;
433 }
434
435 send = PyObject_GetAttrString(asgi, "send");
436 if (nxt_slow_path(receive == NULL)) {
437 nxt_unit_req_alert(req, "Python failed to get 'send' method");
438 nxt_unit_request_done(req, NXT_UNIT_ERROR);
439
440 goto release_receive;
441 }
442
443 done = PyObject_GetAttrString(asgi, "_done");
444 if (nxt_slow_path(receive == NULL)) {
445 nxt_unit_req_alert(req, "Python failed to get '_done' method");
446 nxt_unit_request_done(req, NXT_UNIT_ERROR);
447
448 goto release_send;
449 }
450
451 scope = nxt_py_asgi_create_http_scope(req);
452 if (nxt_slow_path(scope == NULL)) {
453 nxt_unit_request_done(req, NXT_UNIT_ERROR);
454
455 goto release_done;
456 }
457
458 req->data = asgi;
459
460 if (!nxt_py_asgi_legacy) {
461 nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
462
463 res = PyObject_CallFunctionObjArgs(nxt_py_application,
464 scope, receive, send, NULL);
465
466 } else {
467 nxt_unit_req_debug(req, "Python call legacy application");
468
469 res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL);
470
471 if (nxt_slow_path(res == NULL)) {
472 nxt_unit_req_error(req, "Python failed to call legacy app stage1");
473 nxt_python_print_exception();
474 nxt_unit_request_done(req, NXT_UNIT_ERROR);
475
476 goto release_scope;
477 }
478
479 if (nxt_slow_path(PyCallable_Check(res) == 0)) {
480 nxt_unit_req_error(req,
481 "Legacy ASGI application returns not a callable");
482 nxt_unit_request_done(req, NXT_UNIT_ERROR);
483
484 Py_DECREF(res);
485
486 goto release_scope;
487 }
488
489 stage2 = res;
490
491 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
492
493 Py_DECREF(stage2);
494 }
495
496 if (nxt_slow_path(res == NULL)) {
497 nxt_unit_req_error(req, "Python failed to call the application");
498 nxt_python_print_exception();
499 nxt_unit_request_done(req, NXT_UNIT_ERROR);
500
501 goto release_scope;
502 }
503
504 if (nxt_slow_path(!PyCoro_CheckExact(res))) {
505 nxt_unit_req_error(req, "Application result type is not a coroutine");
506 nxt_unit_request_done(req, NXT_UNIT_ERROR);
507
508 Py_DECREF(res);
509
510 goto release_scope;
511 }
512
513 ctx_data = req->ctx->data;
514
515 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
516 if (nxt_slow_path(task == NULL)) {
517 nxt_unit_req_error(req, "Python failed to call the create_task");
518 nxt_python_print_exception();
519 nxt_unit_request_done(req, NXT_UNIT_ERROR);
520
521 Py_DECREF(res);
522
523 goto release_scope;
524 }
525
526 Py_DECREF(res);
527
528 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
529 NULL);
530 if (nxt_slow_path(res == NULL)) {
531 nxt_unit_req_error(req,
532 "Python failed to call 'task.add_done_callback'");
533 nxt_python_print_exception();
534 nxt_unit_request_done(req, NXT_UNIT_ERROR);
535
536 goto release_task;
537 }
538
539 Py_DECREF(res);
540release_task:
541 Py_DECREF(task);
542release_scope:
543 Py_DECREF(scope);
544release_done:
545 Py_DECREF(done);
546release_send:
547 Py_DECREF(send);
548release_receive:
549 Py_DECREF(receive);
550release_asgi:
551 Py_DECREF(asgi);
552}
553
554
555static void
556nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
557{
558 if (req->request->websocket_handshake) {
559 nxt_py_asgi_websocket_close_handler(req);
560
561 } else {
562 nxt_py_asgi_http_close_handler(req);
563 }
564}
565
566
567static PyObject *
568nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
569{
570 char *p, *target, *query;
571 uint32_t target_length, i;
572 PyObject *scope, *v, *type, *scheme;
573 PyObject *headers, *header;
574 nxt_unit_field_t *f;
575 nxt_unit_request_t *r;
576
577 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
578
579#define SET_ITEM(dict, key, value) \
580 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
581 == -1)) \
582 { \
583 nxt_unit_req_alert(req, "Python failed to set '" \
584 #dict "." #key "' item"); \
585 goto fail; \
586 }
587
588 v = NULL;
589 headers = NULL;
590
591 r = req->request;
592
593 if (r->websocket_handshake) {
594 type = nxt_py_websocket_str;
595 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
596
597 } else {
598 type = nxt_py_http_str;
599 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
600 }
601
602 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
603 if (nxt_slow_path(scope == NULL)) {
604 return NULL;
605 }
606
607 p = nxt_unit_sptr_get(&r->version);
608 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
609 : nxt_py_1_0_str)
610 SET_ITEM(scope, scheme, scheme)
611
612 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
613 r->method_length);
614 if (nxt_slow_path(v == NULL)) {
615 nxt_unit_req_alert(req, "Python failed to create 'method' string");
616 goto fail;
617 }
618
619 SET_ITEM(scope, method, v)
620 Py_DECREF(v);
621
622 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
623 "replace");
624 if (nxt_slow_path(v == NULL)) {
625 nxt_unit_req_alert(req, "Python failed to create 'path' string");
626 goto fail;
627 }
628
629 SET_ITEM(scope, path, v)
630 Py_DECREF(v);
631
632 target = nxt_unit_sptr_get(&r->target);
633 query = nxt_unit_sptr_get(&r->query);
634
635 if (r->query.offset != 0) {
636 target_length = query - target - 1;
637
638 } else {
639 target_length = r->target_length;
640 }
641
642 v = PyBytes_FromStringAndSize(target, target_length);
643 if (nxt_slow_path(v == NULL)) {
644 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
645 goto fail;
646 }
647
648 SET_ITEM(scope, raw_path, v)
649 Py_DECREF(v);
650
651 v = PyBytes_FromStringAndSize(query, r->query_length);
652 if (nxt_slow_path(v == NULL)) {
653 nxt_unit_req_alert(req, "Python failed to create 'query' string");
654 goto fail;
655 }
656
657 SET_ITEM(scope, query_string, v)
658 Py_DECREF(v);
659
660 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
661 if (nxt_slow_path(v == NULL)) {
662 nxt_unit_req_alert(req, "Python failed to create 'client' pair");
663 goto fail;
664 }
665
666 SET_ITEM(scope, client, v)
667 Py_DECREF(v);
668
669 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
670 if (nxt_slow_path(v == NULL)) {
671 nxt_unit_req_alert(req, "Python failed to create 'server' pair");
672 goto fail;
673 }
674
675 SET_ITEM(scope, server, v)
676 Py_DECREF(v);
677
678 v = NULL;
679
680 headers = PyTuple_New(r->fields_count);
681 if (nxt_slow_path(headers == NULL)) {
682 nxt_unit_req_alert(req, "Python failed to create 'headers' object");
683 goto fail;
684 }
685
686 for (i = 0; i < r->fields_count; i++) {
687 f = r->fields + i;
688
689 header = nxt_py_asgi_create_header(f);
690 if (nxt_slow_path(header == NULL)) {
691 nxt_unit_req_alert(req, "Python failed to create 'header' pair");
692 goto fail;
693 }
694
695 PyTuple_SET_ITEM(headers, i, header);
696
697 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
698 && f->name_length == ws_protocol.length
699 && f->value_length > 0
700 && r->websocket_handshake)
701 {
702 v = nxt_py_asgi_create_subprotocols(f);
703 if (nxt_slow_path(v == NULL)) {
704 nxt_unit_req_alert(req, "Failed to create subprotocols");
705 goto fail;
706 }
707
708 SET_ITEM(scope, subprotocols, v);
709 Py_DECREF(v);
710 }
711 }
712
713 SET_ITEM(scope, headers, headers)
714 Py_DECREF(headers);
715
716 return scope;
717
718fail:
719
720 Py_XDECREF(v);
721 Py_XDECREF(headers);
722 Py_DECREF(scope);
723
724 return NULL;
725
726#undef SET_ITEM
727}
728
729
730static PyObject *
731nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
732{
733 char *p, *s;
734 PyObject *pair, *v;
735
736 pair = PyTuple_New(2);
737 if (nxt_slow_path(pair == NULL)) {
738 return NULL;
739 }
740
741 p = nxt_unit_sptr_get(sptr);
742 s = memchr(p, ':', len);
743
744 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
745 if (nxt_slow_path(v == NULL)) {
746 Py_DECREF(pair);
747
748 return NULL;
749 }
750
751 PyTuple_SET_ITEM(pair, 0, v);
752
753 if (s != NULL) {
754 p += len;
755 v = PyLong_FromString(s + 1, &p, 10);
756
757 } else {
758 v = PyLong_FromLong(port);
759 }
760
761 if (nxt_slow_path(v == NULL)) {
762 Py_DECREF(pair);
763
764 return NULL;
765 }
766
767 PyTuple_SET_ITEM(pair, 1, v);
768
769 return pair;
770}
771
772
773static PyObject *
774nxt_py_asgi_create_header(nxt_unit_field_t *f)
775{
776 char c, *name;
777 uint8_t pos;
778 PyObject *header, *v;
779
780 header = PyTuple_New(2);
781 if (nxt_slow_path(header == NULL)) {
782 return NULL;
783 }
784
785 name = nxt_unit_sptr_get(&f->name);
786
787 for (pos = 0; pos < f->name_length; pos++) {
788 c = name[pos];
789 if (c >= 'A' && c <= 'Z') {
790 name[pos] = (c | 0x20);
791 }
792 }
793
794 v = PyBytes_FromStringAndSize(name, f->name_length);
795 if (nxt_slow_path(v == NULL)) {
796 Py_DECREF(header);
797
798 return NULL;
799 }
800
801 PyTuple_SET_ITEM(header, 0, v);
802
803 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
804 f->value_length);
805 if (nxt_slow_path(v == NULL)) {
806 Py_DECREF(header);
807
808 return NULL;
809 }
810
811 PyTuple_SET_ITEM(header, 1, v);
812
813 return header;
814}
815
816
817static PyObject *
818nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
819{
820 char *v;
821 uint32_t i, n, start;
822 PyObject *res, *proto;
823
824 v = nxt_unit_sptr_get(&f->value);
825 n = 1;
826
827 for (i = 0; i < f->value_length; i++) {
828 if (v[i] == ',') {
829 n++;
830 }
831 }
832
833 res = PyTuple_New(n);
834 if (nxt_slow_path(res == NULL)) {
835 return NULL;
836 }
837
838 n = 0;
839 start = 0;
840
841 for (i = 0; i < f->value_length; ) {
842 if (v[i] != ',') {
843 i++;
844
845 continue;
846 }
847
848 if (i - start > 0) {
849 proto = PyString_FromStringAndSize(v + start, i - start);
850 if (nxt_slow_path(proto == NULL)) {
851 goto fail;
852 }
853
854 PyTuple_SET_ITEM(res, n, proto);
855
856 n++;
857 }
858
859 do {
860 i++;
861 } while (i < f->value_length && v[i] == ' ');
862
863 start = i;
864 }
865
866 if (i - start > 0) {
867 proto = PyString_FromStringAndSize(v + start, i - start);
868 if (nxt_slow_path(proto == NULL)) {
869 goto fail;
870 }
871
872 PyTuple_SET_ITEM(res, n, proto);
873 }
874
875 return res;
876
877fail:
878
879 Py_DECREF(res);
880
881 return NULL;
882}
883
884
885static int
886nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
887{
888 int rc;
889 PyObject *res, *fd, *py_ctx, *py_port;
890 nxt_unit_port_t *port;
891 nxt_py_asgi_ctx_data_t *ctx_data;
892
893 if (nxt_slow_path(nxt_py_shared_port == NULL)) {
894 return NXT_UNIT_ERROR;
895 }
896
897 port = nxt_py_shared_port;
898
899 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
900
901 ctx_data = ctx->data;
902
903 rc = NXT_UNIT_ERROR;
904
905 fd = PyLong_FromLong(port->in_fd);
906 if (nxt_slow_path(fd == NULL)) {
907 nxt_unit_alert(ctx, "Python failed to create fd");
908 nxt_python_print_exception();
909
910 return rc;
911 }
912
913 py_ctx = PyLong_FromVoidPtr(ctx);
914 if (nxt_slow_path(py_ctx == NULL)) {
915 nxt_unit_alert(ctx, "Python failed to create py_ctx");
916 nxt_python_print_exception();
917
918 goto clean_fd;
919 }
920
921 py_port = PyLong_FromVoidPtr(port);
922 if (nxt_slow_path(py_port == NULL)) {
923 nxt_unit_alert(ctx, "Python failed to create py_port");
924 nxt_python_print_exception();
925
926 goto clean_py_ctx;
927 }
928
929 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
930 fd, nxt_py_port_read,
931 py_ctx, py_port, NULL);
932 if (nxt_slow_path(res == NULL)) {
933 nxt_unit_alert(ctx, "Python failed to add_reader");
934 nxt_python_print_exception();
935
936 } else {
937 Py_DECREF(res);
938
939 rc = NXT_UNIT_OK;
940 }
941
942 Py_DECREF(py_port);
943
944clean_py_ctx:
945
946 Py_DECREF(py_ctx);
947
948clean_fd:
949
950 Py_DECREF(fd);
951
952 return rc;
953}
954
955
956static int
957nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
958{
959 int nb, rc;
960 PyObject *res, *fd, *py_ctx, *py_port;
961 nxt_py_asgi_ctx_data_t *ctx_data;
962
963 if (port->in_fd == -1) {
964 return NXT_UNIT_OK;
965 }
966
967 nb = 1;
968
969 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
970 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
971 port->in_fd, strerror(errno), errno);
972
973 return NXT_UNIT_ERROR;
974 }
975
976 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
977
978 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
979 nxt_py_shared_port = port;
980
981 return NXT_UNIT_OK;
982 }
983
984 ctx_data = ctx->data;
985
986 ctx_data->port = port;
987 port->data = ctx_data;
988
989 rc = NXT_UNIT_ERROR;
990
991 fd = PyLong_FromLong(port->in_fd);
992 if (nxt_slow_path(fd == NULL)) {
993 nxt_unit_alert(ctx, "Python failed to create fd");
994 nxt_python_print_exception();
995
996 return rc;
997 }
998
999 py_ctx = PyLong_FromVoidPtr(ctx);
1000 if (nxt_slow_path(py_ctx == NULL)) {
1001 nxt_unit_alert(ctx, "Python failed to create py_ctx");
1002 nxt_python_print_exception();
1003
1004 goto clean_fd;
1005 }
1006
1007 py_port = PyLong_FromVoidPtr(port);
1008 if (nxt_slow_path(py_port == NULL)) {
1009 nxt_unit_alert(ctx, "Python failed to create py_port");
1010 nxt_python_print_exception();
1011
1012 goto clean_py_ctx;
1013 }
1014
1015 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
1016 fd, nxt_py_port_read,
1017 py_ctx, py_port, NULL);
1018 if (nxt_slow_path(res == NULL)) {
1019 nxt_unit_alert(ctx, "Python failed to add_reader");
1020 nxt_python_print_exception();
1021
1022 } else {
1023 Py_DECREF(res);
1024
1025 rc = NXT_UNIT_OK;
1026 }
1027
1028 Py_DECREF(py_port);
1029
1030clean_py_ctx:
1031
1032 Py_DECREF(py_ctx);
1033
1034clean_fd:
1035
1036 Py_DECREF(fd);
1037
1038 return rc;
1039}
1040
1041
1042static void
1043nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
1044{
1045 if (port->in_fd == -1) {
1046 return;
1047 }
1048
1049 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
1050
1051 if (nxt_py_shared_port == port) {
1052 nxt_py_shared_port = NULL;
1053 }
1054}
1055
1056
1057static void
1058nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
1059{
1060 PyObject *res, *p;
1061 nxt_py_asgi_ctx_data_t *ctx_data;
1062
1063 nxt_unit_debug(ctx, "asgi_quit %p", ctx);
1064
1065 ctx_data = ctx->data;
1066
1067 if (nxt_py_shared_port != NULL) {
1068 p = PyLong_FromLong(nxt_py_shared_port->in_fd);
1069 if (nxt_slow_path(p == NULL)) {
1070 nxt_unit_alert(NULL, "Python failed to create Long");
1071 nxt_python_print_exception();
1072
1073 } else {
1074 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
1075 p, NULL);
1076 if (nxt_slow_path(res == NULL)) {
1077 nxt_unit_alert(NULL, "Python failed to remove_reader");
1078 nxt_python_print_exception();
1079
1080 } else {
1081 Py_DECREF(res);
1082 }
1083
1084 Py_DECREF(p);
1085 }
1086 }
1087
1088 p = PyLong_FromLong(0);
1089 if (nxt_slow_path(p == NULL)) {
1090 nxt_unit_alert(NULL, "Python failed to create Long");
1091 nxt_python_print_exception();
1092
1093 } else {
1094 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
1095 p, NULL);
1096 if (nxt_slow_path(res == NULL)) {
1097 nxt_unit_alert(ctx, "Python failed to set_result");
1098 nxt_python_print_exception();
1099
1100 } else {
1101 Py_DECREF(res);
1102 }
1103
1104 Py_DECREF(p);
1105 }
1106}
1107
1108
1109static void
1110nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
1111{
1112 int rc;
1113 nxt_queue_link_t *lnk;
1114 nxt_py_asgi_ctx_data_t *ctx_data;
1115
1116 ctx_data = ctx->data;
1117
1118 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
1119 lnk = nxt_queue_first(&ctx_data->drain_queue);
1120
1121 rc = nxt_py_asgi_http_drain(lnk);
1122 if (rc == NXT_UNIT_AGAIN) {
1123 return;
1124 }
1125
1126 nxt_queue_remove(lnk);
1127 }
1128}
1129
1130
1131static PyObject *
1132nxt_py_asgi_port_read(PyObject *self, PyObject *args)
1133{
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7#include <python/nxt_python.h>
8
9#if (NXT_HAVE_ASGI)
10
11#include <nxt_main.h>
12#include <nxt_unit.h>
13#include <nxt_unit_request.h>
14#include <nxt_unit_response.h>
15#include <python/nxt_python_asgi.h>
16#include <python/nxt_python_asgi_str.h>
17
18
19static PyObject *nxt_python_asgi_get_func(PyObject *obj);
20static int nxt_python_asgi_ctx_data_alloc(void **pdata);
21static void nxt_python_asgi_ctx_data_free(void *data);
22static int nxt_python_asgi_startup(void *data);
23static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
24
25static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
26 nxt_unit_port_t *port);
27static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
28static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
29
30static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
31static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
32 uint16_t port);
33static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
34static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
35
36static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
37
38static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
39static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
40static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
41static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
42
43static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
44static void nxt_python_asgi_done(void);
45
46
47int nxt_py_asgi_legacy;
48static PyObject *nxt_py_port_read;
49static nxt_unit_port_t *nxt_py_shared_port;
50
51static PyMethodDef nxt_py_port_read_method =
52 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
53
54static nxt_python_proto_t nxt_py_asgi_proto = {
55 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
56 .ctx_data_free = nxt_python_asgi_ctx_data_free,
57 .startup = nxt_python_asgi_startup,
58 .run = nxt_python_asgi_run,
59 .ready = nxt_python_asgi_ready,
60 .done = nxt_python_asgi_done,
61};
62
63#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
64
65
66int
67nxt_python_asgi_check(PyObject *obj)
68{
69 int res;
70 PyObject *func;
71 PyCodeObject *code;
72
73 func = nxt_python_asgi_get_func(obj);
74
75 if (func == NULL) {
76 return 0;
77 }
78
79 code = (PyCodeObject *) PyFunction_GET_CODE(func);
80
81 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
82 "%d argument(s)",
83 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
84 code->co_argcount);
85
86 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
87
88 Py_DECREF(func);
89
90 return res;
91}
92
93
94static PyObject *
95nxt_python_asgi_get_func(PyObject *obj)
96{
97 PyObject *call;
98
99 if (PyFunction_Check(obj)) {
100 Py_INCREF(obj);
101 return obj;
102 }
103
104 if (PyMethod_Check(obj)) {
105 obj = PyMethod_GET_FUNCTION(obj);
106
107 Py_INCREF(obj);
108 return obj;
109 }
110
111 call = PyObject_GetAttrString(obj, "__call__");
112
113 if (call == NULL) {
114 return NULL;
115 }
116
117 if (PyFunction_Check(call)) {
118 return call;
119 }
120
121 if (PyMethod_Check(call)) {
122 obj = PyMethod_GET_FUNCTION(call);
123
124 Py_INCREF(obj);
125 Py_DECREF(call);
126
127 return obj;
128 }
129
130 Py_DECREF(call);
131
132 return NULL;
133}
134
135
136int
137nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
138{
139 PyObject *func;
140 PyCodeObject *code;
141
142 nxt_unit_debug(NULL, "asgi_init");
143
144 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
145 nxt_unit_alert(NULL, "Python failed to init string objects");
146 return NXT_UNIT_ERROR;
147 }
148
149 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
150 if (nxt_slow_path(nxt_py_port_read == NULL)) {
151 nxt_unit_alert(NULL,
152 "Python failed to initialize the 'port_read' function");
153 return NXT_UNIT_ERROR;
154 }
155
156 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
157 return NXT_UNIT_ERROR;
158 }
159
160 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
161 return NXT_UNIT_ERROR;
162 }
163
164 func = nxt_python_asgi_get_func(nxt_py_application);
165 if (nxt_slow_path(func == NULL)) {
166 nxt_unit_alert(NULL, "Python cannot find function for callable");
167 return NXT_UNIT_ERROR;
168 }
169
170 code = (PyCodeObject *) PyFunction_GET_CODE(func);
171
172 if ((code->co_flags & CO_COROUTINE) == 0) {
173 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
174 "switching to legacy mode");
175 nxt_py_asgi_legacy = 1;
176 }
177
178 Py_DECREF(func);
179
180 init->callbacks.request_handler = nxt_py_asgi_request_handler;
181 init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
182 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
183 init->callbacks.close_handler = nxt_py_asgi_close_handler;
184 init->callbacks.quit = nxt_py_asgi_quit;
185 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
186 init->callbacks.add_port = nxt_py_asgi_add_port;
187 init->callbacks.remove_port = nxt_py_asgi_remove_port;
188
189 *proto = nxt_py_asgi_proto;
190
191 return NXT_UNIT_OK;
192}
193
194
195static int
196nxt_python_asgi_ctx_data_alloc(void **pdata)
197{
198 uint32_t i;
199 PyObject *asyncio, *loop, *new_event_loop, *obj;
200 nxt_py_asgi_ctx_data_t *ctx_data;
201
202 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
203 if (nxt_slow_path(ctx_data == NULL)) {
204 nxt_unit_alert(NULL, "Failed to allocate context data");
205 return NXT_UNIT_ERROR;
206 }
207
208 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
209
210 nxt_queue_init(&ctx_data->drain_queue);
211
212 struct {
213 const char *key;
214 PyObject **handler;
215
216 } handlers[] = {
217 { "create_task", &ctx_data->loop_create_task },
218 { "add_reader", &ctx_data->loop_add_reader },
219 { "remove_reader", &ctx_data->loop_remove_reader },
220 { "call_soon", &ctx_data->loop_call_soon },
221 { "run_until_complete", &ctx_data->loop_run_until_complete },
222 { "create_future", &ctx_data->loop_create_future },
223 };
224
225 loop = NULL;
226
227 asyncio = PyImport_ImportModule("asyncio");
228 if (nxt_slow_path(asyncio == NULL)) {
229 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
230 nxt_python_print_exception();
231 goto fail;
232 }
233
234 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
235 "new_event_loop");
236 if (nxt_slow_path(new_event_loop == NULL)) {
237 nxt_unit_alert(NULL,
238 "Python failed to get 'new_event_loop' from module 'asyncio'");
239 goto fail;
240 }
241
242 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
243 nxt_unit_alert(NULL,
244 "'asyncio.new_event_loop' is not a callable object");
245 goto fail;
246 }
247
248 loop = PyObject_CallObject(new_event_loop, NULL);
249 if (nxt_slow_path(loop == NULL)) {
250 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
251 goto fail;
252 }
253
254 for (i = 0; i < nxt_nitems(handlers); i++) {
255 obj = PyObject_GetAttrString(loop, handlers[i].key);
256 if (nxt_slow_path(obj == NULL)) {
257 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
258 handlers[i].key);
259 goto fail;
260 }
261
262 *handlers[i].handler = obj;
263
264 if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
265 nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
266 handlers[i].key);
267 goto fail;
268 }
269 }
270
271 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
272 if (nxt_slow_path(obj == NULL)) {
273 nxt_unit_alert(NULL, "Python failed to create Future ");
274 nxt_python_print_exception();
275 goto fail;
276 }
277
278 ctx_data->quit_future = obj;
279
280 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
281 if (nxt_slow_path(obj == NULL)) {
282 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
283 goto fail;
284 }
285
286 ctx_data->quit_future_set_result = obj;
287
288 if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
289 nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
290 goto fail;
291 }
292
293 Py_DECREF(loop);
294 Py_DECREF(asyncio);
295
296 *pdata = ctx_data;
297
298 return NXT_UNIT_OK;
299
300fail:
301
302 nxt_python_asgi_ctx_data_free(ctx_data);
303
304 Py_XDECREF(loop);
305 Py_XDECREF(asyncio);
306
307 return NXT_UNIT_ERROR;
308}
309
310
311static void
312nxt_python_asgi_ctx_data_free(void *data)
313{
314 nxt_py_asgi_ctx_data_t *ctx_data;
315
316 ctx_data = data;
317
318 Py_XDECREF(ctx_data->loop_run_until_complete);
319 Py_XDECREF(ctx_data->loop_create_future);
320 Py_XDECREF(ctx_data->loop_create_task);
321 Py_XDECREF(ctx_data->loop_call_soon);
322 Py_XDECREF(ctx_data->loop_add_reader);
323 Py_XDECREF(ctx_data->loop_remove_reader);
324 Py_XDECREF(ctx_data->quit_future);
325 Py_XDECREF(ctx_data->quit_future_set_result);
326
327 nxt_unit_free(NULL, ctx_data);
328}
329
330
331static int
332nxt_python_asgi_startup(void *data)
333{
334 return nxt_py_asgi_lifespan_startup(data);
335}
336
337
338static int
339nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
340{
341 PyObject *res;
342 nxt_py_asgi_ctx_data_t *ctx_data;
343
344 ctx_data = ctx->data;
345
346 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
347 ctx_data->quit_future, NULL);
348 if (nxt_slow_path(res == NULL)) {
349 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
350 nxt_python_print_exception();
351
352 return NXT_UNIT_ERROR;
353 }
354
355 Py_DECREF(res);
356
357 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
358 nxt_py_asgi_remove_reader(ctx, ctx_data->port);
359
360 if (ctx_data->port != NULL) {
361 ctx_data->port->data = NULL;
362 ctx_data->port = NULL;
363 }
364
365 nxt_py_asgi_lifespan_shutdown(ctx);
366
367 return NXT_UNIT_OK;
368}
369
370
371static void
372nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
373{
374 PyObject *res, *fd;
375 nxt_py_asgi_ctx_data_t *ctx_data;
376
377 if (port == NULL || port->in_fd == -1) {
378 return;
379 }
380
381 ctx_data = ctx->data;
382
383 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
384
385 fd = PyLong_FromLong(port->in_fd);
386 if (nxt_slow_path(fd == NULL)) {
387 nxt_unit_alert(ctx, "Python failed to create Long object");
388 nxt_python_print_exception();
389
390 return;
391 }
392
393 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
394 if (nxt_slow_path(res == NULL)) {
395 nxt_unit_alert(ctx, "Python failed to remove_reader");
396 nxt_python_print_exception();
397
398 } else {
399 Py_DECREF(res);
400 }
401
402 Py_DECREF(fd);
403}
404
405
406static void
407nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
408{
409 PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
410 PyObject *stage2;
411 nxt_py_asgi_ctx_data_t *ctx_data;
412
413 if (req->request->websocket_handshake) {
414 asgi = nxt_py_asgi_websocket_create(req);
415
416 } else {
417 asgi = nxt_py_asgi_http_create(req);
418 }
419
420 if (nxt_slow_path(asgi == NULL)) {
421 nxt_unit_req_alert(req, "Python failed to create asgi object");
422 nxt_unit_request_done(req, NXT_UNIT_ERROR);
423
424 return;
425 }
426
427 receive = PyObject_GetAttrString(asgi, "receive");
428 if (nxt_slow_path(receive == NULL)) {
429 nxt_unit_req_alert(req, "Python failed to get 'receive' method");
430 nxt_unit_request_done(req, NXT_UNIT_ERROR);
431
432 goto release_asgi;
433 }
434
435 send = PyObject_GetAttrString(asgi, "send");
436 if (nxt_slow_path(receive == NULL)) {
437 nxt_unit_req_alert(req, "Python failed to get 'send' method");
438 nxt_unit_request_done(req, NXT_UNIT_ERROR);
439
440 goto release_receive;
441 }
442
443 done = PyObject_GetAttrString(asgi, "_done");
444 if (nxt_slow_path(receive == NULL)) {
445 nxt_unit_req_alert(req, "Python failed to get '_done' method");
446 nxt_unit_request_done(req, NXT_UNIT_ERROR);
447
448 goto release_send;
449 }
450
451 scope = nxt_py_asgi_create_http_scope(req);
452 if (nxt_slow_path(scope == NULL)) {
453 nxt_unit_request_done(req, NXT_UNIT_ERROR);
454
455 goto release_done;
456 }
457
458 req->data = asgi;
459
460 if (!nxt_py_asgi_legacy) {
461 nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
462
463 res = PyObject_CallFunctionObjArgs(nxt_py_application,
464 scope, receive, send, NULL);
465
466 } else {
467 nxt_unit_req_debug(req, "Python call legacy application");
468
469 res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL);
470
471 if (nxt_slow_path(res == NULL)) {
472 nxt_unit_req_error(req, "Python failed to call legacy app stage1");
473 nxt_python_print_exception();
474 nxt_unit_request_done(req, NXT_UNIT_ERROR);
475
476 goto release_scope;
477 }
478
479 if (nxt_slow_path(PyCallable_Check(res) == 0)) {
480 nxt_unit_req_error(req,
481 "Legacy ASGI application returns not a callable");
482 nxt_unit_request_done(req, NXT_UNIT_ERROR);
483
484 Py_DECREF(res);
485
486 goto release_scope;
487 }
488
489 stage2 = res;
490
491 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
492
493 Py_DECREF(stage2);
494 }
495
496 if (nxt_slow_path(res == NULL)) {
497 nxt_unit_req_error(req, "Python failed to call the application");
498 nxt_python_print_exception();
499 nxt_unit_request_done(req, NXT_UNIT_ERROR);
500
501 goto release_scope;
502 }
503
504 if (nxt_slow_path(!PyCoro_CheckExact(res))) {
505 nxt_unit_req_error(req, "Application result type is not a coroutine");
506 nxt_unit_request_done(req, NXT_UNIT_ERROR);
507
508 Py_DECREF(res);
509
510 goto release_scope;
511 }
512
513 ctx_data = req->ctx->data;
514
515 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
516 if (nxt_slow_path(task == NULL)) {
517 nxt_unit_req_error(req, "Python failed to call the create_task");
518 nxt_python_print_exception();
519 nxt_unit_request_done(req, NXT_UNIT_ERROR);
520
521 Py_DECREF(res);
522
523 goto release_scope;
524 }
525
526 Py_DECREF(res);
527
528 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
529 NULL);
530 if (nxt_slow_path(res == NULL)) {
531 nxt_unit_req_error(req,
532 "Python failed to call 'task.add_done_callback'");
533 nxt_python_print_exception();
534 nxt_unit_request_done(req, NXT_UNIT_ERROR);
535
536 goto release_task;
537 }
538
539 Py_DECREF(res);
540release_task:
541 Py_DECREF(task);
542release_scope:
543 Py_DECREF(scope);
544release_done:
545 Py_DECREF(done);
546release_send:
547 Py_DECREF(send);
548release_receive:
549 Py_DECREF(receive);
550release_asgi:
551 Py_DECREF(asgi);
552}
553
554
555static void
556nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
557{
558 if (req->request->websocket_handshake) {
559 nxt_py_asgi_websocket_close_handler(req);
560
561 } else {
562 nxt_py_asgi_http_close_handler(req);
563 }
564}
565
566
567static PyObject *
568nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
569{
570 char *p, *target, *query;
571 uint32_t target_length, i;
572 PyObject *scope, *v, *type, *scheme;
573 PyObject *headers, *header;
574 nxt_unit_field_t *f;
575 nxt_unit_request_t *r;
576
577 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
578
579#define SET_ITEM(dict, key, value) \
580 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
581 == -1)) \
582 { \
583 nxt_unit_req_alert(req, "Python failed to set '" \
584 #dict "." #key "' item"); \
585 goto fail; \
586 }
587
588 v = NULL;
589 headers = NULL;
590
591 r = req->request;
592
593 if (r->websocket_handshake) {
594 type = nxt_py_websocket_str;
595 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
596
597 } else {
598 type = nxt_py_http_str;
599 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
600 }
601
602 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
603 if (nxt_slow_path(scope == NULL)) {
604 return NULL;
605 }
606
607 p = nxt_unit_sptr_get(&r->version);
608 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
609 : nxt_py_1_0_str)
610 SET_ITEM(scope, scheme, scheme)
611
612 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
613 r->method_length);
614 if (nxt_slow_path(v == NULL)) {
615 nxt_unit_req_alert(req, "Python failed to create 'method' string");
616 goto fail;
617 }
618
619 SET_ITEM(scope, method, v)
620 Py_DECREF(v);
621
622 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
623 "replace");
624 if (nxt_slow_path(v == NULL)) {
625 nxt_unit_req_alert(req, "Python failed to create 'path' string");
626 goto fail;
627 }
628
629 SET_ITEM(scope, path, v)
630 Py_DECREF(v);
631
632 target = nxt_unit_sptr_get(&r->target);
633 query = nxt_unit_sptr_get(&r->query);
634
635 if (r->query.offset != 0) {
636 target_length = query - target - 1;
637
638 } else {
639 target_length = r->target_length;
640 }
641
642 v = PyBytes_FromStringAndSize(target, target_length);
643 if (nxt_slow_path(v == NULL)) {
644 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
645 goto fail;
646 }
647
648 SET_ITEM(scope, raw_path, v)
649 Py_DECREF(v);
650
651 v = PyBytes_FromStringAndSize(query, r->query_length);
652 if (nxt_slow_path(v == NULL)) {
653 nxt_unit_req_alert(req, "Python failed to create 'query' string");
654 goto fail;
655 }
656
657 SET_ITEM(scope, query_string, v)
658 Py_DECREF(v);
659
660 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
661 if (nxt_slow_path(v == NULL)) {
662 nxt_unit_req_alert(req, "Python failed to create 'client' pair");
663 goto fail;
664 }
665
666 SET_ITEM(scope, client, v)
667 Py_DECREF(v);
668
669 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
670 if (nxt_slow_path(v == NULL)) {
671 nxt_unit_req_alert(req, "Python failed to create 'server' pair");
672 goto fail;
673 }
674
675 SET_ITEM(scope, server, v)
676 Py_DECREF(v);
677
678 v = NULL;
679
680 headers = PyTuple_New(r->fields_count);
681 if (nxt_slow_path(headers == NULL)) {
682 nxt_unit_req_alert(req, "Python failed to create 'headers' object");
683 goto fail;
684 }
685
686 for (i = 0; i < r->fields_count; i++) {
687 f = r->fields + i;
688
689 header = nxt_py_asgi_create_header(f);
690 if (nxt_slow_path(header == NULL)) {
691 nxt_unit_req_alert(req, "Python failed to create 'header' pair");
692 goto fail;
693 }
694
695 PyTuple_SET_ITEM(headers, i, header);
696
697 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
698 && f->name_length == ws_protocol.length
699 && f->value_length > 0
700 && r->websocket_handshake)
701 {
702 v = nxt_py_asgi_create_subprotocols(f);
703 if (nxt_slow_path(v == NULL)) {
704 nxt_unit_req_alert(req, "Failed to create subprotocols");
705 goto fail;
706 }
707
708 SET_ITEM(scope, subprotocols, v);
709 Py_DECREF(v);
710 }
711 }
712
713 SET_ITEM(scope, headers, headers)
714 Py_DECREF(headers);
715
716 return scope;
717
718fail:
719
720 Py_XDECREF(v);
721 Py_XDECREF(headers);
722 Py_DECREF(scope);
723
724 return NULL;
725
726#undef SET_ITEM
727}
728
729
730static PyObject *
731nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
732{
733 char *p, *s;
734 PyObject *pair, *v;
735
736 pair = PyTuple_New(2);
737 if (nxt_slow_path(pair == NULL)) {
738 return NULL;
739 }
740
741 p = nxt_unit_sptr_get(sptr);
742 s = memchr(p, ':', len);
743
744 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
745 if (nxt_slow_path(v == NULL)) {
746 Py_DECREF(pair);
747
748 return NULL;
749 }
750
751 PyTuple_SET_ITEM(pair, 0, v);
752
753 if (s != NULL) {
754 p += len;
755 v = PyLong_FromString(s + 1, &p, 10);
756
757 } else {
758 v = PyLong_FromLong(port);
759 }
760
761 if (nxt_slow_path(v == NULL)) {
762 Py_DECREF(pair);
763
764 return NULL;
765 }
766
767 PyTuple_SET_ITEM(pair, 1, v);
768
769 return pair;
770}
771
772
773static PyObject *
774nxt_py_asgi_create_header(nxt_unit_field_t *f)
775{
776 char c, *name;
777 uint8_t pos;
778 PyObject *header, *v;
779
780 header = PyTuple_New(2);
781 if (nxt_slow_path(header == NULL)) {
782 return NULL;
783 }
784
785 name = nxt_unit_sptr_get(&f->name);
786
787 for (pos = 0; pos < f->name_length; pos++) {
788 c = name[pos];
789 if (c >= 'A' && c <= 'Z') {
790 name[pos] = (c | 0x20);
791 }
792 }
793
794 v = PyBytes_FromStringAndSize(name, f->name_length);
795 if (nxt_slow_path(v == NULL)) {
796 Py_DECREF(header);
797
798 return NULL;
799 }
800
801 PyTuple_SET_ITEM(header, 0, v);
802
803 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
804 f->value_length);
805 if (nxt_slow_path(v == NULL)) {
806 Py_DECREF(header);
807
808 return NULL;
809 }
810
811 PyTuple_SET_ITEM(header, 1, v);
812
813 return header;
814}
815
816
817static PyObject *
818nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
819{
820 char *v;
821 uint32_t i, n, start;
822 PyObject *res, *proto;
823
824 v = nxt_unit_sptr_get(&f->value);
825 n = 1;
826
827 for (i = 0; i < f->value_length; i++) {
828 if (v[i] == ',') {
829 n++;
830 }
831 }
832
833 res = PyTuple_New(n);
834 if (nxt_slow_path(res == NULL)) {
835 return NULL;
836 }
837
838 n = 0;
839 start = 0;
840
841 for (i = 0; i < f->value_length; ) {
842 if (v[i] != ',') {
843 i++;
844
845 continue;
846 }
847
848 if (i - start > 0) {
849 proto = PyString_FromStringAndSize(v + start, i - start);
850 if (nxt_slow_path(proto == NULL)) {
851 goto fail;
852 }
853
854 PyTuple_SET_ITEM(res, n, proto);
855
856 n++;
857 }
858
859 do {
860 i++;
861 } while (i < f->value_length && v[i] == ' ');
862
863 start = i;
864 }
865
866 if (i - start > 0) {
867 proto = PyString_FromStringAndSize(v + start, i - start);
868 if (nxt_slow_path(proto == NULL)) {
869 goto fail;
870 }
871
872 PyTuple_SET_ITEM(res, n, proto);
873 }
874
875 return res;
876
877fail:
878
879 Py_DECREF(res);
880
881 return NULL;
882}
883
884
885static int
886nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
887{
888 int rc;
889 PyObject *res, *fd, *py_ctx, *py_port;
890 nxt_unit_port_t *port;
891 nxt_py_asgi_ctx_data_t *ctx_data;
892
893 if (nxt_slow_path(nxt_py_shared_port == NULL)) {
894 return NXT_UNIT_ERROR;
895 }
896
897 port = nxt_py_shared_port;
898
899 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
900
901 ctx_data = ctx->data;
902
903 rc = NXT_UNIT_ERROR;
904
905 fd = PyLong_FromLong(port->in_fd);
906 if (nxt_slow_path(fd == NULL)) {
907 nxt_unit_alert(ctx, "Python failed to create fd");
908 nxt_python_print_exception();
909
910 return rc;
911 }
912
913 py_ctx = PyLong_FromVoidPtr(ctx);
914 if (nxt_slow_path(py_ctx == NULL)) {
915 nxt_unit_alert(ctx, "Python failed to create py_ctx");
916 nxt_python_print_exception();
917
918 goto clean_fd;
919 }
920
921 py_port = PyLong_FromVoidPtr(port);
922 if (nxt_slow_path(py_port == NULL)) {
923 nxt_unit_alert(ctx, "Python failed to create py_port");
924 nxt_python_print_exception();
925
926 goto clean_py_ctx;
927 }
928
929 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
930 fd, nxt_py_port_read,
931 py_ctx, py_port, NULL);
932 if (nxt_slow_path(res == NULL)) {
933 nxt_unit_alert(ctx, "Python failed to add_reader");
934 nxt_python_print_exception();
935
936 } else {
937 Py_DECREF(res);
938
939 rc = NXT_UNIT_OK;
940 }
941
942 Py_DECREF(py_port);
943
944clean_py_ctx:
945
946 Py_DECREF(py_ctx);
947
948clean_fd:
949
950 Py_DECREF(fd);
951
952 return rc;
953}
954
955
956static int
957nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
958{
959 int nb, rc;
960 PyObject *res, *fd, *py_ctx, *py_port;
961 nxt_py_asgi_ctx_data_t *ctx_data;
962
963 if (port->in_fd == -1) {
964 return NXT_UNIT_OK;
965 }
966
967 nb = 1;
968
969 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
970 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
971 port->in_fd, strerror(errno), errno);
972
973 return NXT_UNIT_ERROR;
974 }
975
976 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
977
978 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
979 nxt_py_shared_port = port;
980
981 return NXT_UNIT_OK;
982 }
983
984 ctx_data = ctx->data;
985
986 ctx_data->port = port;
987 port->data = ctx_data;
988
989 rc = NXT_UNIT_ERROR;
990
991 fd = PyLong_FromLong(port->in_fd);
992 if (nxt_slow_path(fd == NULL)) {
993 nxt_unit_alert(ctx, "Python failed to create fd");
994 nxt_python_print_exception();
995
996 return rc;
997 }
998
999 py_ctx = PyLong_FromVoidPtr(ctx);
1000 if (nxt_slow_path(py_ctx == NULL)) {
1001 nxt_unit_alert(ctx, "Python failed to create py_ctx");
1002 nxt_python_print_exception();
1003
1004 goto clean_fd;
1005 }
1006
1007 py_port = PyLong_FromVoidPtr(port);
1008 if (nxt_slow_path(py_port == NULL)) {
1009 nxt_unit_alert(ctx, "Python failed to create py_port");
1010 nxt_python_print_exception();
1011
1012 goto clean_py_ctx;
1013 }
1014
1015 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
1016 fd, nxt_py_port_read,
1017 py_ctx, py_port, NULL);
1018 if (nxt_slow_path(res == NULL)) {
1019 nxt_unit_alert(ctx, "Python failed to add_reader");
1020 nxt_python_print_exception();
1021
1022 } else {
1023 Py_DECREF(res);
1024
1025 rc = NXT_UNIT_OK;
1026 }
1027
1028 Py_DECREF(py_port);
1029
1030clean_py_ctx:
1031
1032 Py_DECREF(py_ctx);
1033
1034clean_fd:
1035
1036 Py_DECREF(fd);
1037
1038 return rc;
1039}
1040
1041
1042static void
1043nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
1044{
1045 if (port->in_fd == -1) {
1046 return;
1047 }
1048
1049 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
1050
1051 if (nxt_py_shared_port == port) {
1052 nxt_py_shared_port = NULL;
1053 }
1054}
1055
1056
1057static void
1058nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
1059{
1060 PyObject *res, *p;
1061 nxt_py_asgi_ctx_data_t *ctx_data;
1062
1063 nxt_unit_debug(ctx, "asgi_quit %p", ctx);
1064
1065 ctx_data = ctx->data;
1066
1067 if (nxt_py_shared_port != NULL) {
1068 p = PyLong_FromLong(nxt_py_shared_port->in_fd);
1069 if (nxt_slow_path(p == NULL)) {
1070 nxt_unit_alert(NULL, "Python failed to create Long");
1071 nxt_python_print_exception();
1072
1073 } else {
1074 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
1075 p, NULL);
1076 if (nxt_slow_path(res == NULL)) {
1077 nxt_unit_alert(NULL, "Python failed to remove_reader");
1078 nxt_python_print_exception();
1079
1080 } else {
1081 Py_DECREF(res);
1082 }
1083
1084 Py_DECREF(p);
1085 }
1086 }
1087
1088 p = PyLong_FromLong(0);
1089 if (nxt_slow_path(p == NULL)) {
1090 nxt_unit_alert(NULL, "Python failed to create Long");
1091 nxt_python_print_exception();
1092
1093 } else {
1094 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
1095 p, NULL);
1096 if (nxt_slow_path(res == NULL)) {
1097 nxt_unit_alert(ctx, "Python failed to set_result");
1098 nxt_python_print_exception();
1099
1100 } else {
1101 Py_DECREF(res);
1102 }
1103
1104 Py_DECREF(p);
1105 }
1106}
1107
1108
1109static void
1110nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
1111{
1112 int rc;
1113 nxt_queue_link_t *lnk;
1114 nxt_py_asgi_ctx_data_t *ctx_data;
1115
1116 ctx_data = ctx->data;
1117
1118 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
1119 lnk = nxt_queue_first(&ctx_data->drain_queue);
1120
1121 rc = nxt_py_asgi_http_drain(lnk);
1122 if (rc == NXT_UNIT_AGAIN) {
1123 return;
1124 }
1125
1126 nxt_queue_remove(lnk);
1127 }
1128}
1129
1130
1131static PyObject *
1132nxt_py_asgi_port_read(PyObject *self, PyObject *args)
1133{
1134 int rc;
1135 PyObject *arg;
1136 Py_ssize_t n;
1137 nxt_unit_ctx_t *ctx;
1138 nxt_unit_port_t *port;
1134 int rc;
1135 PyObject *arg0, *arg1, *res;
1136 Py_ssize_t n;
1137 nxt_unit_ctx_t *ctx;
1138 nxt_unit_port_t *port;
1139 nxt_py_asgi_ctx_data_t *ctx_data;
1139
1140 n = PyTuple_GET_SIZE(args);
1141
1142 if (n != 2) {
1143 nxt_unit_alert(NULL,
1144 "nxt_py_asgi_port_read: invalid number of arguments %d",
1145 (int) n);
1146
1147 return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
1148 }
1149
1140
1141 n = PyTuple_GET_SIZE(args);
1142
1143 if (n != 2) {
1144 nxt_unit_alert(NULL,
1145 "nxt_py_asgi_port_read: invalid number of arguments %d",
1146 (int) n);
1147
1148 return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
1149 }
1150
1150 arg = PyTuple_GET_ITEM(args, 0);
1151 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
1151 arg0 = PyTuple_GET_ITEM(args, 0);
1152 if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) {
1152 return PyErr_Format(PyExc_TypeError,
1153 "the first argument is not a long");
1154 }
1155
1153 return PyErr_Format(PyExc_TypeError,
1154 "the first argument is not a long");
1155 }
1156
1156 ctx = PyLong_AsVoidPtr(arg);
1157 ctx = PyLong_AsVoidPtr(arg0);
1157
1158
1158 arg = PyTuple_GET_ITEM(args, 1);
1159 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
1159 arg1 = PyTuple_GET_ITEM(args, 1);
1160 if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) {
1160 return PyErr_Format(PyExc_TypeError,
1161 "the second argument is not a long");
1162 }
1163
1161 return PyErr_Format(PyExc_TypeError,
1162 "the second argument is not a long");
1163 }
1164
1164 port = PyLong_AsVoidPtr(arg);
1165 port = PyLong_AsVoidPtr(arg1);
1165
1166
1166 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
1167
1168 rc = nxt_unit_process_port_msg(ctx, port);
1169
1167 rc = nxt_unit_process_port_msg(ctx, port);
1168
1169 nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc);
1170
1170 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1171 return PyErr_Format(PyExc_RuntimeError,
1172 "error processing port %d message", port->id.id);
1173 }
1174
1171 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1172 return PyErr_Format(PyExc_RuntimeError,
1173 "error processing port %d message", port->id.id);
1174 }
1175
1176 if (rc == NXT_UNIT_OK) {
1177 ctx_data = ctx->data;
1178
1179 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon,
1180 nxt_py_port_read,
1181 arg0, arg1, NULL);
1182 if (nxt_slow_path(res == NULL)) {
1183 nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'");
1184 nxt_python_print_exception();
1185 }
1186
1187 Py_XDECREF(res);
1188 }
1189
1175 Py_RETURN_NONE;
1176}
1177
1178
1179PyObject *
1180nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
1181 void *data)
1182{
1183 int i;
1184 PyObject *iter, *header, *h_iter, *name, *val, *res;
1185
1186 iter = PyObject_GetIter(headers);
1187 if (nxt_slow_path(iter == NULL)) {
1188 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1189 }
1190
1191 for (i = 0; /* void */; i++) {
1192 header = PyIter_Next(iter);
1193 if (header == NULL) {
1194 break;
1195 }
1196
1197 h_iter = PyObject_GetIter(header);
1198 if (nxt_slow_path(h_iter == NULL)) {
1199 Py_DECREF(header);
1200 Py_DECREF(iter);
1201
1202 return PyErr_Format(PyExc_TypeError,
1203 "'headers' item #%d is not an iterable", i);
1204 }
1205
1206 name = PyIter_Next(h_iter);
1207 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1208 Py_XDECREF(name);
1209 Py_DECREF(h_iter);
1210 Py_DECREF(header);
1211 Py_DECREF(iter);
1212
1213 return PyErr_Format(PyExc_TypeError,
1214 "'headers' item #%d 'name' is not a byte string", i);
1215 }
1216
1217 val = PyIter_Next(h_iter);
1218 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1219 Py_XDECREF(val);
1220 Py_DECREF(h_iter);
1221 Py_DECREF(header);
1222 Py_DECREF(iter);
1223
1224 return PyErr_Format(PyExc_TypeError,
1225 "'headers' item #%d 'value' is not a byte string", i);
1226 }
1227
1228 res = cb(data, i, name, val);
1229
1230 Py_DECREF(name);
1231 Py_DECREF(val);
1232 Py_DECREF(h_iter);
1233 Py_DECREF(header);
1234
1235 if (nxt_slow_path(res == NULL)) {
1236 Py_DECREF(iter);
1237
1238 return NULL;
1239 }
1240
1241 Py_DECREF(res);
1242 }
1243
1244 Py_DECREF(iter);
1245
1246 Py_RETURN_NONE;
1247}
1248
1249
1250PyObject *
1251nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1252{
1253 nxt_py_asgi_calc_size_ctx_t *ctx;
1254
1255 ctx = data;
1256
1257 ctx->fields_count++;
1258 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1259
1260 Py_RETURN_NONE;
1261}
1262
1263
1264PyObject *
1265nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1266{
1267 int rc;
1268 char *name_str, *val_str;
1269 uint32_t name_len, val_len;
1270 nxt_off_t content_length;
1271 nxt_unit_request_info_t *req;
1272 nxt_py_asgi_add_field_ctx_t *ctx;
1273
1274 name_str = PyBytes_AS_STRING(name);
1275 name_len = PyBytes_GET_SIZE(name);
1276
1277 val_str = PyBytes_AS_STRING(val);
1278 val_len = PyBytes_GET_SIZE(val);
1279
1280 ctx = data;
1281 req = ctx->req;
1282
1283 rc = nxt_unit_response_add_field(req, name_str, name_len,
1284 val_str, val_len);
1285 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1286 return PyErr_Format(PyExc_RuntimeError,
1287 "failed to add header #%d", i);
1288 }
1289
1290 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1291 content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1292 if (nxt_slow_path(content_length < 0)) {
1293 nxt_unit_req_error(req, "failed to parse Content-Length "
1294 "value %.*s", (int) val_len, val_str);
1295
1296 return PyErr_Format(PyExc_ValueError,
1297 "Failed to parse Content-Length: '%.*s'",
1298 (int) val_len, val_str);
1299 }
1300
1301 ctx->content_length = content_length;
1302 }
1303
1304 Py_RETURN_NONE;
1305}
1306
1307
1308PyObject *
1309nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1310 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1311{
1312 PyObject *set_result, *res;
1313
1314 if (nxt_slow_path(result == NULL)) {
1315 Py_DECREF(future);
1316
1317 return NULL;
1318 }
1319
1320 set_result = PyObject_GetAttrString(future, "set_result");
1321 if (nxt_slow_path(set_result == NULL)) {
1322 nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1323
1324 Py_CLEAR(future);
1325
1326 goto cleanup_result;
1327 }
1328
1329 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1330 nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1331
1332 Py_CLEAR(future);
1333
1334 goto cleanup;
1335 }
1336
1337 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1338 result, NULL);
1339 if (nxt_slow_path(res == NULL)) {
1340 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1341 nxt_python_print_exception();
1342
1343 Py_CLEAR(future);
1344 }
1345
1346 Py_XDECREF(res);
1347
1348cleanup:
1349
1350 Py_DECREF(set_result);
1351
1352cleanup_result:
1353
1354 Py_DECREF(result);
1355
1356 return future;
1357}
1358
1359
1360PyObject *
1361nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1362{
1363 PyObject *msg;
1364
1365 msg = PyDict_New();
1366 if (nxt_slow_path(msg == NULL)) {
1367 nxt_unit_req_alert(req, "Python failed to create message dict");
1368 nxt_python_print_exception();
1369
1370 return PyErr_Format(PyExc_RuntimeError,
1371 "failed to create message dict");
1372 }
1373
1374 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1375 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1376
1377 Py_DECREF(msg);
1378
1379 return PyErr_Format(PyExc_RuntimeError,
1380 "failed to set 'msg.type' item");
1381 }
1382
1383 return msg;
1384}
1385
1386
1387PyObject *
1388nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1389 PyObject *spec_version)
1390{
1391 PyObject *scope, *asgi;
1392
1393 scope = PyDict_New();
1394 if (nxt_slow_path(scope == NULL)) {
1395 nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1396 nxt_python_print_exception();
1397
1398 return PyErr_Format(PyExc_RuntimeError,
1399 "failed to create 'scope' dict");
1400 }
1401
1402 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1403 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1404
1405 Py_DECREF(scope);
1406
1407 return PyErr_Format(PyExc_RuntimeError,
1408 "failed to set 'scope.type' item");
1409 }
1410
1411 asgi = PyDict_New();
1412 if (nxt_slow_path(asgi == NULL)) {
1413 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1414 nxt_python_print_exception();
1415
1416 Py_DECREF(scope);
1417
1418 return PyErr_Format(PyExc_RuntimeError,
1419 "failed to create 'asgi' dict");
1420 }
1421
1422 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1423 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1424
1425 Py_DECREF(asgi);
1426 Py_DECREF(scope);
1427
1428 return PyErr_Format(PyExc_RuntimeError,
1429 "failed to set 'scope.asgi' item");
1430 }
1431
1432 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1433 nxt_py_3_0_str) == -1))
1434 {
1435 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1436
1437 Py_DECREF(asgi);
1438 Py_DECREF(scope);
1439
1440 return PyErr_Format(PyExc_RuntimeError,
1441 "failed to set 'asgi.version' item");
1442 }
1443
1444 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1445 spec_version) == -1))
1446 {
1447 nxt_unit_req_alert(req,
1448 "Python failed to set 'asgi.spec_version' item");
1449
1450 Py_DECREF(asgi);
1451 Py_DECREF(scope);
1452
1453 return PyErr_Format(PyExc_RuntimeError,
1454 "failed to set 'asgi.spec_version' item");
1455 }
1456
1457 Py_DECREF(asgi);
1458
1459 return scope;
1460}
1461
1462
1463void
1464nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1465{
1466 nxt_py_asgi_ctx_data_t *ctx_data;
1467
1468 ctx_data = req->ctx->data;
1469
1470 nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1471}
1472
1473
1474void
1475nxt_py_asgi_dealloc(PyObject *self)
1476{
1477 PyObject_Del(self);
1478}
1479
1480
1481PyObject *
1482nxt_py_asgi_await(PyObject *self)
1483{
1484 Py_INCREF(self);
1485 return self;
1486}
1487
1488
1489PyObject *
1490nxt_py_asgi_iter(PyObject *self)
1491{
1492 Py_INCREF(self);
1493 return self;
1494}
1495
1496
1497PyObject *
1498nxt_py_asgi_next(PyObject *self)
1499{
1500 return NULL;
1501}
1502
1503
1504static void
1505nxt_python_asgi_done(void)
1506{
1507 nxt_py_asgi_str_done();
1508
1509 Py_XDECREF(nxt_py_port_read);
1510}
1511
1512#else /* !(NXT_HAVE_ASGI) */
1513
1514
1515int
1516nxt_python_asgi_check(PyObject *obj)
1517{
1518 return 0;
1519}
1520
1521
1522int
1523nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1524{
1525 nxt_unit_alert(NULL, "ASGI not implemented");
1526 return NXT_UNIT_ERROR;
1527}
1528
1529
1530#endif /* NXT_HAVE_ASGI */
1190 Py_RETURN_NONE;
1191}
1192
1193
1194PyObject *
1195nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
1196 void *data)
1197{
1198 int i;
1199 PyObject *iter, *header, *h_iter, *name, *val, *res;
1200
1201 iter = PyObject_GetIter(headers);
1202 if (nxt_slow_path(iter == NULL)) {
1203 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
1204 }
1205
1206 for (i = 0; /* void */; i++) {
1207 header = PyIter_Next(iter);
1208 if (header == NULL) {
1209 break;
1210 }
1211
1212 h_iter = PyObject_GetIter(header);
1213 if (nxt_slow_path(h_iter == NULL)) {
1214 Py_DECREF(header);
1215 Py_DECREF(iter);
1216
1217 return PyErr_Format(PyExc_TypeError,
1218 "'headers' item #%d is not an iterable", i);
1219 }
1220
1221 name = PyIter_Next(h_iter);
1222 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
1223 Py_XDECREF(name);
1224 Py_DECREF(h_iter);
1225 Py_DECREF(header);
1226 Py_DECREF(iter);
1227
1228 return PyErr_Format(PyExc_TypeError,
1229 "'headers' item #%d 'name' is not a byte string", i);
1230 }
1231
1232 val = PyIter_Next(h_iter);
1233 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
1234 Py_XDECREF(val);
1235 Py_DECREF(h_iter);
1236 Py_DECREF(header);
1237 Py_DECREF(iter);
1238
1239 return PyErr_Format(PyExc_TypeError,
1240 "'headers' item #%d 'value' is not a byte string", i);
1241 }
1242
1243 res = cb(data, i, name, val);
1244
1245 Py_DECREF(name);
1246 Py_DECREF(val);
1247 Py_DECREF(h_iter);
1248 Py_DECREF(header);
1249
1250 if (nxt_slow_path(res == NULL)) {
1251 Py_DECREF(iter);
1252
1253 return NULL;
1254 }
1255
1256 Py_DECREF(res);
1257 }
1258
1259 Py_DECREF(iter);
1260
1261 Py_RETURN_NONE;
1262}
1263
1264
1265PyObject *
1266nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
1267{
1268 nxt_py_asgi_calc_size_ctx_t *ctx;
1269
1270 ctx = data;
1271
1272 ctx->fields_count++;
1273 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
1274
1275 Py_RETURN_NONE;
1276}
1277
1278
1279PyObject *
1280nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
1281{
1282 int rc;
1283 char *name_str, *val_str;
1284 uint32_t name_len, val_len;
1285 nxt_off_t content_length;
1286 nxt_unit_request_info_t *req;
1287 nxt_py_asgi_add_field_ctx_t *ctx;
1288
1289 name_str = PyBytes_AS_STRING(name);
1290 name_len = PyBytes_GET_SIZE(name);
1291
1292 val_str = PyBytes_AS_STRING(val);
1293 val_len = PyBytes_GET_SIZE(val);
1294
1295 ctx = data;
1296 req = ctx->req;
1297
1298 rc = nxt_unit_response_add_field(req, name_str, name_len,
1299 val_str, val_len);
1300 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1301 return PyErr_Format(PyExc_RuntimeError,
1302 "failed to add header #%d", i);
1303 }
1304
1305 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
1306 content_length = nxt_off_t_parse((u_char *) val_str, val_len);
1307 if (nxt_slow_path(content_length < 0)) {
1308 nxt_unit_req_error(req, "failed to parse Content-Length "
1309 "value %.*s", (int) val_len, val_str);
1310
1311 return PyErr_Format(PyExc_ValueError,
1312 "Failed to parse Content-Length: '%.*s'",
1313 (int) val_len, val_str);
1314 }
1315
1316 ctx->content_length = content_length;
1317 }
1318
1319 Py_RETURN_NONE;
1320}
1321
1322
1323PyObject *
1324nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1325 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1326{
1327 PyObject *set_result, *res;
1328
1329 if (nxt_slow_path(result == NULL)) {
1330 Py_DECREF(future);
1331
1332 return NULL;
1333 }
1334
1335 set_result = PyObject_GetAttrString(future, "set_result");
1336 if (nxt_slow_path(set_result == NULL)) {
1337 nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1338
1339 Py_CLEAR(future);
1340
1341 goto cleanup_result;
1342 }
1343
1344 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1345 nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1346
1347 Py_CLEAR(future);
1348
1349 goto cleanup;
1350 }
1351
1352 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1353 result, NULL);
1354 if (nxt_slow_path(res == NULL)) {
1355 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1356 nxt_python_print_exception();
1357
1358 Py_CLEAR(future);
1359 }
1360
1361 Py_XDECREF(res);
1362
1363cleanup:
1364
1365 Py_DECREF(set_result);
1366
1367cleanup_result:
1368
1369 Py_DECREF(result);
1370
1371 return future;
1372}
1373
1374
1375PyObject *
1376nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
1377{
1378 PyObject *msg;
1379
1380 msg = PyDict_New();
1381 if (nxt_slow_path(msg == NULL)) {
1382 nxt_unit_req_alert(req, "Python failed to create message dict");
1383 nxt_python_print_exception();
1384
1385 return PyErr_Format(PyExc_RuntimeError,
1386 "failed to create message dict");
1387 }
1388
1389 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
1390 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
1391
1392 Py_DECREF(msg);
1393
1394 return PyErr_Format(PyExc_RuntimeError,
1395 "failed to set 'msg.type' item");
1396 }
1397
1398 return msg;
1399}
1400
1401
1402PyObject *
1403nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
1404 PyObject *spec_version)
1405{
1406 PyObject *scope, *asgi;
1407
1408 scope = PyDict_New();
1409 if (nxt_slow_path(scope == NULL)) {
1410 nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
1411 nxt_python_print_exception();
1412
1413 return PyErr_Format(PyExc_RuntimeError,
1414 "failed to create 'scope' dict");
1415 }
1416
1417 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
1418 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
1419
1420 Py_DECREF(scope);
1421
1422 return PyErr_Format(PyExc_RuntimeError,
1423 "failed to set 'scope.type' item");
1424 }
1425
1426 asgi = PyDict_New();
1427 if (nxt_slow_path(asgi == NULL)) {
1428 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
1429 nxt_python_print_exception();
1430
1431 Py_DECREF(scope);
1432
1433 return PyErr_Format(PyExc_RuntimeError,
1434 "failed to create 'asgi' dict");
1435 }
1436
1437 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
1438 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
1439
1440 Py_DECREF(asgi);
1441 Py_DECREF(scope);
1442
1443 return PyErr_Format(PyExc_RuntimeError,
1444 "failed to set 'scope.asgi' item");
1445 }
1446
1447 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
1448 nxt_py_3_0_str) == -1))
1449 {
1450 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
1451
1452 Py_DECREF(asgi);
1453 Py_DECREF(scope);
1454
1455 return PyErr_Format(PyExc_RuntimeError,
1456 "failed to set 'asgi.version' item");
1457 }
1458
1459 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
1460 spec_version) == -1))
1461 {
1462 nxt_unit_req_alert(req,
1463 "Python failed to set 'asgi.spec_version' item");
1464
1465 Py_DECREF(asgi);
1466 Py_DECREF(scope);
1467
1468 return PyErr_Format(PyExc_RuntimeError,
1469 "failed to set 'asgi.spec_version' item");
1470 }
1471
1472 Py_DECREF(asgi);
1473
1474 return scope;
1475}
1476
1477
1478void
1479nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1480{
1481 nxt_py_asgi_ctx_data_t *ctx_data;
1482
1483 ctx_data = req->ctx->data;
1484
1485 nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1486}
1487
1488
1489void
1490nxt_py_asgi_dealloc(PyObject *self)
1491{
1492 PyObject_Del(self);
1493}
1494
1495
1496PyObject *
1497nxt_py_asgi_await(PyObject *self)
1498{
1499 Py_INCREF(self);
1500 return self;
1501}
1502
1503
1504PyObject *
1505nxt_py_asgi_iter(PyObject *self)
1506{
1507 Py_INCREF(self);
1508 return self;
1509}
1510
1511
1512PyObject *
1513nxt_py_asgi_next(PyObject *self)
1514{
1515 return NULL;
1516}
1517
1518
1519static void
1520nxt_python_asgi_done(void)
1521{
1522 nxt_py_asgi_str_done();
1523
1524 Py_XDECREF(nxt_py_port_read);
1525}
1526
1527#else /* !(NXT_HAVE_ASGI) */
1528
1529
1530int
1531nxt_python_asgi_check(PyObject *obj)
1532{
1533 return 0;
1534}
1535
1536
1537int
1538nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1539{
1540 nxt_unit_alert(NULL, "ASGI not implemented");
1541 return NXT_UNIT_ERROR;
1542}
1543
1544
1545#endif /* NXT_HAVE_ASGI */