1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static PyObject *nxt_python_asgi_get_func(PyObject *obj); 20 static int nxt_python_asgi_ctx_data_alloc(void **pdata); 21 static void nxt_python_asgi_ctx_data_free(void *data); 22 static int nxt_python_asgi_startup(void *data); 23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 24 25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 26 nxt_unit_port_t *port); 27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 28 29 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 30 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 31 uint16_t port); 32 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 33 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 34 35 static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); 36 37 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 38 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 39 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 40 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 41 42 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 43 static void nxt_python_asgi_done(void); 44 45 46 int nxt_py_asgi_legacy; 47 static PyObject *nxt_py_port_read; 48 static nxt_unit_port_t *nxt_py_shared_port; 49 50 static PyMethodDef nxt_py_port_read_method = 51 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 52 53 static nxt_python_proto_t nxt_py_asgi_proto = { 54 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 55 .ctx_data_free = nxt_python_asgi_ctx_data_free, 56 .startup = nxt_python_asgi_startup, 57 .run = nxt_python_asgi_run, 58 .ready = nxt_python_asgi_ready, 59 .done = nxt_python_asgi_done, 60 }; 61 62 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 63 64 65 int 66 nxt_python_asgi_check(PyObject *obj) 67 { 68 int res; 69 PyObject *func; 70 PyCodeObject *code; 71 72 func = nxt_python_asgi_get_func(obj); 73 74 if (func == NULL) { 75 return 0; 76 } 77 78 code = (PyCodeObject *) PyFunction_GET_CODE(func); 79 80 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " 81 "%d argument(s)", 82 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", 83 code->co_argcount); 84 85 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; 86 87 Py_DECREF(func); 88 89 return res; 90 } 91 92 93 static PyObject * 94 nxt_python_asgi_get_func(PyObject *obj) 95 { 96 PyObject *call; 97 98 if (PyFunction_Check(obj)) { 99 Py_INCREF(obj); 100 return obj; 101 } 102 103 if (PyMethod_Check(obj)) { 104 obj = PyMethod_GET_FUNCTION(obj); 105 106 Py_INCREF(obj); 107 return obj; 108 } 109 110 call = PyObject_GetAttrString(obj, "__call__"); 111 112 if (call == NULL) { 113 return NULL; 114 } 115 116 if (PyFunction_Check(call)) { 117 return call; 118 } 119 120 if (PyMethod_Check(call)) { 121 obj = PyMethod_GET_FUNCTION(call); 122 123 Py_INCREF(obj); 124 Py_DECREF(call); 125 126 return obj; 127 } 128 129 Py_DECREF(call); 130 131 return NULL; 132 } 133 134 135 int 136 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 137 { 138 PyObject *func; 139 PyCodeObject *code; 140 141 nxt_unit_debug(NULL, "asgi_init"); 142 143 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 144 nxt_unit_alert(NULL, "Python failed to init string objects"); 145 return NXT_UNIT_ERROR; 146 } 147 148 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 149 if (nxt_slow_path(nxt_py_port_read == NULL)) { 150 nxt_unit_alert(NULL, 151 "Python failed to initialize the 'port_read' function"); 152 return NXT_UNIT_ERROR; 153 } 154 155 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 156 return NXT_UNIT_ERROR; 157 } 158 159 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 160 return NXT_UNIT_ERROR; 161 } 162 163 func = nxt_python_asgi_get_func(nxt_py_application); 164 if (nxt_slow_path(func == NULL)) { 165 nxt_unit_alert(NULL, "Python cannot find function for callable"); 166 return NXT_UNIT_ERROR; 167 } 168 169 code = (PyCodeObject *) PyFunction_GET_CODE(func); 170 171 if ((code->co_flags & CO_COROUTINE) == 0) { 172 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " 173 "switching to legacy mode"); 174 nxt_py_asgi_legacy = 1; 175 } 176 177 Py_DECREF(func); 178 179 init->callbacks.request_handler = nxt_py_asgi_request_handler; 180 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 181 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 182 init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; 183 init->callbacks.quit = nxt_py_asgi_quit; 184 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 185 init->callbacks.add_port = nxt_py_asgi_add_port; 186 init->callbacks.remove_port = nxt_py_asgi_remove_port; 187 188 *proto = nxt_py_asgi_proto; 189 190 return NXT_UNIT_OK; 191 } 192 193 194 static int 195 nxt_python_asgi_ctx_data_alloc(void **pdata) 196 { 197 uint32_t i; 198 PyObject *asyncio, *loop, *new_event_loop, *obj; 199 nxt_py_asgi_ctx_data_t *ctx_data; 200 201 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 202 if (nxt_slow_path(ctx_data == NULL)) { 203 nxt_unit_alert(NULL, "Failed to allocate context data"); 204 return NXT_UNIT_ERROR; 205 } 206 207 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 208 209 nxt_queue_init(&ctx_data->drain_queue); 210 211 struct { 212 const char *key; 213 PyObject **handler; 214 215 } handlers[] = { 216 { "create_task", &ctx_data->loop_create_task }, 217 { "add_reader", &ctx_data->loop_add_reader }, 218 { "remove_reader", &ctx_data->loop_remove_reader }, 219 { "call_soon", &ctx_data->loop_call_soon }, 220 { "run_until_complete", &ctx_data->loop_run_until_complete }, 221 { "create_future", &ctx_data->loop_create_future }, 222 }; 223 224 loop = NULL; 225 226 asyncio = PyImport_ImportModule("asyncio"); 227 if (nxt_slow_path(asyncio == NULL)) { 228 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 229 nxt_python_print_exception(); 230 goto fail; 231 } 232 233 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 234 "new_event_loop"); 235 if (nxt_slow_path(new_event_loop == NULL)) { 236 nxt_unit_alert(NULL, 237 "Python failed to get 'new_event_loop' from module 'asyncio'"); 238 goto fail; 239 } 240 241 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { 242 nxt_unit_alert(NULL, 243 "'asyncio.new_event_loop' is not a callable object"); 244 goto fail; 245 } 246 247 loop = PyObject_CallObject(new_event_loop, NULL); 248 if (nxt_slow_path(loop == NULL)) { 249 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); 250 goto fail; 251 } 252 253 for (i = 0; i < nxt_nitems(handlers); i++) { 254 obj = PyObject_GetAttrString(loop, handlers[i].key); 255 if (nxt_slow_path(obj == NULL)) { 256 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 257 handlers[i].key); 258 goto fail; 259 } 260 261 *handlers[i].handler = obj; 262 263 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 264 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 265 handlers[i].key); 266 goto fail; 267 } 268 } 269 270 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 271 if (nxt_slow_path(obj == NULL)) { 272 nxt_unit_alert(NULL, "Python failed to create Future "); 273 nxt_python_print_exception(); 274 goto fail; 275 } 276 277 ctx_data->quit_future = obj; 278 279 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 280 if (nxt_slow_path(obj == NULL)) { 281 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 282 goto fail; 283 } 284 285 ctx_data->quit_future_set_result = obj; 286 287 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 288 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 289 goto fail; 290 } 291 292 Py_DECREF(loop); 293 Py_DECREF(asyncio); 294 295 *pdata = ctx_data; 296 297 return NXT_UNIT_OK; 298 299 fail: 300 301 nxt_python_asgi_ctx_data_free(ctx_data); 302 303 Py_XDECREF(loop); 304 Py_XDECREF(asyncio); 305 306 return NXT_UNIT_ERROR; 307 } 308 309 310 static void 311 nxt_python_asgi_ctx_data_free(void *data) 312 { 313 nxt_py_asgi_ctx_data_t *ctx_data; 314 315 ctx_data = data; 316 317 Py_XDECREF(ctx_data->loop_run_until_complete); 318 Py_XDECREF(ctx_data->loop_create_future); 319 Py_XDECREF(ctx_data->loop_create_task); 320 Py_XDECREF(ctx_data->loop_call_soon); 321 Py_XDECREF(ctx_data->loop_add_reader); 322 Py_XDECREF(ctx_data->loop_remove_reader); 323 Py_XDECREF(ctx_data->quit_future); 324 Py_XDECREF(ctx_data->quit_future_set_result); 325 326 nxt_unit_free(NULL, ctx_data); 327 } 328 329 330 static int 331 nxt_python_asgi_startup(void *data) 332 { 333 return nxt_py_asgi_lifespan_startup(data); 334 } 335 336 337 static int 338 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 339 { 340 PyObject *res; 341 nxt_py_asgi_ctx_data_t *ctx_data; 342 343 ctx_data = ctx->data; 344 345 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 346 ctx_data->quit_future, NULL); 347 if (nxt_slow_path(res == NULL)) { 348 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 349 nxt_python_print_exception(); 350 351 return NXT_UNIT_ERROR; 352 } 353 354 Py_DECREF(res); 355 356 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); 357 nxt_py_asgi_remove_reader(ctx, ctx_data->port); 358 359 if (ctx_data->port != NULL) { 360 ctx_data->port->data = NULL; 361 ctx_data->port = NULL; 362 } 363 364 nxt_py_asgi_lifespan_shutdown(ctx); 365 366 return NXT_UNIT_OK; 367 } 368 369 370 static void 371 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 372 { 373 PyObject *res, *fd; 374 nxt_py_asgi_ctx_data_t *ctx_data; 375 376 if (port == NULL || port->in_fd == -1) { 377 return; 378 } 379 380 ctx_data = ctx->data; 381 382 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 383 384 fd = PyLong_FromLong(port->in_fd); 385 if (nxt_slow_path(fd == NULL)) { 386 nxt_unit_alert(ctx, "Python failed to create Long object"); 387 nxt_python_print_exception(); 388 389 return; 390 } 391 392 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 393 if (nxt_slow_path(res == NULL)) { 394 nxt_unit_alert(ctx, "Python failed to remove_reader"); 395 nxt_python_print_exception(); 396 397 } else { 398 Py_DECREF(res); 399 } 400 401 Py_DECREF(fd); 402 } 403 404 405 static void 406 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 407 { 408 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 409 PyObject *stage2; 410 nxt_py_asgi_ctx_data_t *ctx_data; 411 412 if (req->request->websocket_handshake) { 413 asgi = nxt_py_asgi_websocket_create(req); 414 415 } else { 416 asgi = nxt_py_asgi_http_create(req); 417 } 418 419 if (nxt_slow_path(asgi == NULL)) { 420 nxt_unit_req_alert(req, "Python failed to create asgi object"); 421 nxt_unit_request_done(req, NXT_UNIT_ERROR); 422 423 return; 424 } 425 426 receive = PyObject_GetAttrString(asgi, "receive"); 427 if (nxt_slow_path(receive == NULL)) { 428 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 429 nxt_unit_request_done(req, NXT_UNIT_ERROR); 430 431 goto release_asgi; 432 } 433 434 send = PyObject_GetAttrString(asgi, "send"); 435 if (nxt_slow_path(receive == NULL)) { 436 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 437 nxt_unit_request_done(req, NXT_UNIT_ERROR); 438 439 goto release_receive; 440 } 441 442 done = PyObject_GetAttrString(asgi, "_done"); 443 if (nxt_slow_path(receive == NULL)) { 444 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 445 nxt_unit_request_done(req, NXT_UNIT_ERROR); 446 447 goto release_send; 448 } 449 450 scope = nxt_py_asgi_create_http_scope(req); 451 if (nxt_slow_path(scope == NULL)) { 452 nxt_unit_request_done(req, NXT_UNIT_ERROR); 453 454 goto release_done; 455 } 456 457 req->data = asgi; 458 459 if (!nxt_py_asgi_legacy) { 460 nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); 461 462 res = PyObject_CallFunctionObjArgs(nxt_py_application, 463 scope, receive, send, NULL); 464 465 } else { 466 nxt_unit_req_debug(req, "Python call legacy application"); 467 468 res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL); 469 470 if (nxt_slow_path(res == NULL)) { 471 nxt_unit_req_error(req, "Python failed to call legacy app stage1"); 472 nxt_python_print_exception(); 473 nxt_unit_request_done(req, NXT_UNIT_ERROR); 474 475 goto release_scope; 476 } 477 478 if (nxt_slow_path(PyCallable_Check(res) == 0)) { 479 nxt_unit_req_error(req, 480 "Legacy ASGI application returns not a callable"); 481 nxt_unit_request_done(req, NXT_UNIT_ERROR); 482 483 Py_DECREF(res); 484 485 goto release_scope; 486 } 487 488 stage2 = res; 489 490 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); 491 492 Py_DECREF(stage2); 493 } 494 495 if (nxt_slow_path(res == NULL)) { 496 nxt_unit_req_error(req, "Python failed to call the application"); 497 nxt_python_print_exception(); 498 nxt_unit_request_done(req, NXT_UNIT_ERROR); 499 500 goto release_scope; 501 } 502 503 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 504 nxt_unit_req_error(req, "Application result type is not a coroutine"); 505 nxt_unit_request_done(req, NXT_UNIT_ERROR); 506 507 Py_DECREF(res); 508 509 goto release_scope; 510 } 511 512 ctx_data = req->ctx->data; 513 514 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 515 if (nxt_slow_path(task == NULL)) { 516 nxt_unit_req_error(req, "Python failed to call the create_task"); 517 nxt_python_print_exception(); 518 nxt_unit_request_done(req, NXT_UNIT_ERROR); 519 520 Py_DECREF(res); 521 522 goto release_scope; 523 } 524 525 Py_DECREF(res); 526 527 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 528 NULL); 529 if (nxt_slow_path(res == NULL)) { 530 nxt_unit_req_error(req, 531 "Python failed to call 'task.add_done_callback'"); 532 nxt_python_print_exception(); 533 nxt_unit_request_done(req, NXT_UNIT_ERROR); 534 535 goto release_task; 536 } 537 538 Py_DECREF(res); 539 release_task: 540 Py_DECREF(task); 541 release_scope: 542 Py_DECREF(scope); 543 release_done: 544 Py_DECREF(done); 545 release_send: 546 Py_DECREF(send); 547 release_receive: 548 Py_DECREF(receive); 549 release_asgi: 550 Py_DECREF(asgi); 551 } 552 553 554 static PyObject * 555 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 556 { 557 char *p, *target, *query; 558 uint32_t target_length, i; 559 PyObject *scope, *v, *type, *scheme; 560 PyObject *headers, *header; 561 nxt_unit_field_t *f; 562 nxt_unit_request_t *r; 563 564 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 565 566 #define SET_ITEM(dict, key, value) \ 567 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 568 == -1)) \ 569 { \ 570 nxt_unit_req_alert(req, "Python failed to set '" \ 571 #dict "." #key "' item"); \ 572 goto fail; \ 573 } 574 575 v = NULL; 576 headers = NULL; 577 578 r = req->request; 579 580 if (r->websocket_handshake) { 581 type = nxt_py_websocket_str; 582 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 583 584 } else { 585 type = nxt_py_http_str; 586 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 587 } 588 589 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 590 if (nxt_slow_path(scope == NULL)) { 591 return NULL; 592 } 593 594 p = nxt_unit_sptr_get(&r->version); 595 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 596 : nxt_py_1_0_str) 597 SET_ITEM(scope, scheme, scheme) 598 599 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 600 r->method_length); 601 if (nxt_slow_path(v == NULL)) { 602 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 603 goto fail; 604 } 605 606 SET_ITEM(scope, method, v) 607 Py_DECREF(v); 608 609 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 610 "replace"); 611 if (nxt_slow_path(v == NULL)) { 612 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 613 goto fail; 614 } 615 616 SET_ITEM(scope, path, v) 617 Py_DECREF(v); 618 619 target = nxt_unit_sptr_get(&r->target); 620 query = nxt_unit_sptr_get(&r->query); 621 622 if (r->query.offset != 0) { 623 target_length = query - target - 1; 624 625 } else { 626 target_length = r->target_length; 627 } 628 629 v = PyBytes_FromStringAndSize(target, target_length); 630 if (nxt_slow_path(v == NULL)) { 631 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 632 goto fail; 633 } 634 635 SET_ITEM(scope, raw_path, v) 636 Py_DECREF(v); 637 638 v = PyBytes_FromStringAndSize(query, r->query_length); 639 if (nxt_slow_path(v == NULL)) { 640 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 641 goto fail; 642 } 643 644 SET_ITEM(scope, query_string, v) 645 Py_DECREF(v); 646 647 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 648 if (nxt_slow_path(v == NULL)) { 649 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 650 goto fail; 651 } 652 653 SET_ITEM(scope, client, v) 654 Py_DECREF(v); 655 656 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 657 if (nxt_slow_path(v == NULL)) { 658 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 659 goto fail; 660 } 661 662 SET_ITEM(scope, server, v) 663 Py_DECREF(v); 664 665 v = NULL; 666 667 headers = PyTuple_New(r->fields_count); 668 if (nxt_slow_path(headers == NULL)) { 669 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 670 goto fail; 671 } 672 673 for (i = 0; i < r->fields_count; i++) { 674 f = r->fields + i; 675 676 header = nxt_py_asgi_create_header(f); 677 if (nxt_slow_path(header == NULL)) { 678 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 679 goto fail; 680 } 681 682 PyTuple_SET_ITEM(headers, i, header); 683 684 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 685 && f->name_length == ws_protocol.length 686 && f->value_length > 0 687 && r->websocket_handshake) 688 { 689 v = nxt_py_asgi_create_subprotocols(f); 690 if (nxt_slow_path(v == NULL)) { 691 nxt_unit_req_alert(req, "Failed to create subprotocols"); 692 goto fail; 693 } 694 695 SET_ITEM(scope, subprotocols, v); 696 Py_DECREF(v); 697 } 698 } 699 700 SET_ITEM(scope, headers, headers) 701 Py_DECREF(headers); 702 703 return scope; 704 705 fail: 706 707 Py_XDECREF(v); 708 Py_XDECREF(headers); 709 Py_DECREF(scope); 710 711 return NULL; 712 713 #undef SET_ITEM 714 } 715 716 717 static PyObject * 718 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 719 { 720 char *p, *s; 721 PyObject *pair, *v; 722 723 pair = PyTuple_New(2); 724 if (nxt_slow_path(pair == NULL)) { 725 return NULL; 726 } 727 728 p = nxt_unit_sptr_get(sptr); 729 s = memchr(p, ':', len); 730 731 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 732 if (nxt_slow_path(v == NULL)) { 733 Py_DECREF(pair); 734 735 return NULL; 736 } 737 738 PyTuple_SET_ITEM(pair, 0, v); 739 740 if (s != NULL) { 741 p += len; 742 v = PyLong_FromString(s + 1, &p, 10); 743 744 } else { 745 v = PyLong_FromLong(port); 746 } 747 748 if (nxt_slow_path(v == NULL)) { 749 Py_DECREF(pair); 750 751 return NULL; 752 } 753 754 PyTuple_SET_ITEM(pair, 1, v); 755 756 return pair; 757 } 758 759 760 static PyObject * 761 nxt_py_asgi_create_header(nxt_unit_field_t *f) 762 { 763 char c, *name; 764 uint8_t pos; 765 PyObject *header, *v; 766 767 header = PyTuple_New(2); 768 if (nxt_slow_path(header == NULL)) { 769 return NULL; 770 } 771 772 name = nxt_unit_sptr_get(&f->name); 773 774 for (pos = 0; pos < f->name_length; pos++) { 775 c = name[pos]; 776 if (c >= 'A' && c <= 'Z') { 777 name[pos] = (c | 0x20); 778 } 779 } 780 781 v = PyBytes_FromStringAndSize(name, f->name_length); 782 if (nxt_slow_path(v == NULL)) { 783 Py_DECREF(header); 784 785 return NULL; 786 } 787 788 PyTuple_SET_ITEM(header, 0, v); 789 790 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 791 f->value_length); 792 if (nxt_slow_path(v == NULL)) { 793 Py_DECREF(header); 794 795 return NULL; 796 } 797 798 PyTuple_SET_ITEM(header, 1, v); 799 800 return header; 801 } 802 803 804 static PyObject * 805 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 806 { 807 char *v; 808 uint32_t i, n, start; 809 PyObject *res, *proto; 810 811 v = nxt_unit_sptr_get(&f->value); 812 n = 1; 813 814 for (i = 0; i < f->value_length; i++) { 815 if (v[i] == ',') { 816 n++; 817 } 818 } 819 820 res = PyTuple_New(n); 821 if (nxt_slow_path(res == NULL)) { 822 return NULL; 823 } 824 825 n = 0; 826 start = 0; 827 828 for (i = 0; i < f->value_length; ) { 829 if (v[i] != ',') { 830 i++; 831 832 continue; 833 } 834 835 if (i - start > 0) { 836 proto = PyString_FromStringAndSize(v + start, i - start); 837 if (nxt_slow_path(proto == NULL)) { 838 goto fail; 839 } 840 841 PyTuple_SET_ITEM(res, n, proto); 842 843 n++; 844 } 845 846 do { 847 i++; 848 } while (i < f->value_length && v[i] == ' '); 849 850 start = i; 851 } 852 853 if (i - start > 0) { 854 proto = PyString_FromStringAndSize(v + start, i - start); 855 if (nxt_slow_path(proto == NULL)) { 856 goto fail; 857 } 858 859 PyTuple_SET_ITEM(res, n, proto); 860 } 861 862 return res; 863 864 fail: 865 866 Py_DECREF(res); 867 868 return NULL; 869 } 870 871 872 static int 873 nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) 874 { 875 int rc; 876 PyObject *res, *fd, *py_ctx, *py_port; 877 nxt_unit_port_t *port; 878 nxt_py_asgi_ctx_data_t *ctx_data; 879 880 if (nxt_slow_path(nxt_py_shared_port == NULL)) { 881 return NXT_UNIT_ERROR; 882 } 883 884 port = nxt_py_shared_port; 885 886 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); 887 888 ctx_data = ctx->data; 889 890 rc = NXT_UNIT_ERROR; 891 892 fd = PyLong_FromLong(port->in_fd); 893 if (nxt_slow_path(fd == NULL)) { 894 nxt_unit_alert(ctx, "Python failed to create fd"); 895 nxt_python_print_exception(); 896 897 return rc; 898 } 899 900 py_ctx = PyLong_FromVoidPtr(ctx); 901 if (nxt_slow_path(py_ctx == NULL)) { 902 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 903 nxt_python_print_exception(); 904 905 goto clean_fd; 906 } 907 908 py_port = PyLong_FromVoidPtr(port); 909 if (nxt_slow_path(py_port == NULL)) { 910 nxt_unit_alert(ctx, "Python failed to create py_port"); 911 nxt_python_print_exception(); 912 913 goto clean_py_ctx; 914 } 915 916 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 917 fd, nxt_py_port_read, 918 py_ctx, py_port, NULL); 919 if (nxt_slow_path(res == NULL)) { 920 nxt_unit_alert(ctx, "Python failed to add_reader"); 921 nxt_python_print_exception(); 922 923 } else { 924 Py_DECREF(res); 925 926 rc = NXT_UNIT_OK; 927 } 928 929 Py_DECREF(py_port); 930 931 clean_py_ctx: 932 933 Py_DECREF(py_ctx); 934 935 clean_fd: 936 937 Py_DECREF(fd); 938 939 return rc; 940 } 941 942 943 static int 944 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 945 { 946 int nb, rc; 947 PyObject *res, *fd, *py_ctx, *py_port; 948 nxt_py_asgi_ctx_data_t *ctx_data; 949 950 if (port->in_fd == -1) { 951 return NXT_UNIT_OK; 952 } 953 954 nb = 1; 955 956 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 957 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 958 port->in_fd, strerror(errno), errno); 959 960 return NXT_UNIT_ERROR; 961 } 962 963 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 964 965 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { 966 nxt_py_shared_port = port; 967 968 return NXT_UNIT_OK; 969 } 970 971 ctx_data = ctx->data; 972 973 ctx_data->port = port; 974 port->data = ctx_data; 975 976 rc = NXT_UNIT_ERROR; 977 978 fd = PyLong_FromLong(port->in_fd); 979 if (nxt_slow_path(fd == NULL)) { 980 nxt_unit_alert(ctx, "Python failed to create fd"); 981 nxt_python_print_exception(); 982 983 return rc; 984 } 985 986 py_ctx = PyLong_FromVoidPtr(ctx); 987 if (nxt_slow_path(py_ctx == NULL)) { 988 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 989 nxt_python_print_exception(); 990 991 goto clean_fd; 992 } 993 994 py_port = PyLong_FromVoidPtr(port); 995 if (nxt_slow_path(py_port == NULL)) { 996 nxt_unit_alert(ctx, "Python failed to create py_port"); 997 nxt_python_print_exception(); 998 999 goto clean_py_ctx; 1000 } 1001 1002 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 1003 fd, nxt_py_port_read, 1004 py_ctx, py_port, NULL); 1005 if (nxt_slow_path(res == NULL)) { 1006 nxt_unit_alert(ctx, "Python failed to add_reader"); 1007 nxt_python_print_exception(); 1008 1009 } else { 1010 Py_DECREF(res); 1011 1012 rc = NXT_UNIT_OK; 1013 } 1014 1015 Py_DECREF(py_port); 1016 1017 clean_py_ctx: 1018 1019 Py_DECREF(py_ctx); 1020 1021 clean_fd: 1022 1023 Py_DECREF(fd); 1024 1025 return rc; 1026 } 1027 1028 1029 static void 1030 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 1031 { 1032 if (port->in_fd == -1) { 1033 return; 1034 } 1035 1036 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 1037 1038 if (nxt_py_shared_port == port) { 1039 nxt_py_shared_port = NULL; 1040 } 1041 } 1042 1043 1044 static void 1045 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 1046 { 1047 PyObject *res, *p; 1048 nxt_py_asgi_ctx_data_t *ctx_data; 1049 1050 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 1051 1052 ctx_data = ctx->data; 1053 1054 if (nxt_py_shared_port != NULL) { 1055 p = PyLong_FromLong(nxt_py_shared_port->in_fd); 1056 if (nxt_slow_path(p == NULL)) { 1057 nxt_unit_alert(NULL, "Python failed to create Long"); 1058 nxt_python_print_exception(); 1059 1060 } else { 1061 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 1062 p, NULL); 1063 if (nxt_slow_path(res == NULL)) { 1064 nxt_unit_alert(NULL, "Python failed to remove_reader"); 1065 nxt_python_print_exception(); 1066 1067 } else { 1068 Py_DECREF(res); 1069 } 1070 1071 Py_DECREF(p); 1072 } 1073 } 1074 1075 p = PyLong_FromLong(0); 1076 if (nxt_slow_path(p == NULL)) { 1077 nxt_unit_alert(NULL, "Python failed to create Long"); 1078 nxt_python_print_exception(); 1079 1080 } else { 1081 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1082 p, NULL); 1083 if (nxt_slow_path(res == NULL)) { 1084 nxt_unit_alert(ctx, "Python failed to set_result"); 1085 nxt_python_print_exception(); 1086 1087 } else { 1088 Py_DECREF(res); 1089 } 1090 1091 Py_DECREF(p); 1092 } 1093 } 1094 1095 1096 static void 1097 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1098 { 1099 int rc; 1100 nxt_queue_link_t *lnk; 1101 nxt_py_asgi_ctx_data_t *ctx_data; 1102 1103 ctx_data = ctx->data; 1104 1105 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1106 lnk = nxt_queue_first(&ctx_data->drain_queue); 1107 1108 rc = nxt_py_asgi_http_drain(lnk); 1109 if (rc == NXT_UNIT_AGAIN) { 1110 return; 1111 } 1112 1113 nxt_queue_remove(lnk); 1114 } 1115 } 1116 1117 1118 static PyObject * 1119 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1120 { 1121 int rc; 1122 PyObject *arg; 1123 Py_ssize_t n; 1124 nxt_unit_ctx_t *ctx; 1125 nxt_unit_port_t *port; 1126 1127 n = PyTuple_GET_SIZE(args); 1128 1129 if (n != 2) { 1130 nxt_unit_alert(NULL, 1131 "nxt_py_asgi_port_read: invalid number of arguments %d", 1132 (int) n); 1133 1134 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 1135 } 1136 1137 arg = PyTuple_GET_ITEM(args, 0); 1138 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 1139 return PyErr_Format(PyExc_TypeError, 1140 "the first argument is not a long"); 1141 } 1142 1143 ctx = PyLong_AsVoidPtr(arg); 1144 1145 arg = PyTuple_GET_ITEM(args, 1); 1146 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 1147 return PyErr_Format(PyExc_TypeError, 1148 "the second argument is not a long"); 1149 } 1150 1151 port = PyLong_AsVoidPtr(arg); 1152 1153 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); 1154 1155 rc = nxt_unit_process_port_msg(ctx, port); 1156 1157 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 1158 return PyErr_Format(PyExc_RuntimeError, 1159 "error processing port %d message", port->id.id); 1160 } 1161 1162 Py_RETURN_NONE; 1163 } 1164 1165 1166 PyObject * 1167 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1168 void *data) 1169 { 1170 int i; 1171 PyObject *iter, *header, *h_iter, *name, *val, *res; 1172 1173 iter = PyObject_GetIter(headers); 1174 if (nxt_slow_path(iter == NULL)) { 1175 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1176 } 1177 1178 for (i = 0; /* void */; i++) { 1179 header = PyIter_Next(iter); 1180 if (header == NULL) { 1181 break; 1182 } 1183 1184 h_iter = PyObject_GetIter(header); 1185 if (nxt_slow_path(h_iter == NULL)) { 1186 Py_DECREF(header); 1187 Py_DECREF(iter); 1188 1189 return PyErr_Format(PyExc_TypeError, 1190 "'headers' item #%d is not an iterable", i); 1191 } 1192 1193 name = PyIter_Next(h_iter); 1194 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1195 Py_XDECREF(name); 1196 Py_DECREF(h_iter); 1197 Py_DECREF(header); 1198 Py_DECREF(iter); 1199 1200 return PyErr_Format(PyExc_TypeError, 1201 "'headers' item #%d 'name' is not a byte string", i); 1202 } 1203 1204 val = PyIter_Next(h_iter); 1205 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1206 Py_XDECREF(val); 1207 Py_DECREF(h_iter); 1208 Py_DECREF(header); 1209 Py_DECREF(iter); 1210 1211 return PyErr_Format(PyExc_TypeError, 1212 "'headers' item #%d 'value' is not a byte string", i); 1213 } 1214 1215 res = cb(data, i, name, val); 1216 1217 Py_DECREF(name); 1218 Py_DECREF(val); 1219 Py_DECREF(h_iter); 1220 Py_DECREF(header); 1221 1222 if (nxt_slow_path(res == NULL)) { 1223 Py_DECREF(iter); 1224 1225 return NULL; 1226 } 1227 1228 Py_DECREF(res); 1229 } 1230 1231 Py_DECREF(iter); 1232 1233 Py_RETURN_NONE; 1234 } 1235 1236 1237 PyObject * 1238 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1239 { 1240 nxt_py_asgi_calc_size_ctx_t *ctx; 1241 1242 ctx = data; 1243 1244 ctx->fields_count++; 1245 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1246 1247 Py_RETURN_NONE; 1248 } 1249 1250 1251 PyObject * 1252 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1253 { 1254 int rc; 1255 char *name_str, *val_str; 1256 uint32_t name_len, val_len; 1257 nxt_off_t content_length; 1258 nxt_unit_request_info_t *req; 1259 nxt_py_asgi_add_field_ctx_t *ctx; 1260 1261 name_str = PyBytes_AS_STRING(name); 1262 name_len = PyBytes_GET_SIZE(name); 1263 1264 val_str = PyBytes_AS_STRING(val); 1265 val_len = PyBytes_GET_SIZE(val); 1266 1267 ctx = data; 1268 req = ctx->req; 1269 1270 rc = nxt_unit_response_add_field(req, name_str, name_len, 1271 val_str, val_len); 1272 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1273 return PyErr_Format(PyExc_RuntimeError, 1274 "failed to add header #%d", i); 1275 } 1276 1277 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1278 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1279 if (nxt_slow_path(content_length < 0)) { 1280 nxt_unit_req_error(req, "failed to parse Content-Length " 1281 "value %.*s", (int) val_len, val_str); 1282 1283 return PyErr_Format(PyExc_ValueError, 1284 "Failed to parse Content-Length: '%.*s'", 1285 (int) val_len, val_str); 1286 } 1287 1288 ctx->content_length = content_length; 1289 } 1290 1291 Py_RETURN_NONE; 1292 } 1293 1294 1295 PyObject * 1296 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1297 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1298 { 1299 PyObject *set_result, *res; 1300 1301 if (nxt_slow_path(result == NULL)) { 1302 Py_DECREF(future); 1303 1304 return NULL; 1305 } 1306 1307 set_result = PyObject_GetAttrString(future, "set_result"); 1308 if (nxt_slow_path(set_result == NULL)) { 1309 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1310 1311 Py_CLEAR(future); 1312 1313 goto cleanup_result; 1314 } 1315 1316 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1317 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1318 1319 Py_CLEAR(future); 1320 1321 goto cleanup; 1322 } 1323 1324 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1325 result, NULL); 1326 if (nxt_slow_path(res == NULL)) { 1327 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1328 nxt_python_print_exception(); 1329 1330 Py_CLEAR(future); 1331 } 1332 1333 Py_XDECREF(res); 1334 1335 cleanup: 1336 1337 Py_DECREF(set_result); 1338 1339 cleanup_result: 1340 1341 Py_DECREF(result); 1342 1343 return future; 1344 } 1345 1346 1347 PyObject * 1348 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1349 { 1350 PyObject *msg; 1351 1352 msg = PyDict_New(); 1353 if (nxt_slow_path(msg == NULL)) { 1354 nxt_unit_req_alert(req, "Python failed to create message dict"); 1355 nxt_python_print_exception(); 1356 1357 return PyErr_Format(PyExc_RuntimeError, 1358 "failed to create message dict"); 1359 } 1360 1361 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1362 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1363 1364 Py_DECREF(msg); 1365 1366 return PyErr_Format(PyExc_RuntimeError, 1367 "failed to set 'msg.type' item"); 1368 } 1369 1370 return msg; 1371 } 1372 1373 1374 PyObject * 1375 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1376 PyObject *spec_version) 1377 { 1378 PyObject *scope, *asgi; 1379 1380 scope = PyDict_New(); 1381 if (nxt_slow_path(scope == NULL)) { 1382 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1383 nxt_python_print_exception(); 1384 1385 return PyErr_Format(PyExc_RuntimeError, 1386 "failed to create 'scope' dict"); 1387 } 1388 1389 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1390 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1391 1392 Py_DECREF(scope); 1393 1394 return PyErr_Format(PyExc_RuntimeError, 1395 "failed to set 'scope.type' item"); 1396 } 1397 1398 asgi = PyDict_New(); 1399 if (nxt_slow_path(asgi == NULL)) { 1400 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1401 nxt_python_print_exception(); 1402 1403 Py_DECREF(scope); 1404 1405 return PyErr_Format(PyExc_RuntimeError, 1406 "failed to create 'asgi' dict"); 1407 } 1408 1409 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1410 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1411 1412 Py_DECREF(asgi); 1413 Py_DECREF(scope); 1414 1415 return PyErr_Format(PyExc_RuntimeError, 1416 "failed to set 'scope.asgi' item"); 1417 } 1418 1419 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1420 nxt_py_3_0_str) == -1)) 1421 { 1422 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1423 1424 Py_DECREF(asgi); 1425 Py_DECREF(scope); 1426 1427 return PyErr_Format(PyExc_RuntimeError, 1428 "failed to set 'asgi.version' item"); 1429 } 1430 1431 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1432 spec_version) == -1)) 1433 { 1434 nxt_unit_req_alert(req, 1435 "Python failed to set 'asgi.spec_version' item"); 1436 1437 Py_DECREF(asgi); 1438 Py_DECREF(scope); 1439 1440 return PyErr_Format(PyExc_RuntimeError, 1441 "failed to set 'asgi.spec_version' item"); 1442 } 1443 1444 Py_DECREF(asgi); 1445 1446 return scope; 1447 } 1448 1449 1450 void 1451 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1452 { 1453 nxt_py_asgi_ctx_data_t *ctx_data; 1454 1455 ctx_data = req->ctx->data; 1456 1457 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1458 } 1459 1460 1461 void 1462 nxt_py_asgi_dealloc(PyObject *self) 1463 { 1464 PyObject_Del(self); 1465 } 1466 1467 1468 PyObject * 1469 nxt_py_asgi_await(PyObject *self) 1470 { 1471 Py_INCREF(self); 1472 return self; 1473 } 1474 1475 1476 PyObject * 1477 nxt_py_asgi_iter(PyObject *self) 1478 { 1479 Py_INCREF(self); 1480 return self; 1481 } 1482 1483 1484 PyObject * 1485 nxt_py_asgi_next(PyObject *self) 1486 { 1487 return NULL; 1488 } 1489 1490 1491 static void 1492 nxt_python_asgi_done(void) 1493 { 1494 nxt_py_asgi_str_done(); 1495 1496 Py_XDECREF(nxt_py_port_read); 1497 } 1498 1499 #else /* !(NXT_HAVE_ASGI) */ 1500 1501 1502 int 1503 nxt_python_asgi_check(PyObject *obj) 1504 { 1505 return 0; 1506 } 1507 1508 1509 int 1510 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1511 { 1512 nxt_unit_alert(NULL, "ASGI not implemented"); 1513 return NXT_UNIT_ERROR; 1514 } 1515 1516 1517 #endif /* NXT_HAVE_ASGI */ 1518