1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7#include <python/nxt_python.h> 8 9#if (NXT_HAVE_ASGI) 10 11#include <nxt_main.h> 12#include <nxt_unit.h> 13#include <nxt_unit_request.h> 14#include <nxt_unit_response.h> 15#include <python/nxt_python_asgi.h> 16#include <python/nxt_python_asgi_str.h> 17 18 19static PyObject *nxt_python_asgi_get_func(PyObject *obj); 20static int nxt_python_asgi_ctx_data_alloc(void **pdata); 21static void nxt_python_asgi_ctx_data_free(void *data); 22static int nxt_python_asgi_startup(void *data); 23static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 24 25static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 26 nxt_unit_port_t *port); 27static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 28static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req); 29 30static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 31static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 32 uint16_t port); 33static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 34static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 35 36static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); 37 38static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 39static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 40static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 41static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 42 43static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 44static void nxt_python_asgi_done(void); 45 46 47int nxt_py_asgi_legacy; 48static PyObject *nxt_py_port_read; 49static nxt_unit_port_t *nxt_py_shared_port; 50 51static PyMethodDef nxt_py_port_read_method = 52 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 53 54static nxt_python_proto_t nxt_py_asgi_proto = { 55 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 56 .ctx_data_free = nxt_python_asgi_ctx_data_free, 57 .startup = nxt_python_asgi_startup, 58 .run = nxt_python_asgi_run, 59 .ready = nxt_python_asgi_ready, 60 .done = nxt_python_asgi_done, 61}; 62 63#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 64 65 66int 67nxt_python_asgi_check(PyObject *obj) 68{ 69 int res; 70 PyObject *func; 71 PyCodeObject *code; 72 73 func = nxt_python_asgi_get_func(obj); 74 75 if (func == NULL) { 76 return 0; 77 } 78 79 code = (PyCodeObject *) PyFunction_GET_CODE(func); 80 81 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " 82 "%d argument(s)", 83 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", 84 code->co_argcount); 85 86 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; 87 88 Py_DECREF(func); 89 90 return res; 91} 92 93 94static PyObject * 95nxt_python_asgi_get_func(PyObject *obj) 96{ 97 PyObject *call; 98 99 if (PyFunction_Check(obj)) { 100 Py_INCREF(obj); 101 return obj; 102 } 103 104 if (PyMethod_Check(obj)) { 105 obj = PyMethod_GET_FUNCTION(obj); 106 107 Py_INCREF(obj); 108 return obj; 109 } 110 111 call = PyObject_GetAttrString(obj, "__call__"); 112 113 if (call == NULL) { 114 return NULL; 115 } 116 117 if (PyFunction_Check(call)) { 118 return call; 119 } 120 121 if (PyMethod_Check(call)) { 122 obj = PyMethod_GET_FUNCTION(call); 123 124 Py_INCREF(obj); 125 Py_DECREF(call); 126 127 return obj; 128 } 129 130 Py_DECREF(call); 131 132 return NULL; 133} 134 135 136int 137nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 138{ 139 PyObject *func; 140 PyCodeObject *code; 141 142 nxt_unit_debug(NULL, "asgi_init"); 143 144 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 145 nxt_unit_alert(NULL, "Python failed to init string objects"); 146 return NXT_UNIT_ERROR; 147 } 148 149 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 150 if (nxt_slow_path(nxt_py_port_read == NULL)) { 151 nxt_unit_alert(NULL, 152 "Python failed to initialize the 'port_read' function"); 153 return NXT_UNIT_ERROR; 154 } 155 156 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 157 return NXT_UNIT_ERROR; 158 } 159 160 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 161 return NXT_UNIT_ERROR; 162 } 163 164 func = nxt_python_asgi_get_func(nxt_py_application); 165 if (nxt_slow_path(func == NULL)) { 166 nxt_unit_alert(NULL, "Python cannot find function for callable"); 167 return NXT_UNIT_ERROR; 168 } 169 170 code = (PyCodeObject *) PyFunction_GET_CODE(func); 171 172 if ((code->co_flags & CO_COROUTINE) == 0) { 173 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " 174 "switching to legacy mode"); 175 nxt_py_asgi_legacy = 1; 176 } 177 178 Py_DECREF(func); 179 180 init->callbacks.request_handler = nxt_py_asgi_request_handler; 181 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 182 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 183 init->callbacks.close_handler = nxt_py_asgi_close_handler; 184 init->callbacks.quit = nxt_py_asgi_quit; 185 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 186 init->callbacks.add_port = nxt_py_asgi_add_port; 187 init->callbacks.remove_port = nxt_py_asgi_remove_port; 188 189 *proto = nxt_py_asgi_proto; 190 191 return NXT_UNIT_OK; 192} 193 194 195static int 196nxt_python_asgi_ctx_data_alloc(void **pdata) 197{ 198 uint32_t i; 199 PyObject *asyncio, *loop, *new_event_loop, *obj; 200 nxt_py_asgi_ctx_data_t *ctx_data; 201 202 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 203 if (nxt_slow_path(ctx_data == NULL)) { 204 nxt_unit_alert(NULL, "Failed to allocate context data"); 205 return NXT_UNIT_ERROR; 206 } 207 208 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 209 210 nxt_queue_init(&ctx_data->drain_queue); 211 212 struct { 213 const char *key; 214 PyObject **handler; 215 216 } handlers[] = { 217 { "create_task", &ctx_data->loop_create_task }, 218 { "add_reader", &ctx_data->loop_add_reader }, 219 { "remove_reader", &ctx_data->loop_remove_reader }, 220 { "call_soon", &ctx_data->loop_call_soon }, 221 { "run_until_complete", &ctx_data->loop_run_until_complete }, 222 { "create_future", &ctx_data->loop_create_future }, 223 }; 224 225 loop = NULL; 226 227 asyncio = PyImport_ImportModule("asyncio"); 228 if (nxt_slow_path(asyncio == NULL)) { 229 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 230 nxt_python_print_exception(); 231 goto fail; 232 } 233 234 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 235 "new_event_loop"); 236 if (nxt_slow_path(new_event_loop == NULL)) { 237 nxt_unit_alert(NULL, 238 "Python failed to get 'new_event_loop' from module 'asyncio'"); 239 goto fail; 240 } 241 242 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { 243 nxt_unit_alert(NULL, 244 "'asyncio.new_event_loop' is not a callable object"); 245 goto fail; 246 } 247 248 loop = PyObject_CallObject(new_event_loop, NULL); 249 if (nxt_slow_path(loop == NULL)) { 250 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); 251 goto fail; 252 } 253 254 for (i = 0; i < nxt_nitems(handlers); i++) { 255 obj = PyObject_GetAttrString(loop, handlers[i].key); 256 if (nxt_slow_path(obj == NULL)) { 257 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 258 handlers[i].key); 259 goto fail; 260 } 261 262 *handlers[i].handler = obj; 263 264 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 265 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 266 handlers[i].key); 267 goto fail; 268 } 269 } 270 271 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 272 if (nxt_slow_path(obj == NULL)) { 273 nxt_unit_alert(NULL, "Python failed to create Future "); 274 nxt_python_print_exception(); 275 goto fail; 276 } 277 278 ctx_data->quit_future = obj; 279 280 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 281 if (nxt_slow_path(obj == NULL)) { 282 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 283 goto fail; 284 } 285 286 ctx_data->quit_future_set_result = obj; 287 288 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 289 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 290 goto fail; 291 } 292 293 Py_DECREF(loop); 294 Py_DECREF(asyncio); 295 296 *pdata = ctx_data; 297 298 return NXT_UNIT_OK; 299 300fail: 301 302 nxt_python_asgi_ctx_data_free(ctx_data); 303 304 Py_XDECREF(loop); 305 Py_XDECREF(asyncio); 306 307 return NXT_UNIT_ERROR; 308} 309 310 311static void 312nxt_python_asgi_ctx_data_free(void *data) 313{ 314 nxt_py_asgi_ctx_data_t *ctx_data; 315 316 ctx_data = data; 317 318 Py_XDECREF(ctx_data->loop_run_until_complete); 319 Py_XDECREF(ctx_data->loop_create_future); 320 Py_XDECREF(ctx_data->loop_create_task); 321 Py_XDECREF(ctx_data->loop_call_soon); 322 Py_XDECREF(ctx_data->loop_add_reader); 323 Py_XDECREF(ctx_data->loop_remove_reader); 324 Py_XDECREF(ctx_data->quit_future); 325 Py_XDECREF(ctx_data->quit_future_set_result); 326 327 nxt_unit_free(NULL, ctx_data); 328} 329 330 331static int 332nxt_python_asgi_startup(void *data) 333{ 334 return nxt_py_asgi_lifespan_startup(data); 335} 336 337 338static int 339nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 340{ 341 PyObject *res; 342 nxt_py_asgi_ctx_data_t *ctx_data; 343 344 ctx_data = ctx->data; 345 346 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 347 ctx_data->quit_future, NULL); 348 if (nxt_slow_path(res == NULL)) { 349 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 350 nxt_python_print_exception(); 351 352 return NXT_UNIT_ERROR; 353 } 354 355 Py_DECREF(res); 356 357 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); 358 nxt_py_asgi_remove_reader(ctx, ctx_data->port); 359 360 if (ctx_data->port != NULL) { 361 ctx_data->port->data = NULL; 362 ctx_data->port = NULL; 363 } 364 365 nxt_py_asgi_lifespan_shutdown(ctx); 366 367 return NXT_UNIT_OK; 368} 369 370 371static void 372nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 373{ 374 PyObject *res, *fd; 375 nxt_py_asgi_ctx_data_t *ctx_data; 376 377 if (port == NULL || port->in_fd == -1) { 378 return; 379 } 380 381 ctx_data = ctx->data; 382 383 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 384 385 fd = PyLong_FromLong(port->in_fd); 386 if (nxt_slow_path(fd == NULL)) { 387 nxt_unit_alert(ctx, "Python failed to create Long object"); 388 nxt_python_print_exception(); 389 390 return; 391 } 392 393 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 394 if (nxt_slow_path(res == NULL)) { 395 nxt_unit_alert(ctx, "Python failed to remove_reader"); 396 nxt_python_print_exception(); 397 398 } else { 399 Py_DECREF(res); 400 } 401 402 Py_DECREF(fd); 403} 404 405 406static void 407nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 408{ 409 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 410 PyObject *stage2; 411 nxt_py_asgi_ctx_data_t *ctx_data; 412 413 if (req->request->websocket_handshake) { 414 asgi = nxt_py_asgi_websocket_create(req); 415 416 } else { 417 asgi = nxt_py_asgi_http_create(req); 418 } 419 420 if (nxt_slow_path(asgi == NULL)) { 421 nxt_unit_req_alert(req, "Python failed to create asgi object"); 422 nxt_unit_request_done(req, NXT_UNIT_ERROR); 423 424 return; 425 } 426 427 receive = PyObject_GetAttrString(asgi, "receive"); 428 if (nxt_slow_path(receive == NULL)) { 429 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 430 nxt_unit_request_done(req, NXT_UNIT_ERROR); 431 432 goto release_asgi; 433 } 434 435 send = PyObject_GetAttrString(asgi, "send"); 436 if (nxt_slow_path(receive == NULL)) { 437 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 438 nxt_unit_request_done(req, NXT_UNIT_ERROR); 439 440 goto release_receive; 441 } 442 443 done = PyObject_GetAttrString(asgi, "_done"); 444 if (nxt_slow_path(receive == NULL)) { 445 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 446 nxt_unit_request_done(req, NXT_UNIT_ERROR); 447 448 goto release_send; 449 } 450 451 scope = nxt_py_asgi_create_http_scope(req); 452 if (nxt_slow_path(scope == NULL)) { 453 nxt_unit_request_done(req, NXT_UNIT_ERROR); 454 455 goto release_done; 456 } 457 458 req->data = asgi; 459 460 if (!nxt_py_asgi_legacy) { 461 nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); 462 463 res = PyObject_CallFunctionObjArgs(nxt_py_application, 464 scope, receive, send, NULL); 465 466 } else { 467 nxt_unit_req_debug(req, "Python call legacy application"); 468 469 res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL); 470 471 if (nxt_slow_path(res == NULL)) { 472 nxt_unit_req_error(req, "Python failed to call legacy app stage1"); 473 nxt_python_print_exception(); 474 nxt_unit_request_done(req, NXT_UNIT_ERROR); 475 476 goto release_scope; 477 } 478 479 if (nxt_slow_path(PyCallable_Check(res) == 0)) { 480 nxt_unit_req_error(req, 481 "Legacy ASGI application returns not a callable"); 482 nxt_unit_request_done(req, NXT_UNIT_ERROR); 483 484 Py_DECREF(res); 485 486 goto release_scope; 487 } 488 489 stage2 = res; 490 491 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); 492 493 Py_DECREF(stage2); 494 } 495 496 if (nxt_slow_path(res == NULL)) { 497 nxt_unit_req_error(req, "Python failed to call the application"); 498 nxt_python_print_exception(); 499 nxt_unit_request_done(req, NXT_UNIT_ERROR); 500 501 goto release_scope; 502 } 503 504 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 505 nxt_unit_req_error(req, "Application result type is not a coroutine"); 506 nxt_unit_request_done(req, NXT_UNIT_ERROR); 507 508 Py_DECREF(res); 509 510 goto release_scope; 511 } 512 513 ctx_data = req->ctx->data; 514 515 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 516 if (nxt_slow_path(task == NULL)) { 517 nxt_unit_req_error(req, "Python failed to call the create_task"); 518 nxt_python_print_exception(); 519 nxt_unit_request_done(req, NXT_UNIT_ERROR); 520 521 Py_DECREF(res); 522 523 goto release_scope; 524 } 525 526 Py_DECREF(res); 527 528 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 529 NULL); 530 if (nxt_slow_path(res == NULL)) { 531 nxt_unit_req_error(req, 532 "Python failed to call 'task.add_done_callback'"); 533 nxt_python_print_exception(); 534 nxt_unit_request_done(req, NXT_UNIT_ERROR); 535 536 goto release_task; 537 } 538 539 Py_DECREF(res); 540release_task: 541 Py_DECREF(task); 542release_scope: 543 Py_DECREF(scope); 544release_done: 545 Py_DECREF(done); 546release_send: 547 Py_DECREF(send); 548release_receive: 549 Py_DECREF(receive); 550release_asgi: 551 Py_DECREF(asgi); 552} 553 554 555static void 556nxt_py_asgi_close_handler(nxt_unit_request_info_t *req) 557{ 558 if (req->request->websocket_handshake) { 559 nxt_py_asgi_websocket_close_handler(req); 560 561 } else { 562 nxt_py_asgi_http_close_handler(req); 563 } 564} 565 566 567static PyObject * 568nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 569{ 570 char *p, *target, *query; 571 uint32_t target_length, i; 572 PyObject *scope, *v, *type, *scheme; 573 PyObject *headers, *header; 574 nxt_unit_field_t *f; 575 nxt_unit_request_t *r; 576 577 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 578 579#define SET_ITEM(dict, key, value) \ 580 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 581 == -1)) \ 582 { \ 583 nxt_unit_req_alert(req, "Python failed to set '" \ 584 #dict "." #key "' item"); \ 585 goto fail; \ 586 } 587 588 v = NULL; 589 headers = NULL; 590 591 r = req->request; 592 593 if (r->websocket_handshake) { 594 type = nxt_py_websocket_str; 595 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 596 597 } else { 598 type = nxt_py_http_str; 599 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 600 } 601 602 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 603 if (nxt_slow_path(scope == NULL)) { 604 return NULL; 605 } 606 607 p = nxt_unit_sptr_get(&r->version); 608 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 609 : nxt_py_1_0_str) 610 SET_ITEM(scope, scheme, scheme) 611 612 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 613 r->method_length); 614 if (nxt_slow_path(v == NULL)) { 615 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 616 goto fail; 617 } 618 619 SET_ITEM(scope, method, v) 620 Py_DECREF(v); 621 622 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 623 "replace"); 624 if (nxt_slow_path(v == NULL)) { 625 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 626 goto fail; 627 } 628 629 SET_ITEM(scope, path, v) 630 Py_DECREF(v); 631 632 target = nxt_unit_sptr_get(&r->target); 633 query = nxt_unit_sptr_get(&r->query); 634 635 if (r->query.offset != 0) { 636 target_length = query - target - 1; 637 638 } else { 639 target_length = r->target_length; 640 } 641 642 v = PyBytes_FromStringAndSize(target, target_length); 643 if (nxt_slow_path(v == NULL)) { 644 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 645 goto fail; 646 } 647 648 SET_ITEM(scope, raw_path, v) 649 Py_DECREF(v); 650 651 v = PyBytes_FromStringAndSize(query, r->query_length); 652 if (nxt_slow_path(v == NULL)) { 653 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 654 goto fail; 655 } 656 657 SET_ITEM(scope, query_string, v) 658 Py_DECREF(v); 659 660 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 661 if (nxt_slow_path(v == NULL)) { 662 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 663 goto fail; 664 } 665 666 SET_ITEM(scope, client, v) 667 Py_DECREF(v); 668 669 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 670 if (nxt_slow_path(v == NULL)) { 671 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 672 goto fail; 673 } 674 675 SET_ITEM(scope, server, v) 676 Py_DECREF(v); 677 678 v = NULL; 679 680 headers = PyTuple_New(r->fields_count); 681 if (nxt_slow_path(headers == NULL)) { 682 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 683 goto fail; 684 } 685 686 for (i = 0; i < r->fields_count; i++) { 687 f = r->fields + i; 688 689 header = nxt_py_asgi_create_header(f); 690 if (nxt_slow_path(header == NULL)) { 691 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 692 goto fail; 693 } 694 695 PyTuple_SET_ITEM(headers, i, header); 696 697 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 698 && f->name_length == ws_protocol.length 699 && f->value_length > 0 700 && r->websocket_handshake) 701 { 702 v = nxt_py_asgi_create_subprotocols(f); 703 if (nxt_slow_path(v == NULL)) { 704 nxt_unit_req_alert(req, "Failed to create subprotocols"); 705 goto fail; 706 } 707 708 SET_ITEM(scope, subprotocols, v); 709 Py_DECREF(v); 710 } 711 } 712 713 SET_ITEM(scope, headers, headers) 714 Py_DECREF(headers); 715 716 return scope; 717 718fail: 719 720 Py_XDECREF(v); 721 Py_XDECREF(headers); 722 Py_DECREF(scope); 723 724 return NULL; 725 726#undef SET_ITEM 727} 728 729 730static PyObject * 731nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 732{ 733 char *p, *s; 734 PyObject *pair, *v; 735 736 pair = PyTuple_New(2); 737 if (nxt_slow_path(pair == NULL)) { 738 return NULL; 739 } 740 741 p = nxt_unit_sptr_get(sptr); 742 s = memchr(p, ':', len); 743 744 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 745 if (nxt_slow_path(v == NULL)) { 746 Py_DECREF(pair); 747 748 return NULL; 749 } 750 751 PyTuple_SET_ITEM(pair, 0, v); 752 753 if (s != NULL) { 754 p += len; 755 v = PyLong_FromString(s + 1, &p, 10); 756 757 } else { 758 v = PyLong_FromLong(port); 759 } 760 761 if (nxt_slow_path(v == NULL)) { 762 Py_DECREF(pair); 763 764 return NULL; 765 } 766 767 PyTuple_SET_ITEM(pair, 1, v); 768 769 return pair; 770} 771 772 773static PyObject * 774nxt_py_asgi_create_header(nxt_unit_field_t *f) 775{ 776 char c, *name; 777 uint8_t pos; 778 PyObject *header, *v; 779 780 header = PyTuple_New(2); 781 if (nxt_slow_path(header == NULL)) { 782 return NULL; 783 } 784 785 name = nxt_unit_sptr_get(&f->name); 786 787 for (pos = 0; pos < f->name_length; pos++) { 788 c = name[pos]; 789 if (c >= 'A' && c <= 'Z') { 790 name[pos] = (c | 0x20); 791 } 792 } 793 794 v = PyBytes_FromStringAndSize(name, f->name_length); 795 if (nxt_slow_path(v == NULL)) { 796 Py_DECREF(header); 797 798 return NULL; 799 } 800 801 PyTuple_SET_ITEM(header, 0, v); 802 803 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 804 f->value_length); 805 if (nxt_slow_path(v == NULL)) { 806 Py_DECREF(header); 807 808 return NULL; 809 } 810 811 PyTuple_SET_ITEM(header, 1, v); 812 813 return header; 814} 815 816 817static PyObject * 818nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 819{ 820 char *v; 821 uint32_t i, n, start; 822 PyObject *res, *proto; 823 824 v = nxt_unit_sptr_get(&f->value); 825 n = 1; 826 827 for (i = 0; i < f->value_length; i++) { 828 if (v[i] == ',') { 829 n++; 830 } 831 } 832 833 res = PyTuple_New(n); 834 if (nxt_slow_path(res == NULL)) { 835 return NULL; 836 } 837 838 n = 0; 839 start = 0; 840 841 for (i = 0; i < f->value_length; ) { 842 if (v[i] != ',') { 843 i++; 844 845 continue; 846 } 847 848 if (i - start > 0) { 849 proto = PyString_FromStringAndSize(v + start, i - start); 850 if (nxt_slow_path(proto == NULL)) { 851 goto fail; 852 } 853 854 PyTuple_SET_ITEM(res, n, proto); 855 856 n++; 857 } 858 859 do { 860 i++; 861 } while (i < f->value_length && v[i] == ' '); 862 863 start = i; 864 } 865 866 if (i - start > 0) { 867 proto = PyString_FromStringAndSize(v + start, i - start); 868 if (nxt_slow_path(proto == NULL)) { 869 goto fail; 870 } 871 872 PyTuple_SET_ITEM(res, n, proto); 873 } 874 875 return res; 876 877fail: 878 879 Py_DECREF(res); 880 881 return NULL; 882} 883 884 885static int 886nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) 887{ 888 int rc; 889 PyObject *res, *fd, *py_ctx, *py_port; 890 nxt_unit_port_t *port; 891 nxt_py_asgi_ctx_data_t *ctx_data; 892 893 if (nxt_slow_path(nxt_py_shared_port == NULL)) { 894 return NXT_UNIT_ERROR; 895 } 896 897 port = nxt_py_shared_port; 898 899 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); 900 901 ctx_data = ctx->data; 902 903 rc = NXT_UNIT_ERROR; 904 905 fd = PyLong_FromLong(port->in_fd); 906 if (nxt_slow_path(fd == NULL)) { 907 nxt_unit_alert(ctx, "Python failed to create fd"); 908 nxt_python_print_exception(); 909 910 return rc; 911 } 912 913 py_ctx = PyLong_FromVoidPtr(ctx); 914 if (nxt_slow_path(py_ctx == NULL)) { 915 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 916 nxt_python_print_exception(); 917 918 goto clean_fd; 919 } 920 921 py_port = PyLong_FromVoidPtr(port); 922 if (nxt_slow_path(py_port == NULL)) { 923 nxt_unit_alert(ctx, "Python failed to create py_port"); 924 nxt_python_print_exception(); 925 926 goto clean_py_ctx; 927 } 928 929 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 930 fd, nxt_py_port_read, 931 py_ctx, py_port, NULL); 932 if (nxt_slow_path(res == NULL)) { 933 nxt_unit_alert(ctx, "Python failed to add_reader"); 934 nxt_python_print_exception(); 935 936 } else { 937 Py_DECREF(res); 938 939 rc = NXT_UNIT_OK; 940 } 941 942 Py_DECREF(py_port); 943 944clean_py_ctx: 945 946 Py_DECREF(py_ctx); 947 948clean_fd: 949 950 Py_DECREF(fd); 951 952 return rc; 953} 954 955 956static int 957nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 958{ 959 int nb, rc; 960 PyObject *res, *fd, *py_ctx, *py_port; 961 nxt_py_asgi_ctx_data_t *ctx_data; 962 963 if (port->in_fd == -1) { 964 return NXT_UNIT_OK; 965 } 966 967 nb = 1; 968 969 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 970 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 971 port->in_fd, strerror(errno), errno); 972 973 return NXT_UNIT_ERROR; 974 } 975 976 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 977 978 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { 979 nxt_py_shared_port = port; 980 981 return NXT_UNIT_OK; 982 } 983 984 ctx_data = ctx->data; 985 986 ctx_data->port = port; 987 port->data = ctx_data; 988 989 rc = NXT_UNIT_ERROR; 990 991 fd = PyLong_FromLong(port->in_fd); 992 if (nxt_slow_path(fd == NULL)) { 993 nxt_unit_alert(ctx, "Python failed to create fd"); 994 nxt_python_print_exception(); 995 996 return rc; 997 } 998 999 py_ctx = PyLong_FromVoidPtr(ctx); 1000 if (nxt_slow_path(py_ctx == NULL)) { 1001 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 1002 nxt_python_print_exception(); 1003 1004 goto clean_fd; 1005 } 1006 1007 py_port = PyLong_FromVoidPtr(port); 1008 if (nxt_slow_path(py_port == NULL)) { 1009 nxt_unit_alert(ctx, "Python failed to create py_port"); 1010 nxt_python_print_exception(); 1011 1012 goto clean_py_ctx; 1013 } 1014 1015 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 1016 fd, nxt_py_port_read, 1017 py_ctx, py_port, NULL); 1018 if (nxt_slow_path(res == NULL)) { 1019 nxt_unit_alert(ctx, "Python failed to add_reader"); 1020 nxt_python_print_exception(); 1021 1022 } else { 1023 Py_DECREF(res); 1024 1025 rc = NXT_UNIT_OK; 1026 } 1027 1028 Py_DECREF(py_port); 1029 1030clean_py_ctx: 1031 1032 Py_DECREF(py_ctx); 1033 1034clean_fd: 1035 1036 Py_DECREF(fd); 1037 1038 return rc; 1039} 1040 1041 1042static void 1043nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 1044{ 1045 if (port->in_fd == -1) { 1046 return; 1047 } 1048 1049 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 1050 1051 if (nxt_py_shared_port == port) { 1052 nxt_py_shared_port = NULL; 1053 } 1054} 1055 1056 1057static void 1058nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 1059{ 1060 PyObject *res, *p; 1061 nxt_py_asgi_ctx_data_t *ctx_data; 1062 1063 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 1064 1065 ctx_data = ctx->data; 1066 1067 if (nxt_py_shared_port != NULL) { 1068 p = PyLong_FromLong(nxt_py_shared_port->in_fd); 1069 if (nxt_slow_path(p == NULL)) { 1070 nxt_unit_alert(NULL, "Python failed to create Long"); 1071 nxt_python_print_exception(); 1072 1073 } else { 1074 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 1075 p, NULL); 1076 if (nxt_slow_path(res == NULL)) { 1077 nxt_unit_alert(NULL, "Python failed to remove_reader"); 1078 nxt_python_print_exception(); 1079 1080 } else { 1081 Py_DECREF(res); 1082 } 1083 1084 Py_DECREF(p); 1085 } 1086 } 1087 1088 p = PyLong_FromLong(0); 1089 if (nxt_slow_path(p == NULL)) { 1090 nxt_unit_alert(NULL, "Python failed to create Long"); 1091 nxt_python_print_exception(); 1092 1093 } else { 1094 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1095 p, NULL); 1096 if (nxt_slow_path(res == NULL)) { 1097 nxt_unit_alert(ctx, "Python failed to set_result"); 1098 nxt_python_print_exception(); 1099 1100 } else { 1101 Py_DECREF(res); 1102 } 1103 1104 Py_DECREF(p); 1105 } 1106} 1107 1108 1109static void 1110nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1111{ 1112 int rc; 1113 nxt_queue_link_t *lnk; 1114 nxt_py_asgi_ctx_data_t *ctx_data; 1115 1116 ctx_data = ctx->data; 1117 1118 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1119 lnk = nxt_queue_first(&ctx_data->drain_queue); 1120 1121 rc = nxt_py_asgi_http_drain(lnk); 1122 if (rc == NXT_UNIT_AGAIN) { 1123 return; 1124 } 1125 1126 nxt_queue_remove(lnk); 1127 } 1128} 1129 1130 1131static PyObject * 1132nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1133{
| 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7#include <python/nxt_python.h> 8 9#if (NXT_HAVE_ASGI) 10 11#include <nxt_main.h> 12#include <nxt_unit.h> 13#include <nxt_unit_request.h> 14#include <nxt_unit_response.h> 15#include <python/nxt_python_asgi.h> 16#include <python/nxt_python_asgi_str.h> 17 18 19static PyObject *nxt_python_asgi_get_func(PyObject *obj); 20static int nxt_python_asgi_ctx_data_alloc(void **pdata); 21static void nxt_python_asgi_ctx_data_free(void *data); 22static int nxt_python_asgi_startup(void *data); 23static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 24 25static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 26 nxt_unit_port_t *port); 27static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 28static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req); 29 30static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 31static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 32 uint16_t port); 33static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 34static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 35 36static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); 37 38static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 39static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 40static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 41static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 42 43static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 44static void nxt_python_asgi_done(void); 45 46 47int nxt_py_asgi_legacy; 48static PyObject *nxt_py_port_read; 49static nxt_unit_port_t *nxt_py_shared_port; 50 51static PyMethodDef nxt_py_port_read_method = 52 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 53 54static nxt_python_proto_t nxt_py_asgi_proto = { 55 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 56 .ctx_data_free = nxt_python_asgi_ctx_data_free, 57 .startup = nxt_python_asgi_startup, 58 .run = nxt_python_asgi_run, 59 .ready = nxt_python_asgi_ready, 60 .done = nxt_python_asgi_done, 61}; 62 63#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 64 65 66int 67nxt_python_asgi_check(PyObject *obj) 68{ 69 int res; 70 PyObject *func; 71 PyCodeObject *code; 72 73 func = nxt_python_asgi_get_func(obj); 74 75 if (func == NULL) { 76 return 0; 77 } 78 79 code = (PyCodeObject *) PyFunction_GET_CODE(func); 80 81 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " 82 "%d argument(s)", 83 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", 84 code->co_argcount); 85 86 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; 87 88 Py_DECREF(func); 89 90 return res; 91} 92 93 94static PyObject * 95nxt_python_asgi_get_func(PyObject *obj) 96{ 97 PyObject *call; 98 99 if (PyFunction_Check(obj)) { 100 Py_INCREF(obj); 101 return obj; 102 } 103 104 if (PyMethod_Check(obj)) { 105 obj = PyMethod_GET_FUNCTION(obj); 106 107 Py_INCREF(obj); 108 return obj; 109 } 110 111 call = PyObject_GetAttrString(obj, "__call__"); 112 113 if (call == NULL) { 114 return NULL; 115 } 116 117 if (PyFunction_Check(call)) { 118 return call; 119 } 120 121 if (PyMethod_Check(call)) { 122 obj = PyMethod_GET_FUNCTION(call); 123 124 Py_INCREF(obj); 125 Py_DECREF(call); 126 127 return obj; 128 } 129 130 Py_DECREF(call); 131 132 return NULL; 133} 134 135 136int 137nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 138{ 139 PyObject *func; 140 PyCodeObject *code; 141 142 nxt_unit_debug(NULL, "asgi_init"); 143 144 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 145 nxt_unit_alert(NULL, "Python failed to init string objects"); 146 return NXT_UNIT_ERROR; 147 } 148 149 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 150 if (nxt_slow_path(nxt_py_port_read == NULL)) { 151 nxt_unit_alert(NULL, 152 "Python failed to initialize the 'port_read' function"); 153 return NXT_UNIT_ERROR; 154 } 155 156 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 157 return NXT_UNIT_ERROR; 158 } 159 160 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 161 return NXT_UNIT_ERROR; 162 } 163 164 func = nxt_python_asgi_get_func(nxt_py_application); 165 if (nxt_slow_path(func == NULL)) { 166 nxt_unit_alert(NULL, "Python cannot find function for callable"); 167 return NXT_UNIT_ERROR; 168 } 169 170 code = (PyCodeObject *) PyFunction_GET_CODE(func); 171 172 if ((code->co_flags & CO_COROUTINE) == 0) { 173 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " 174 "switching to legacy mode"); 175 nxt_py_asgi_legacy = 1; 176 } 177 178 Py_DECREF(func); 179 180 init->callbacks.request_handler = nxt_py_asgi_request_handler; 181 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 182 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 183 init->callbacks.close_handler = nxt_py_asgi_close_handler; 184 init->callbacks.quit = nxt_py_asgi_quit; 185 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 186 init->callbacks.add_port = nxt_py_asgi_add_port; 187 init->callbacks.remove_port = nxt_py_asgi_remove_port; 188 189 *proto = nxt_py_asgi_proto; 190 191 return NXT_UNIT_OK; 192} 193 194 195static int 196nxt_python_asgi_ctx_data_alloc(void **pdata) 197{ 198 uint32_t i; 199 PyObject *asyncio, *loop, *new_event_loop, *obj; 200 nxt_py_asgi_ctx_data_t *ctx_data; 201 202 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 203 if (nxt_slow_path(ctx_data == NULL)) { 204 nxt_unit_alert(NULL, "Failed to allocate context data"); 205 return NXT_UNIT_ERROR; 206 } 207 208 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 209 210 nxt_queue_init(&ctx_data->drain_queue); 211 212 struct { 213 const char *key; 214 PyObject **handler; 215 216 } handlers[] = { 217 { "create_task", &ctx_data->loop_create_task }, 218 { "add_reader", &ctx_data->loop_add_reader }, 219 { "remove_reader", &ctx_data->loop_remove_reader }, 220 { "call_soon", &ctx_data->loop_call_soon }, 221 { "run_until_complete", &ctx_data->loop_run_until_complete }, 222 { "create_future", &ctx_data->loop_create_future }, 223 }; 224 225 loop = NULL; 226 227 asyncio = PyImport_ImportModule("asyncio"); 228 if (nxt_slow_path(asyncio == NULL)) { 229 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 230 nxt_python_print_exception(); 231 goto fail; 232 } 233 234 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 235 "new_event_loop"); 236 if (nxt_slow_path(new_event_loop == NULL)) { 237 nxt_unit_alert(NULL, 238 "Python failed to get 'new_event_loop' from module 'asyncio'"); 239 goto fail; 240 } 241 242 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { 243 nxt_unit_alert(NULL, 244 "'asyncio.new_event_loop' is not a callable object"); 245 goto fail; 246 } 247 248 loop = PyObject_CallObject(new_event_loop, NULL); 249 if (nxt_slow_path(loop == NULL)) { 250 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); 251 goto fail; 252 } 253 254 for (i = 0; i < nxt_nitems(handlers); i++) { 255 obj = PyObject_GetAttrString(loop, handlers[i].key); 256 if (nxt_slow_path(obj == NULL)) { 257 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 258 handlers[i].key); 259 goto fail; 260 } 261 262 *handlers[i].handler = obj; 263 264 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 265 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 266 handlers[i].key); 267 goto fail; 268 } 269 } 270 271 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 272 if (nxt_slow_path(obj == NULL)) { 273 nxt_unit_alert(NULL, "Python failed to create Future "); 274 nxt_python_print_exception(); 275 goto fail; 276 } 277 278 ctx_data->quit_future = obj; 279 280 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 281 if (nxt_slow_path(obj == NULL)) { 282 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 283 goto fail; 284 } 285 286 ctx_data->quit_future_set_result = obj; 287 288 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 289 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 290 goto fail; 291 } 292 293 Py_DECREF(loop); 294 Py_DECREF(asyncio); 295 296 *pdata = ctx_data; 297 298 return NXT_UNIT_OK; 299 300fail: 301 302 nxt_python_asgi_ctx_data_free(ctx_data); 303 304 Py_XDECREF(loop); 305 Py_XDECREF(asyncio); 306 307 return NXT_UNIT_ERROR; 308} 309 310 311static void 312nxt_python_asgi_ctx_data_free(void *data) 313{ 314 nxt_py_asgi_ctx_data_t *ctx_data; 315 316 ctx_data = data; 317 318 Py_XDECREF(ctx_data->loop_run_until_complete); 319 Py_XDECREF(ctx_data->loop_create_future); 320 Py_XDECREF(ctx_data->loop_create_task); 321 Py_XDECREF(ctx_data->loop_call_soon); 322 Py_XDECREF(ctx_data->loop_add_reader); 323 Py_XDECREF(ctx_data->loop_remove_reader); 324 Py_XDECREF(ctx_data->quit_future); 325 Py_XDECREF(ctx_data->quit_future_set_result); 326 327 nxt_unit_free(NULL, ctx_data); 328} 329 330 331static int 332nxt_python_asgi_startup(void *data) 333{ 334 return nxt_py_asgi_lifespan_startup(data); 335} 336 337 338static int 339nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 340{ 341 PyObject *res; 342 nxt_py_asgi_ctx_data_t *ctx_data; 343 344 ctx_data = ctx->data; 345 346 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 347 ctx_data->quit_future, NULL); 348 if (nxt_slow_path(res == NULL)) { 349 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 350 nxt_python_print_exception(); 351 352 return NXT_UNIT_ERROR; 353 } 354 355 Py_DECREF(res); 356 357 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); 358 nxt_py_asgi_remove_reader(ctx, ctx_data->port); 359 360 if (ctx_data->port != NULL) { 361 ctx_data->port->data = NULL; 362 ctx_data->port = NULL; 363 } 364 365 nxt_py_asgi_lifespan_shutdown(ctx); 366 367 return NXT_UNIT_OK; 368} 369 370 371static void 372nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 373{ 374 PyObject *res, *fd; 375 nxt_py_asgi_ctx_data_t *ctx_data; 376 377 if (port == NULL || port->in_fd == -1) { 378 return; 379 } 380 381 ctx_data = ctx->data; 382 383 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 384 385 fd = PyLong_FromLong(port->in_fd); 386 if (nxt_slow_path(fd == NULL)) { 387 nxt_unit_alert(ctx, "Python failed to create Long object"); 388 nxt_python_print_exception(); 389 390 return; 391 } 392 393 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 394 if (nxt_slow_path(res == NULL)) { 395 nxt_unit_alert(ctx, "Python failed to remove_reader"); 396 nxt_python_print_exception(); 397 398 } else { 399 Py_DECREF(res); 400 } 401 402 Py_DECREF(fd); 403} 404 405 406static void 407nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 408{ 409 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 410 PyObject *stage2; 411 nxt_py_asgi_ctx_data_t *ctx_data; 412 413 if (req->request->websocket_handshake) { 414 asgi = nxt_py_asgi_websocket_create(req); 415 416 } else { 417 asgi = nxt_py_asgi_http_create(req); 418 } 419 420 if (nxt_slow_path(asgi == NULL)) { 421 nxt_unit_req_alert(req, "Python failed to create asgi object"); 422 nxt_unit_request_done(req, NXT_UNIT_ERROR); 423 424 return; 425 } 426 427 receive = PyObject_GetAttrString(asgi, "receive"); 428 if (nxt_slow_path(receive == NULL)) { 429 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 430 nxt_unit_request_done(req, NXT_UNIT_ERROR); 431 432 goto release_asgi; 433 } 434 435 send = PyObject_GetAttrString(asgi, "send"); 436 if (nxt_slow_path(receive == NULL)) { 437 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 438 nxt_unit_request_done(req, NXT_UNIT_ERROR); 439 440 goto release_receive; 441 } 442 443 done = PyObject_GetAttrString(asgi, "_done"); 444 if (nxt_slow_path(receive == NULL)) { 445 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 446 nxt_unit_request_done(req, NXT_UNIT_ERROR); 447 448 goto release_send; 449 } 450 451 scope = nxt_py_asgi_create_http_scope(req); 452 if (nxt_slow_path(scope == NULL)) { 453 nxt_unit_request_done(req, NXT_UNIT_ERROR); 454 455 goto release_done; 456 } 457 458 req->data = asgi; 459 460 if (!nxt_py_asgi_legacy) { 461 nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); 462 463 res = PyObject_CallFunctionObjArgs(nxt_py_application, 464 scope, receive, send, NULL); 465 466 } else { 467 nxt_unit_req_debug(req, "Python call legacy application"); 468 469 res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL); 470 471 if (nxt_slow_path(res == NULL)) { 472 nxt_unit_req_error(req, "Python failed to call legacy app stage1"); 473 nxt_python_print_exception(); 474 nxt_unit_request_done(req, NXT_UNIT_ERROR); 475 476 goto release_scope; 477 } 478 479 if (nxt_slow_path(PyCallable_Check(res) == 0)) { 480 nxt_unit_req_error(req, 481 "Legacy ASGI application returns not a callable"); 482 nxt_unit_request_done(req, NXT_UNIT_ERROR); 483 484 Py_DECREF(res); 485 486 goto release_scope; 487 } 488 489 stage2 = res; 490 491 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); 492 493 Py_DECREF(stage2); 494 } 495 496 if (nxt_slow_path(res == NULL)) { 497 nxt_unit_req_error(req, "Python failed to call the application"); 498 nxt_python_print_exception(); 499 nxt_unit_request_done(req, NXT_UNIT_ERROR); 500 501 goto release_scope; 502 } 503 504 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 505 nxt_unit_req_error(req, "Application result type is not a coroutine"); 506 nxt_unit_request_done(req, NXT_UNIT_ERROR); 507 508 Py_DECREF(res); 509 510 goto release_scope; 511 } 512 513 ctx_data = req->ctx->data; 514 515 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 516 if (nxt_slow_path(task == NULL)) { 517 nxt_unit_req_error(req, "Python failed to call the create_task"); 518 nxt_python_print_exception(); 519 nxt_unit_request_done(req, NXT_UNIT_ERROR); 520 521 Py_DECREF(res); 522 523 goto release_scope; 524 } 525 526 Py_DECREF(res); 527 528 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 529 NULL); 530 if (nxt_slow_path(res == NULL)) { 531 nxt_unit_req_error(req, 532 "Python failed to call 'task.add_done_callback'"); 533 nxt_python_print_exception(); 534 nxt_unit_request_done(req, NXT_UNIT_ERROR); 535 536 goto release_task; 537 } 538 539 Py_DECREF(res); 540release_task: 541 Py_DECREF(task); 542release_scope: 543 Py_DECREF(scope); 544release_done: 545 Py_DECREF(done); 546release_send: 547 Py_DECREF(send); 548release_receive: 549 Py_DECREF(receive); 550release_asgi: 551 Py_DECREF(asgi); 552} 553 554 555static void 556nxt_py_asgi_close_handler(nxt_unit_request_info_t *req) 557{ 558 if (req->request->websocket_handshake) { 559 nxt_py_asgi_websocket_close_handler(req); 560 561 } else { 562 nxt_py_asgi_http_close_handler(req); 563 } 564} 565 566 567static PyObject * 568nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 569{ 570 char *p, *target, *query; 571 uint32_t target_length, i; 572 PyObject *scope, *v, *type, *scheme; 573 PyObject *headers, *header; 574 nxt_unit_field_t *f; 575 nxt_unit_request_t *r; 576 577 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 578 579#define SET_ITEM(dict, key, value) \ 580 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 581 == -1)) \ 582 { \ 583 nxt_unit_req_alert(req, "Python failed to set '" \ 584 #dict "." #key "' item"); \ 585 goto fail; \ 586 } 587 588 v = NULL; 589 headers = NULL; 590 591 r = req->request; 592 593 if (r->websocket_handshake) { 594 type = nxt_py_websocket_str; 595 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 596 597 } else { 598 type = nxt_py_http_str; 599 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 600 } 601 602 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 603 if (nxt_slow_path(scope == NULL)) { 604 return NULL; 605 } 606 607 p = nxt_unit_sptr_get(&r->version); 608 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 609 : nxt_py_1_0_str) 610 SET_ITEM(scope, scheme, scheme) 611 612 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 613 r->method_length); 614 if (nxt_slow_path(v == NULL)) { 615 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 616 goto fail; 617 } 618 619 SET_ITEM(scope, method, v) 620 Py_DECREF(v); 621 622 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 623 "replace"); 624 if (nxt_slow_path(v == NULL)) { 625 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 626 goto fail; 627 } 628 629 SET_ITEM(scope, path, v) 630 Py_DECREF(v); 631 632 target = nxt_unit_sptr_get(&r->target); 633 query = nxt_unit_sptr_get(&r->query); 634 635 if (r->query.offset != 0) { 636 target_length = query - target - 1; 637 638 } else { 639 target_length = r->target_length; 640 } 641 642 v = PyBytes_FromStringAndSize(target, target_length); 643 if (nxt_slow_path(v == NULL)) { 644 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 645 goto fail; 646 } 647 648 SET_ITEM(scope, raw_path, v) 649 Py_DECREF(v); 650 651 v = PyBytes_FromStringAndSize(query, r->query_length); 652 if (nxt_slow_path(v == NULL)) { 653 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 654 goto fail; 655 } 656 657 SET_ITEM(scope, query_string, v) 658 Py_DECREF(v); 659 660 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 661 if (nxt_slow_path(v == NULL)) { 662 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 663 goto fail; 664 } 665 666 SET_ITEM(scope, client, v) 667 Py_DECREF(v); 668 669 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 670 if (nxt_slow_path(v == NULL)) { 671 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 672 goto fail; 673 } 674 675 SET_ITEM(scope, server, v) 676 Py_DECREF(v); 677 678 v = NULL; 679 680 headers = PyTuple_New(r->fields_count); 681 if (nxt_slow_path(headers == NULL)) { 682 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 683 goto fail; 684 } 685 686 for (i = 0; i < r->fields_count; i++) { 687 f = r->fields + i; 688 689 header = nxt_py_asgi_create_header(f); 690 if (nxt_slow_path(header == NULL)) { 691 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 692 goto fail; 693 } 694 695 PyTuple_SET_ITEM(headers, i, header); 696 697 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 698 && f->name_length == ws_protocol.length 699 && f->value_length > 0 700 && r->websocket_handshake) 701 { 702 v = nxt_py_asgi_create_subprotocols(f); 703 if (nxt_slow_path(v == NULL)) { 704 nxt_unit_req_alert(req, "Failed to create subprotocols"); 705 goto fail; 706 } 707 708 SET_ITEM(scope, subprotocols, v); 709 Py_DECREF(v); 710 } 711 } 712 713 SET_ITEM(scope, headers, headers) 714 Py_DECREF(headers); 715 716 return scope; 717 718fail: 719 720 Py_XDECREF(v); 721 Py_XDECREF(headers); 722 Py_DECREF(scope); 723 724 return NULL; 725 726#undef SET_ITEM 727} 728 729 730static PyObject * 731nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 732{ 733 char *p, *s; 734 PyObject *pair, *v; 735 736 pair = PyTuple_New(2); 737 if (nxt_slow_path(pair == NULL)) { 738 return NULL; 739 } 740 741 p = nxt_unit_sptr_get(sptr); 742 s = memchr(p, ':', len); 743 744 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 745 if (nxt_slow_path(v == NULL)) { 746 Py_DECREF(pair); 747 748 return NULL; 749 } 750 751 PyTuple_SET_ITEM(pair, 0, v); 752 753 if (s != NULL) { 754 p += len; 755 v = PyLong_FromString(s + 1, &p, 10); 756 757 } else { 758 v = PyLong_FromLong(port); 759 } 760 761 if (nxt_slow_path(v == NULL)) { 762 Py_DECREF(pair); 763 764 return NULL; 765 } 766 767 PyTuple_SET_ITEM(pair, 1, v); 768 769 return pair; 770} 771 772 773static PyObject * 774nxt_py_asgi_create_header(nxt_unit_field_t *f) 775{ 776 char c, *name; 777 uint8_t pos; 778 PyObject *header, *v; 779 780 header = PyTuple_New(2); 781 if (nxt_slow_path(header == NULL)) { 782 return NULL; 783 } 784 785 name = nxt_unit_sptr_get(&f->name); 786 787 for (pos = 0; pos < f->name_length; pos++) { 788 c = name[pos]; 789 if (c >= 'A' && c <= 'Z') { 790 name[pos] = (c | 0x20); 791 } 792 } 793 794 v = PyBytes_FromStringAndSize(name, f->name_length); 795 if (nxt_slow_path(v == NULL)) { 796 Py_DECREF(header); 797 798 return NULL; 799 } 800 801 PyTuple_SET_ITEM(header, 0, v); 802 803 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 804 f->value_length); 805 if (nxt_slow_path(v == NULL)) { 806 Py_DECREF(header); 807 808 return NULL; 809 } 810 811 PyTuple_SET_ITEM(header, 1, v); 812 813 return header; 814} 815 816 817static PyObject * 818nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 819{ 820 char *v; 821 uint32_t i, n, start; 822 PyObject *res, *proto; 823 824 v = nxt_unit_sptr_get(&f->value); 825 n = 1; 826 827 for (i = 0; i < f->value_length; i++) { 828 if (v[i] == ',') { 829 n++; 830 } 831 } 832 833 res = PyTuple_New(n); 834 if (nxt_slow_path(res == NULL)) { 835 return NULL; 836 } 837 838 n = 0; 839 start = 0; 840 841 for (i = 0; i < f->value_length; ) { 842 if (v[i] != ',') { 843 i++; 844 845 continue; 846 } 847 848 if (i - start > 0) { 849 proto = PyString_FromStringAndSize(v + start, i - start); 850 if (nxt_slow_path(proto == NULL)) { 851 goto fail; 852 } 853 854 PyTuple_SET_ITEM(res, n, proto); 855 856 n++; 857 } 858 859 do { 860 i++; 861 } while (i < f->value_length && v[i] == ' '); 862 863 start = i; 864 } 865 866 if (i - start > 0) { 867 proto = PyString_FromStringAndSize(v + start, i - start); 868 if (nxt_slow_path(proto == NULL)) { 869 goto fail; 870 } 871 872 PyTuple_SET_ITEM(res, n, proto); 873 } 874 875 return res; 876 877fail: 878 879 Py_DECREF(res); 880 881 return NULL; 882} 883 884 885static int 886nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) 887{ 888 int rc; 889 PyObject *res, *fd, *py_ctx, *py_port; 890 nxt_unit_port_t *port; 891 nxt_py_asgi_ctx_data_t *ctx_data; 892 893 if (nxt_slow_path(nxt_py_shared_port == NULL)) { 894 return NXT_UNIT_ERROR; 895 } 896 897 port = nxt_py_shared_port; 898 899 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); 900 901 ctx_data = ctx->data; 902 903 rc = NXT_UNIT_ERROR; 904 905 fd = PyLong_FromLong(port->in_fd); 906 if (nxt_slow_path(fd == NULL)) { 907 nxt_unit_alert(ctx, "Python failed to create fd"); 908 nxt_python_print_exception(); 909 910 return rc; 911 } 912 913 py_ctx = PyLong_FromVoidPtr(ctx); 914 if (nxt_slow_path(py_ctx == NULL)) { 915 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 916 nxt_python_print_exception(); 917 918 goto clean_fd; 919 } 920 921 py_port = PyLong_FromVoidPtr(port); 922 if (nxt_slow_path(py_port == NULL)) { 923 nxt_unit_alert(ctx, "Python failed to create py_port"); 924 nxt_python_print_exception(); 925 926 goto clean_py_ctx; 927 } 928 929 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 930 fd, nxt_py_port_read, 931 py_ctx, py_port, NULL); 932 if (nxt_slow_path(res == NULL)) { 933 nxt_unit_alert(ctx, "Python failed to add_reader"); 934 nxt_python_print_exception(); 935 936 } else { 937 Py_DECREF(res); 938 939 rc = NXT_UNIT_OK; 940 } 941 942 Py_DECREF(py_port); 943 944clean_py_ctx: 945 946 Py_DECREF(py_ctx); 947 948clean_fd: 949 950 Py_DECREF(fd); 951 952 return rc; 953} 954 955 956static int 957nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 958{ 959 int nb, rc; 960 PyObject *res, *fd, *py_ctx, *py_port; 961 nxt_py_asgi_ctx_data_t *ctx_data; 962 963 if (port->in_fd == -1) { 964 return NXT_UNIT_OK; 965 } 966 967 nb = 1; 968 969 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 970 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 971 port->in_fd, strerror(errno), errno); 972 973 return NXT_UNIT_ERROR; 974 } 975 976 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 977 978 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { 979 nxt_py_shared_port = port; 980 981 return NXT_UNIT_OK; 982 } 983 984 ctx_data = ctx->data; 985 986 ctx_data->port = port; 987 port->data = ctx_data; 988 989 rc = NXT_UNIT_ERROR; 990 991 fd = PyLong_FromLong(port->in_fd); 992 if (nxt_slow_path(fd == NULL)) { 993 nxt_unit_alert(ctx, "Python failed to create fd"); 994 nxt_python_print_exception(); 995 996 return rc; 997 } 998 999 py_ctx = PyLong_FromVoidPtr(ctx); 1000 if (nxt_slow_path(py_ctx == NULL)) { 1001 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 1002 nxt_python_print_exception(); 1003 1004 goto clean_fd; 1005 } 1006 1007 py_port = PyLong_FromVoidPtr(port); 1008 if (nxt_slow_path(py_port == NULL)) { 1009 nxt_unit_alert(ctx, "Python failed to create py_port"); 1010 nxt_python_print_exception(); 1011 1012 goto clean_py_ctx; 1013 } 1014 1015 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 1016 fd, nxt_py_port_read, 1017 py_ctx, py_port, NULL); 1018 if (nxt_slow_path(res == NULL)) { 1019 nxt_unit_alert(ctx, "Python failed to add_reader"); 1020 nxt_python_print_exception(); 1021 1022 } else { 1023 Py_DECREF(res); 1024 1025 rc = NXT_UNIT_OK; 1026 } 1027 1028 Py_DECREF(py_port); 1029 1030clean_py_ctx: 1031 1032 Py_DECREF(py_ctx); 1033 1034clean_fd: 1035 1036 Py_DECREF(fd); 1037 1038 return rc; 1039} 1040 1041 1042static void 1043nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 1044{ 1045 if (port->in_fd == -1) { 1046 return; 1047 } 1048 1049 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 1050 1051 if (nxt_py_shared_port == port) { 1052 nxt_py_shared_port = NULL; 1053 } 1054} 1055 1056 1057static void 1058nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 1059{ 1060 PyObject *res, *p; 1061 nxt_py_asgi_ctx_data_t *ctx_data; 1062 1063 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 1064 1065 ctx_data = ctx->data; 1066 1067 if (nxt_py_shared_port != NULL) { 1068 p = PyLong_FromLong(nxt_py_shared_port->in_fd); 1069 if (nxt_slow_path(p == NULL)) { 1070 nxt_unit_alert(NULL, "Python failed to create Long"); 1071 nxt_python_print_exception(); 1072 1073 } else { 1074 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 1075 p, NULL); 1076 if (nxt_slow_path(res == NULL)) { 1077 nxt_unit_alert(NULL, "Python failed to remove_reader"); 1078 nxt_python_print_exception(); 1079 1080 } else { 1081 Py_DECREF(res); 1082 } 1083 1084 Py_DECREF(p); 1085 } 1086 } 1087 1088 p = PyLong_FromLong(0); 1089 if (nxt_slow_path(p == NULL)) { 1090 nxt_unit_alert(NULL, "Python failed to create Long"); 1091 nxt_python_print_exception(); 1092 1093 } else { 1094 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1095 p, NULL); 1096 if (nxt_slow_path(res == NULL)) { 1097 nxt_unit_alert(ctx, "Python failed to set_result"); 1098 nxt_python_print_exception(); 1099 1100 } else { 1101 Py_DECREF(res); 1102 } 1103 1104 Py_DECREF(p); 1105 } 1106} 1107 1108 1109static void 1110nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1111{ 1112 int rc; 1113 nxt_queue_link_t *lnk; 1114 nxt_py_asgi_ctx_data_t *ctx_data; 1115 1116 ctx_data = ctx->data; 1117 1118 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1119 lnk = nxt_queue_first(&ctx_data->drain_queue); 1120 1121 rc = nxt_py_asgi_http_drain(lnk); 1122 if (rc == NXT_UNIT_AGAIN) { 1123 return; 1124 } 1125 1126 nxt_queue_remove(lnk); 1127 } 1128} 1129 1130 1131static PyObject * 1132nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1133{
|
1175 Py_RETURN_NONE; 1176} 1177 1178 1179PyObject * 1180nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1181 void *data) 1182{ 1183 int i; 1184 PyObject *iter, *header, *h_iter, *name, *val, *res; 1185 1186 iter = PyObject_GetIter(headers); 1187 if (nxt_slow_path(iter == NULL)) { 1188 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1189 } 1190 1191 for (i = 0; /* void */; i++) { 1192 header = PyIter_Next(iter); 1193 if (header == NULL) { 1194 break; 1195 } 1196 1197 h_iter = PyObject_GetIter(header); 1198 if (nxt_slow_path(h_iter == NULL)) { 1199 Py_DECREF(header); 1200 Py_DECREF(iter); 1201 1202 return PyErr_Format(PyExc_TypeError, 1203 "'headers' item #%d is not an iterable", i); 1204 } 1205 1206 name = PyIter_Next(h_iter); 1207 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1208 Py_XDECREF(name); 1209 Py_DECREF(h_iter); 1210 Py_DECREF(header); 1211 Py_DECREF(iter); 1212 1213 return PyErr_Format(PyExc_TypeError, 1214 "'headers' item #%d 'name' is not a byte string", i); 1215 } 1216 1217 val = PyIter_Next(h_iter); 1218 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1219 Py_XDECREF(val); 1220 Py_DECREF(h_iter); 1221 Py_DECREF(header); 1222 Py_DECREF(iter); 1223 1224 return PyErr_Format(PyExc_TypeError, 1225 "'headers' item #%d 'value' is not a byte string", i); 1226 } 1227 1228 res = cb(data, i, name, val); 1229 1230 Py_DECREF(name); 1231 Py_DECREF(val); 1232 Py_DECREF(h_iter); 1233 Py_DECREF(header); 1234 1235 if (nxt_slow_path(res == NULL)) { 1236 Py_DECREF(iter); 1237 1238 return NULL; 1239 } 1240 1241 Py_DECREF(res); 1242 } 1243 1244 Py_DECREF(iter); 1245 1246 Py_RETURN_NONE; 1247} 1248 1249 1250PyObject * 1251nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1252{ 1253 nxt_py_asgi_calc_size_ctx_t *ctx; 1254 1255 ctx = data; 1256 1257 ctx->fields_count++; 1258 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1259 1260 Py_RETURN_NONE; 1261} 1262 1263 1264PyObject * 1265nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1266{ 1267 int rc; 1268 char *name_str, *val_str; 1269 uint32_t name_len, val_len; 1270 nxt_off_t content_length; 1271 nxt_unit_request_info_t *req; 1272 nxt_py_asgi_add_field_ctx_t *ctx; 1273 1274 name_str = PyBytes_AS_STRING(name); 1275 name_len = PyBytes_GET_SIZE(name); 1276 1277 val_str = PyBytes_AS_STRING(val); 1278 val_len = PyBytes_GET_SIZE(val); 1279 1280 ctx = data; 1281 req = ctx->req; 1282 1283 rc = nxt_unit_response_add_field(req, name_str, name_len, 1284 val_str, val_len); 1285 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1286 return PyErr_Format(PyExc_RuntimeError, 1287 "failed to add header #%d", i); 1288 } 1289 1290 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1291 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1292 if (nxt_slow_path(content_length < 0)) { 1293 nxt_unit_req_error(req, "failed to parse Content-Length " 1294 "value %.*s", (int) val_len, val_str); 1295 1296 return PyErr_Format(PyExc_ValueError, 1297 "Failed to parse Content-Length: '%.*s'", 1298 (int) val_len, val_str); 1299 } 1300 1301 ctx->content_length = content_length; 1302 } 1303 1304 Py_RETURN_NONE; 1305} 1306 1307 1308PyObject * 1309nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1310 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1311{ 1312 PyObject *set_result, *res; 1313 1314 if (nxt_slow_path(result == NULL)) { 1315 Py_DECREF(future); 1316 1317 return NULL; 1318 } 1319 1320 set_result = PyObject_GetAttrString(future, "set_result"); 1321 if (nxt_slow_path(set_result == NULL)) { 1322 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1323 1324 Py_CLEAR(future); 1325 1326 goto cleanup_result; 1327 } 1328 1329 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1330 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1331 1332 Py_CLEAR(future); 1333 1334 goto cleanup; 1335 } 1336 1337 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1338 result, NULL); 1339 if (nxt_slow_path(res == NULL)) { 1340 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1341 nxt_python_print_exception(); 1342 1343 Py_CLEAR(future); 1344 } 1345 1346 Py_XDECREF(res); 1347 1348cleanup: 1349 1350 Py_DECREF(set_result); 1351 1352cleanup_result: 1353 1354 Py_DECREF(result); 1355 1356 return future; 1357} 1358 1359 1360PyObject * 1361nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1362{ 1363 PyObject *msg; 1364 1365 msg = PyDict_New(); 1366 if (nxt_slow_path(msg == NULL)) { 1367 nxt_unit_req_alert(req, "Python failed to create message dict"); 1368 nxt_python_print_exception(); 1369 1370 return PyErr_Format(PyExc_RuntimeError, 1371 "failed to create message dict"); 1372 } 1373 1374 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1375 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1376 1377 Py_DECREF(msg); 1378 1379 return PyErr_Format(PyExc_RuntimeError, 1380 "failed to set 'msg.type' item"); 1381 } 1382 1383 return msg; 1384} 1385 1386 1387PyObject * 1388nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1389 PyObject *spec_version) 1390{ 1391 PyObject *scope, *asgi; 1392 1393 scope = PyDict_New(); 1394 if (nxt_slow_path(scope == NULL)) { 1395 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1396 nxt_python_print_exception(); 1397 1398 return PyErr_Format(PyExc_RuntimeError, 1399 "failed to create 'scope' dict"); 1400 } 1401 1402 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1403 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1404 1405 Py_DECREF(scope); 1406 1407 return PyErr_Format(PyExc_RuntimeError, 1408 "failed to set 'scope.type' item"); 1409 } 1410 1411 asgi = PyDict_New(); 1412 if (nxt_slow_path(asgi == NULL)) { 1413 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1414 nxt_python_print_exception(); 1415 1416 Py_DECREF(scope); 1417 1418 return PyErr_Format(PyExc_RuntimeError, 1419 "failed to create 'asgi' dict"); 1420 } 1421 1422 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1423 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1424 1425 Py_DECREF(asgi); 1426 Py_DECREF(scope); 1427 1428 return PyErr_Format(PyExc_RuntimeError, 1429 "failed to set 'scope.asgi' item"); 1430 } 1431 1432 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1433 nxt_py_3_0_str) == -1)) 1434 { 1435 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1436 1437 Py_DECREF(asgi); 1438 Py_DECREF(scope); 1439 1440 return PyErr_Format(PyExc_RuntimeError, 1441 "failed to set 'asgi.version' item"); 1442 } 1443 1444 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1445 spec_version) == -1)) 1446 { 1447 nxt_unit_req_alert(req, 1448 "Python failed to set 'asgi.spec_version' item"); 1449 1450 Py_DECREF(asgi); 1451 Py_DECREF(scope); 1452 1453 return PyErr_Format(PyExc_RuntimeError, 1454 "failed to set 'asgi.spec_version' item"); 1455 } 1456 1457 Py_DECREF(asgi); 1458 1459 return scope; 1460} 1461 1462 1463void 1464nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1465{ 1466 nxt_py_asgi_ctx_data_t *ctx_data; 1467 1468 ctx_data = req->ctx->data; 1469 1470 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1471} 1472 1473 1474void 1475nxt_py_asgi_dealloc(PyObject *self) 1476{ 1477 PyObject_Del(self); 1478} 1479 1480 1481PyObject * 1482nxt_py_asgi_await(PyObject *self) 1483{ 1484 Py_INCREF(self); 1485 return self; 1486} 1487 1488 1489PyObject * 1490nxt_py_asgi_iter(PyObject *self) 1491{ 1492 Py_INCREF(self); 1493 return self; 1494} 1495 1496 1497PyObject * 1498nxt_py_asgi_next(PyObject *self) 1499{ 1500 return NULL; 1501} 1502 1503 1504static void 1505nxt_python_asgi_done(void) 1506{ 1507 nxt_py_asgi_str_done(); 1508 1509 Py_XDECREF(nxt_py_port_read); 1510} 1511 1512#else /* !(NXT_HAVE_ASGI) */ 1513 1514 1515int 1516nxt_python_asgi_check(PyObject *obj) 1517{ 1518 return 0; 1519} 1520 1521 1522int 1523nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1524{ 1525 nxt_unit_alert(NULL, "ASGI not implemented"); 1526 return NXT_UNIT_ERROR; 1527} 1528 1529 1530#endif /* NXT_HAVE_ASGI */
| 1190 Py_RETURN_NONE; 1191} 1192 1193 1194PyObject * 1195nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1196 void *data) 1197{ 1198 int i; 1199 PyObject *iter, *header, *h_iter, *name, *val, *res; 1200 1201 iter = PyObject_GetIter(headers); 1202 if (nxt_slow_path(iter == NULL)) { 1203 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1204 } 1205 1206 for (i = 0; /* void */; i++) { 1207 header = PyIter_Next(iter); 1208 if (header == NULL) { 1209 break; 1210 } 1211 1212 h_iter = PyObject_GetIter(header); 1213 if (nxt_slow_path(h_iter == NULL)) { 1214 Py_DECREF(header); 1215 Py_DECREF(iter); 1216 1217 return PyErr_Format(PyExc_TypeError, 1218 "'headers' item #%d is not an iterable", i); 1219 } 1220 1221 name = PyIter_Next(h_iter); 1222 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1223 Py_XDECREF(name); 1224 Py_DECREF(h_iter); 1225 Py_DECREF(header); 1226 Py_DECREF(iter); 1227 1228 return PyErr_Format(PyExc_TypeError, 1229 "'headers' item #%d 'name' is not a byte string", i); 1230 } 1231 1232 val = PyIter_Next(h_iter); 1233 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1234 Py_XDECREF(val); 1235 Py_DECREF(h_iter); 1236 Py_DECREF(header); 1237 Py_DECREF(iter); 1238 1239 return PyErr_Format(PyExc_TypeError, 1240 "'headers' item #%d 'value' is not a byte string", i); 1241 } 1242 1243 res = cb(data, i, name, val); 1244 1245 Py_DECREF(name); 1246 Py_DECREF(val); 1247 Py_DECREF(h_iter); 1248 Py_DECREF(header); 1249 1250 if (nxt_slow_path(res == NULL)) { 1251 Py_DECREF(iter); 1252 1253 return NULL; 1254 } 1255 1256 Py_DECREF(res); 1257 } 1258 1259 Py_DECREF(iter); 1260 1261 Py_RETURN_NONE; 1262} 1263 1264 1265PyObject * 1266nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1267{ 1268 nxt_py_asgi_calc_size_ctx_t *ctx; 1269 1270 ctx = data; 1271 1272 ctx->fields_count++; 1273 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1274 1275 Py_RETURN_NONE; 1276} 1277 1278 1279PyObject * 1280nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1281{ 1282 int rc; 1283 char *name_str, *val_str; 1284 uint32_t name_len, val_len; 1285 nxt_off_t content_length; 1286 nxt_unit_request_info_t *req; 1287 nxt_py_asgi_add_field_ctx_t *ctx; 1288 1289 name_str = PyBytes_AS_STRING(name); 1290 name_len = PyBytes_GET_SIZE(name); 1291 1292 val_str = PyBytes_AS_STRING(val); 1293 val_len = PyBytes_GET_SIZE(val); 1294 1295 ctx = data; 1296 req = ctx->req; 1297 1298 rc = nxt_unit_response_add_field(req, name_str, name_len, 1299 val_str, val_len); 1300 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1301 return PyErr_Format(PyExc_RuntimeError, 1302 "failed to add header #%d", i); 1303 } 1304 1305 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1306 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1307 if (nxt_slow_path(content_length < 0)) { 1308 nxt_unit_req_error(req, "failed to parse Content-Length " 1309 "value %.*s", (int) val_len, val_str); 1310 1311 return PyErr_Format(PyExc_ValueError, 1312 "Failed to parse Content-Length: '%.*s'", 1313 (int) val_len, val_str); 1314 } 1315 1316 ctx->content_length = content_length; 1317 } 1318 1319 Py_RETURN_NONE; 1320} 1321 1322 1323PyObject * 1324nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1325 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1326{ 1327 PyObject *set_result, *res; 1328 1329 if (nxt_slow_path(result == NULL)) { 1330 Py_DECREF(future); 1331 1332 return NULL; 1333 } 1334 1335 set_result = PyObject_GetAttrString(future, "set_result"); 1336 if (nxt_slow_path(set_result == NULL)) { 1337 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1338 1339 Py_CLEAR(future); 1340 1341 goto cleanup_result; 1342 } 1343 1344 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1345 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1346 1347 Py_CLEAR(future); 1348 1349 goto cleanup; 1350 } 1351 1352 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1353 result, NULL); 1354 if (nxt_slow_path(res == NULL)) { 1355 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1356 nxt_python_print_exception(); 1357 1358 Py_CLEAR(future); 1359 } 1360 1361 Py_XDECREF(res); 1362 1363cleanup: 1364 1365 Py_DECREF(set_result); 1366 1367cleanup_result: 1368 1369 Py_DECREF(result); 1370 1371 return future; 1372} 1373 1374 1375PyObject * 1376nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1377{ 1378 PyObject *msg; 1379 1380 msg = PyDict_New(); 1381 if (nxt_slow_path(msg == NULL)) { 1382 nxt_unit_req_alert(req, "Python failed to create message dict"); 1383 nxt_python_print_exception(); 1384 1385 return PyErr_Format(PyExc_RuntimeError, 1386 "failed to create message dict"); 1387 } 1388 1389 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1390 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1391 1392 Py_DECREF(msg); 1393 1394 return PyErr_Format(PyExc_RuntimeError, 1395 "failed to set 'msg.type' item"); 1396 } 1397 1398 return msg; 1399} 1400 1401 1402PyObject * 1403nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1404 PyObject *spec_version) 1405{ 1406 PyObject *scope, *asgi; 1407 1408 scope = PyDict_New(); 1409 if (nxt_slow_path(scope == NULL)) { 1410 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1411 nxt_python_print_exception(); 1412 1413 return PyErr_Format(PyExc_RuntimeError, 1414 "failed to create 'scope' dict"); 1415 } 1416 1417 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1418 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1419 1420 Py_DECREF(scope); 1421 1422 return PyErr_Format(PyExc_RuntimeError, 1423 "failed to set 'scope.type' item"); 1424 } 1425 1426 asgi = PyDict_New(); 1427 if (nxt_slow_path(asgi == NULL)) { 1428 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1429 nxt_python_print_exception(); 1430 1431 Py_DECREF(scope); 1432 1433 return PyErr_Format(PyExc_RuntimeError, 1434 "failed to create 'asgi' dict"); 1435 } 1436 1437 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1438 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1439 1440 Py_DECREF(asgi); 1441 Py_DECREF(scope); 1442 1443 return PyErr_Format(PyExc_RuntimeError, 1444 "failed to set 'scope.asgi' item"); 1445 } 1446 1447 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1448 nxt_py_3_0_str) == -1)) 1449 { 1450 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1451 1452 Py_DECREF(asgi); 1453 Py_DECREF(scope); 1454 1455 return PyErr_Format(PyExc_RuntimeError, 1456 "failed to set 'asgi.version' item"); 1457 } 1458 1459 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1460 spec_version) == -1)) 1461 { 1462 nxt_unit_req_alert(req, 1463 "Python failed to set 'asgi.spec_version' item"); 1464 1465 Py_DECREF(asgi); 1466 Py_DECREF(scope); 1467 1468 return PyErr_Format(PyExc_RuntimeError, 1469 "failed to set 'asgi.spec_version' item"); 1470 } 1471 1472 Py_DECREF(asgi); 1473 1474 return scope; 1475} 1476 1477 1478void 1479nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1480{ 1481 nxt_py_asgi_ctx_data_t *ctx_data; 1482 1483 ctx_data = req->ctx->data; 1484 1485 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1486} 1487 1488 1489void 1490nxt_py_asgi_dealloc(PyObject *self) 1491{ 1492 PyObject_Del(self); 1493} 1494 1495 1496PyObject * 1497nxt_py_asgi_await(PyObject *self) 1498{ 1499 Py_INCREF(self); 1500 return self; 1501} 1502 1503 1504PyObject * 1505nxt_py_asgi_iter(PyObject *self) 1506{ 1507 Py_INCREF(self); 1508 return self; 1509} 1510 1511 1512PyObject * 1513nxt_py_asgi_next(PyObject *self) 1514{ 1515 return NULL; 1516} 1517 1518 1519static void 1520nxt_python_asgi_done(void) 1521{ 1522 nxt_py_asgi_str_done(); 1523 1524 Py_XDECREF(nxt_py_port_read); 1525} 1526 1527#else /* !(NXT_HAVE_ASGI) */ 1528 1529 1530int 1531nxt_python_asgi_check(PyObject *obj) 1532{ 1533 return 0; 1534} 1535 1536 1537int 1538nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1539{ 1540 nxt_unit_alert(NULL, "ASGI not implemented"); 1541 return NXT_UNIT_ERROR; 1542} 1543 1544 1545#endif /* NXT_HAVE_ASGI */
|