1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static PyObject *nxt_python_asgi_get_func(PyObject *obj); 20 static int nxt_python_asgi_ctx_data_alloc(void **pdata, int main); 21 static void nxt_python_asgi_ctx_data_free(void *data); 22 static int nxt_python_asgi_startup(void *data); 23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 24 25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 26 nxt_unit_port_t *port); 27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 28 static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req); 29 30 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 31 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 32 uint16_t port); 33 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 34 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 35 36 static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); 37 38 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 39 static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 40 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 41 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 42 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 43 44 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 45 static void nxt_python_asgi_done(void); 46 47 static PyObject *nxt_py_port_read; 48 static nxt_unit_port_t *nxt_py_shared_port; 49 50 static PyMethodDef nxt_py_port_read_method = 51 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 52 53 static nxt_python_proto_t nxt_py_asgi_proto = { 54 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 55 .ctx_data_free = nxt_python_asgi_ctx_data_free, 56 .startup = nxt_python_asgi_startup, 57 .run = nxt_python_asgi_run, 58 .ready = nxt_python_asgi_ready, 59 .done = nxt_python_asgi_done, 60 }; 61 62 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 63 64 65 int 66 nxt_python_asgi_check(PyObject *obj) 67 { 68 int res; 69 PyObject *func; 70 PyCodeObject *code; 71 72 func = nxt_python_asgi_get_func(obj); 73 74 if (func == NULL) { 75 return 0; 76 } 77 78 code = (PyCodeObject *) PyFunction_GET_CODE(func); 79 80 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " 81 "%d argument(s)", 82 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", 83 code->co_argcount); 84 85 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; 86 87 Py_DECREF(func); 88 89 return res; 90 } 91 92 93 static PyObject * 94 nxt_python_asgi_get_func(PyObject *obj) 95 { 96 PyObject *call; 97 98 if (PyFunction_Check(obj)) { 99 Py_INCREF(obj); 100 return obj; 101 } 102 103 if (PyMethod_Check(obj)) { 104 obj = PyMethod_GET_FUNCTION(obj); 105 106 Py_INCREF(obj); 107 return obj; 108 } 109 110 call = PyObject_GetAttrString(obj, "__call__"); 111 112 if (call == NULL) { 113 return NULL; 114 } 115 116 if (PyFunction_Check(call)) { 117 return call; 118 } 119 120 if (PyMethod_Check(call)) { 121 obj = PyMethod_GET_FUNCTION(call); 122 123 Py_INCREF(obj); 124 Py_DECREF(call); 125 126 return obj; 127 } 128 129 Py_DECREF(call); 130 131 return NULL; 132 } 133 134 135 int 136 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 137 { 138 PyObject *func; 139 nxt_int_t i; 140 PyCodeObject *code; 141 142 nxt_unit_debug(NULL, "asgi_init"); 143 144 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 145 nxt_unit_alert(NULL, "Python failed to init string objects"); 146 return NXT_UNIT_ERROR; 147 } 148 149 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 150 if (nxt_slow_path(nxt_py_port_read == NULL)) { 151 nxt_unit_alert(NULL, 152 "Python failed to initialize the 'port_read' function"); 153 return NXT_UNIT_ERROR; 154 } 155 156 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 157 return NXT_UNIT_ERROR; 158 } 159 160 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 161 return NXT_UNIT_ERROR; 162 } 163 164 for (i = 0; i < nxt_py_targets->count; i++) { 165 func = nxt_python_asgi_get_func(nxt_py_targets->target[i].application); 166 if (nxt_slow_path(func == NULL)) { 167 nxt_unit_alert(NULL, "Python cannot find function for callable"); 168 return NXT_UNIT_ERROR; 169 } 170 171 code = (PyCodeObject *) PyFunction_GET_CODE(func); 172 173 if ((code->co_flags & CO_COROUTINE) == 0) { 174 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " 175 "switching to legacy mode"); 176 nxt_py_targets->target[i].asgi_legacy = 1; 177 } 178 179 Py_DECREF(func); 180 } 181 182 init->callbacks.request_handler = nxt_py_asgi_request_handler; 183 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 184 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 185 init->callbacks.close_handler = nxt_py_asgi_close_handler; 186 init->callbacks.quit = nxt_py_asgi_quit; 187 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 188 init->callbacks.add_port = nxt_py_asgi_add_port; 189 init->callbacks.remove_port = nxt_py_asgi_remove_port; 190 191 *proto = nxt_py_asgi_proto; 192 193 return NXT_UNIT_OK; 194 } 195 196 197 static int 198 nxt_python_asgi_ctx_data_alloc(void **pdata, int main) 199 { 200 uint32_t i; 201 PyObject *asyncio, *loop, *event_loop, *obj; 202 const char *event_loop_func; 203 nxt_py_asgi_ctx_data_t *ctx_data; 204 205 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 206 if (nxt_slow_path(ctx_data == NULL)) { 207 nxt_unit_alert(NULL, "Failed to allocate context data"); 208 return NXT_UNIT_ERROR; 209 } 210 211 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 212 213 nxt_queue_init(&ctx_data->drain_queue); 214 215 struct { 216 const char *key; 217 PyObject **handler; 218 219 } handlers[] = { 220 { "create_task", &ctx_data->loop_create_task }, 221 { "add_reader", &ctx_data->loop_add_reader }, 222 { "remove_reader", &ctx_data->loop_remove_reader }, 223 { "call_soon", &ctx_data->loop_call_soon }, 224 { "run_until_complete", &ctx_data->loop_run_until_complete }, 225 { "create_future", &ctx_data->loop_create_future }, 226 }; 227 228 loop = NULL; 229 230 asyncio = PyImport_ImportModule("asyncio"); 231 if (nxt_slow_path(asyncio == NULL)) { 232 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 233 nxt_python_print_exception(); 234 goto fail; 235 } 236 237 event_loop_func = main ? "get_event_loop" : "new_event_loop"; 238 239 event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 240 event_loop_func); 241 if (nxt_slow_path(event_loop == NULL)) { 242 nxt_unit_alert(NULL, 243 "Python failed to get '%s' from module 'asyncio'", 244 event_loop_func); 245 goto fail; 246 } 247 248 if (nxt_slow_path(PyCallable_Check(event_loop) == 0)) { 249 nxt_unit_alert(NULL, 250 "'asyncio.%s' is not a callable object", 251 event_loop_func); 252 goto fail; 253 } 254 255 loop = PyObject_CallObject(event_loop, NULL); 256 if (nxt_slow_path(loop == NULL)) { 257 nxt_unit_alert(NULL, "Python failed to call 'asyncio.%s'", 258 event_loop_func); 259 goto fail; 260 } 261 262 for (i = 0; i < nxt_nitems(handlers); i++) { 263 obj = PyObject_GetAttrString(loop, handlers[i].key); 264 if (nxt_slow_path(obj == NULL)) { 265 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 266 handlers[i].key); 267 goto fail; 268 } 269 270 *handlers[i].handler = obj; 271 272 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 273 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 274 handlers[i].key); 275 goto fail; 276 } 277 } 278 279 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 280 if (nxt_slow_path(obj == NULL)) { 281 nxt_unit_alert(NULL, "Python failed to create Future "); 282 nxt_python_print_exception(); 283 goto fail; 284 } 285 286 ctx_data->quit_future = obj; 287 288 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 289 if (nxt_slow_path(obj == NULL)) { 290 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 291 goto fail; 292 } 293 294 ctx_data->quit_future_set_result = obj; 295 296 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 297 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 298 goto fail; 299 } 300 301 Py_DECREF(loop); 302 Py_DECREF(asyncio); 303 304 *pdata = ctx_data; 305 306 return NXT_UNIT_OK; 307 308 fail: 309 310 nxt_python_asgi_ctx_data_free(ctx_data); 311 312 Py_XDECREF(loop); 313 Py_XDECREF(asyncio); 314 315 return NXT_UNIT_ERROR; 316 } 317 318 319 static void 320 nxt_python_asgi_ctx_data_free(void *data) 321 { 322 nxt_py_asgi_ctx_data_t *ctx_data; 323 324 ctx_data = data; 325 326 Py_XDECREF(ctx_data->loop_run_until_complete); 327 Py_XDECREF(ctx_data->loop_create_future); 328 Py_XDECREF(ctx_data->loop_create_task); 329 Py_XDECREF(ctx_data->loop_call_soon); 330 Py_XDECREF(ctx_data->loop_add_reader); 331 Py_XDECREF(ctx_data->loop_remove_reader); 332 Py_XDECREF(ctx_data->quit_future); 333 Py_XDECREF(ctx_data->quit_future_set_result); 334 335 nxt_unit_free(NULL, ctx_data); 336 } 337 338 339 static int 340 nxt_python_asgi_startup(void *data) 341 { 342 return nxt_py_asgi_lifespan_startup(data); 343 } 344 345 346 static int 347 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 348 { 349 PyObject *res; 350 nxt_py_asgi_ctx_data_t *ctx_data; 351 352 ctx_data = ctx->data; 353 354 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 355 ctx_data->quit_future, NULL); 356 if (nxt_slow_path(res == NULL)) { 357 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 358 nxt_python_print_exception(); 359 360 return NXT_UNIT_ERROR; 361 } 362 363 Py_DECREF(res); 364 365 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); 366 nxt_py_asgi_remove_reader(ctx, ctx_data->port); 367 368 if (ctx_data->port != NULL) { 369 ctx_data->port = NULL; 370 } 371 372 nxt_py_asgi_lifespan_shutdown(ctx); 373 374 return NXT_UNIT_OK; 375 } 376 377 378 static void 379 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 380 { 381 PyObject *res, *fd; 382 nxt_py_asgi_ctx_data_t *ctx_data; 383 384 if (port == NULL || port->in_fd == -1) { 385 return; 386 } 387 388 ctx_data = ctx->data; 389 390 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 391 392 fd = PyLong_FromLong(port->in_fd); 393 if (nxt_slow_path(fd == NULL)) { 394 nxt_unit_alert(ctx, "Python failed to create Long object"); 395 nxt_python_print_exception(); 396 397 return; 398 } 399 400 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 401 if (nxt_slow_path(res == NULL)) { 402 nxt_unit_alert(ctx, "Python failed to remove_reader"); 403 nxt_python_print_exception(); 404 405 } else { 406 Py_DECREF(res); 407 } 408 409 Py_DECREF(fd); 410 } 411 412 413 static void 414 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 415 { 416 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 417 PyObject *stage2; 418 nxt_python_target_t *target; 419 nxt_py_asgi_ctx_data_t *ctx_data; 420 421 if (req->request->websocket_handshake) { 422 asgi = nxt_py_asgi_websocket_create(req); 423 424 } else { 425 asgi = nxt_py_asgi_http_create(req); 426 } 427 428 if (nxt_slow_path(asgi == NULL)) { 429 nxt_unit_req_alert(req, "Python failed to create asgi object"); 430 nxt_unit_request_done(req, NXT_UNIT_ERROR); 431 432 return; 433 } 434 435 receive = PyObject_GetAttrString(asgi, "receive"); 436 if (nxt_slow_path(receive == NULL)) { 437 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 438 nxt_unit_request_done(req, NXT_UNIT_ERROR); 439 440 goto release_asgi; 441 } 442 443 send = PyObject_GetAttrString(asgi, "send"); 444 if (nxt_slow_path(receive == NULL)) { 445 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 446 nxt_unit_request_done(req, NXT_UNIT_ERROR); 447 448 goto release_receive; 449 } 450 451 done = PyObject_GetAttrString(asgi, "_done"); 452 if (nxt_slow_path(receive == NULL)) { 453 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 454 nxt_unit_request_done(req, NXT_UNIT_ERROR); 455 456 goto release_send; 457 } 458 459 scope = nxt_py_asgi_create_http_scope(req); 460 if (nxt_slow_path(scope == NULL)) { 461 nxt_unit_request_done(req, NXT_UNIT_ERROR); 462 463 goto release_done; 464 } 465 466 req->data = asgi; 467 target = &nxt_py_targets->target[req->request->app_target]; 468 469 if (!target->asgi_legacy) { 470 nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); 471 472 res = PyObject_CallFunctionObjArgs(target->application, 473 scope, receive, send, NULL); 474 475 } else { 476 nxt_unit_req_debug(req, "Python call legacy application"); 477 478 res = PyObject_CallFunctionObjArgs(target->application, scope, NULL); 479 480 if (nxt_slow_path(res == NULL)) { 481 nxt_unit_req_error(req, "Python failed to call legacy app stage1"); 482 nxt_python_print_exception(); 483 nxt_unit_request_done(req, NXT_UNIT_ERROR); 484 485 goto release_scope; 486 } 487 488 if (nxt_slow_path(PyCallable_Check(res) == 0)) { 489 nxt_unit_req_error(req, 490 "Legacy ASGI application returns not a callable"); 491 nxt_unit_request_done(req, NXT_UNIT_ERROR); 492 493 Py_DECREF(res); 494 495 goto release_scope; 496 } 497 498 stage2 = res; 499 500 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); 501 502 Py_DECREF(stage2); 503 } 504 505 if (nxt_slow_path(res == NULL)) { 506 nxt_unit_req_error(req, "Python failed to call the application"); 507 nxt_python_print_exception(); 508 nxt_unit_request_done(req, NXT_UNIT_ERROR); 509 510 goto release_scope; 511 } 512 513 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 514 nxt_unit_req_error(req, "Application result type is not a coroutine"); 515 nxt_unit_request_done(req, NXT_UNIT_ERROR); 516 517 Py_DECREF(res); 518 519 goto release_scope; 520 } 521 522 ctx_data = req->ctx->data; 523 524 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 525 if (nxt_slow_path(task == NULL)) { 526 nxt_unit_req_error(req, "Python failed to call the create_task"); 527 nxt_python_print_exception(); 528 nxt_unit_request_done(req, NXT_UNIT_ERROR); 529 530 Py_DECREF(res); 531 532 goto release_scope; 533 } 534 535 Py_DECREF(res); 536 537 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 538 NULL); 539 if (nxt_slow_path(res == NULL)) { 540 nxt_unit_req_error(req, 541 "Python failed to call 'task.add_done_callback'"); 542 nxt_python_print_exception(); 543 nxt_unit_request_done(req, NXT_UNIT_ERROR); 544 545 goto release_task; 546 } 547 548 Py_DECREF(res); 549 release_task: 550 Py_DECREF(task); 551 release_scope: 552 Py_DECREF(scope); 553 release_done: 554 Py_DECREF(done); 555 release_send: 556 Py_DECREF(send); 557 release_receive: 558 Py_DECREF(receive); 559 release_asgi: 560 Py_DECREF(asgi); 561 } 562 563 564 static void 565 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req) 566 { 567 if (req->request->websocket_handshake) { 568 nxt_py_asgi_websocket_close_handler(req); 569 570 } else { 571 nxt_py_asgi_http_close_handler(req); 572 } 573 } 574 575 576 static PyObject * 577 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 578 { 579 char *p, *target, *query; 580 uint32_t target_length, i; 581 PyObject *scope, *v, *type, *scheme; 582 PyObject *headers, *header; 583 nxt_unit_field_t *f; 584 nxt_unit_request_t *r; 585 586 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 587 588 #define SET_ITEM(dict, key, value) \ 589 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 590 == -1)) \ 591 { \ 592 nxt_unit_req_alert(req, "Python failed to set '" \ 593 #dict "." #key "' item"); \ 594 goto fail; \ 595 } 596 597 v = NULL; 598 headers = NULL; 599 600 r = req->request; 601 602 if (r->websocket_handshake) { 603 type = nxt_py_websocket_str; 604 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 605 606 } else { 607 type = nxt_py_http_str; 608 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 609 } 610 611 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 612 if (nxt_slow_path(scope == NULL)) { 613 return NULL; 614 } 615 616 p = nxt_unit_sptr_get(&r->version); 617 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 618 : nxt_py_1_0_str) 619 SET_ITEM(scope, scheme, scheme) 620 621 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 622 r->method_length); 623 if (nxt_slow_path(v == NULL)) { 624 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 625 goto fail; 626 } 627 628 SET_ITEM(scope, method, v) 629 Py_DECREF(v); 630 631 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 632 "replace"); 633 if (nxt_slow_path(v == NULL)) { 634 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 635 goto fail; 636 } 637 638 SET_ITEM(scope, path, v) 639 Py_DECREF(v); 640 641 target = nxt_unit_sptr_get(&r->target); 642 query = nxt_unit_sptr_get(&r->query); 643 644 if (r->query.offset != 0) { 645 target_length = query - target - 1; 646 647 } else { 648 target_length = r->target_length; 649 } 650 651 v = PyBytes_FromStringAndSize(target, target_length); 652 if (nxt_slow_path(v == NULL)) { 653 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 654 goto fail; 655 } 656 657 SET_ITEM(scope, raw_path, v) 658 Py_DECREF(v); 659 660 v = PyBytes_FromStringAndSize(query, r->query_length); 661 if (nxt_slow_path(v == NULL)) { 662 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 663 goto fail; 664 } 665 666 SET_ITEM(scope, query_string, v) 667 Py_DECREF(v); 668 669 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 670 if (nxt_slow_path(v == NULL)) { 671 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 672 goto fail; 673 } 674 675 SET_ITEM(scope, client, v) 676 Py_DECREF(v); 677 678 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 679 if (nxt_slow_path(v == NULL)) { 680 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 681 goto fail; 682 } 683 684 SET_ITEM(scope, server, v) 685 Py_DECREF(v); 686 687 v = NULL; 688 689 headers = PyTuple_New(r->fields_count); 690 if (nxt_slow_path(headers == NULL)) { 691 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 692 goto fail; 693 } 694 695 for (i = 0; i < r->fields_count; i++) { 696 f = r->fields + i; 697 698 header = nxt_py_asgi_create_header(f); 699 if (nxt_slow_path(header == NULL)) { 700 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 701 goto fail; 702 } 703 704 PyTuple_SET_ITEM(headers, i, header); 705 706 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 707 && f->name_length == ws_protocol.length 708 && f->value_length > 0 709 && r->websocket_handshake) 710 { 711 v = nxt_py_asgi_create_subprotocols(f); 712 if (nxt_slow_path(v == NULL)) { 713 nxt_unit_req_alert(req, "Failed to create subprotocols"); 714 goto fail; 715 } 716 717 SET_ITEM(scope, subprotocols, v); 718 Py_DECREF(v); 719 } 720 } 721 722 SET_ITEM(scope, headers, headers) 723 Py_DECREF(headers); 724 725 return scope; 726 727 fail: 728 729 Py_XDECREF(v); 730 Py_XDECREF(headers); 731 Py_DECREF(scope); 732 733 return NULL; 734 735 #undef SET_ITEM 736 } 737 738 739 static PyObject * 740 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 741 { 742 char *p, *s; 743 PyObject *pair, *v; 744 745 pair = PyTuple_New(2); 746 if (nxt_slow_path(pair == NULL)) { 747 return NULL; 748 } 749 750 p = nxt_unit_sptr_get(sptr); 751 s = memchr(p, ':', len); 752 753 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 754 if (nxt_slow_path(v == NULL)) { 755 Py_DECREF(pair); 756 757 return NULL; 758 } 759 760 PyTuple_SET_ITEM(pair, 0, v); 761 762 if (s != NULL) { 763 p += len; 764 v = PyLong_FromString(s + 1, &p, 10); 765 766 } else { 767 v = PyLong_FromLong(port); 768 } 769 770 if (nxt_slow_path(v == NULL)) { 771 Py_DECREF(pair); 772 773 return NULL; 774 } 775 776 PyTuple_SET_ITEM(pair, 1, v); 777 778 return pair; 779 } 780 781 782 static PyObject * 783 nxt_py_asgi_create_header(nxt_unit_field_t *f) 784 { 785 char c, *name; 786 uint8_t pos; 787 PyObject *header, *v; 788 789 header = PyTuple_New(2); 790 if (nxt_slow_path(header == NULL)) { 791 return NULL; 792 } 793 794 name = nxt_unit_sptr_get(&f->name); 795 796 for (pos = 0; pos < f->name_length; pos++) { 797 c = name[pos]; 798 if (c >= 'A' && c <= 'Z') { 799 name[pos] = (c | 0x20); 800 } 801 } 802 803 v = PyBytes_FromStringAndSize(name, f->name_length); 804 if (nxt_slow_path(v == NULL)) { 805 Py_DECREF(header); 806 807 return NULL; 808 } 809 810 PyTuple_SET_ITEM(header, 0, v); 811 812 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 813 f->value_length); 814 if (nxt_slow_path(v == NULL)) { 815 Py_DECREF(header); 816 817 return NULL; 818 } 819 820 PyTuple_SET_ITEM(header, 1, v); 821 822 return header; 823 } 824 825 826 static PyObject * 827 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 828 { 829 char *v; 830 uint32_t i, n, start; 831 PyObject *res, *proto; 832 833 v = nxt_unit_sptr_get(&f->value); 834 n = 1; 835 836 for (i = 0; i < f->value_length; i++) { 837 if (v[i] == ',') { 838 n++; 839 } 840 } 841 842 res = PyTuple_New(n); 843 if (nxt_slow_path(res == NULL)) { 844 return NULL; 845 } 846 847 n = 0; 848 start = 0; 849 850 for (i = 0; i < f->value_length; ) { 851 if (v[i] != ',') { 852 i++; 853 854 continue; 855 } 856 857 if (i - start > 0) { 858 proto = PyString_FromStringAndSize(v + start, i - start); 859 if (nxt_slow_path(proto == NULL)) { 860 goto fail; 861 } 862 863 PyTuple_SET_ITEM(res, n, proto); 864 865 n++; 866 } 867 868 do { 869 i++; 870 } while (i < f->value_length && v[i] == ' '); 871 872 start = i; 873 } 874 875 if (i - start > 0) { 876 proto = PyString_FromStringAndSize(v + start, i - start); 877 if (nxt_slow_path(proto == NULL)) { 878 goto fail; 879 } 880 881 PyTuple_SET_ITEM(res, n, proto); 882 } 883 884 return res; 885 886 fail: 887 888 Py_DECREF(res); 889 890 return NULL; 891 } 892 893 894 static int 895 nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) 896 { 897 nxt_unit_port_t *port; 898 899 if (nxt_slow_path(nxt_py_shared_port == NULL)) { 900 return NXT_UNIT_ERROR; 901 } 902 903 port = nxt_py_shared_port; 904 905 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); 906 907 return nxt_py_asgi_add_reader(ctx, port); 908 } 909 910 911 static int 912 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 913 { 914 int nb; 915 nxt_py_asgi_ctx_data_t *ctx_data; 916 917 if (port->in_fd == -1) { 918 return NXT_UNIT_OK; 919 } 920 921 nb = 1; 922 923 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 924 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 925 port->in_fd, strerror(errno), errno); 926 927 return NXT_UNIT_ERROR; 928 } 929 930 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 931 932 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { 933 nxt_py_shared_port = port; 934 935 return NXT_UNIT_OK; 936 } 937 938 ctx_data = ctx->data; 939 940 ctx_data->port = port; 941 942 return nxt_py_asgi_add_reader(ctx, port); 943 } 944 945 946 static int 947 nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 948 { 949 int rc; 950 PyObject *res, *fd, *py_ctx, *py_port; 951 nxt_py_asgi_ctx_data_t *ctx_data; 952 953 nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port); 954 955 ctx_data = ctx->data; 956 957 fd = PyLong_FromLong(port->in_fd); 958 if (nxt_slow_path(fd == NULL)) { 959 nxt_unit_alert(ctx, "Python failed to create fd"); 960 nxt_python_print_exception(); 961 962 return NXT_UNIT_ERROR; 963 } 964 965 rc = NXT_UNIT_ERROR; 966 967 py_ctx = PyLong_FromVoidPtr(ctx); 968 if (nxt_slow_path(py_ctx == NULL)) { 969 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 970 nxt_python_print_exception(); 971 972 goto clean_fd; 973 } 974 975 py_port = PyLong_FromVoidPtr(port); 976 if (nxt_slow_path(py_port == NULL)) { 977 nxt_unit_alert(ctx, "Python failed to create py_port"); 978 nxt_python_print_exception(); 979 980 goto clean_py_ctx; 981 } 982 983 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 984 fd, nxt_py_port_read, 985 py_ctx, py_port, NULL); 986 if (nxt_slow_path(res == NULL)) { 987 nxt_unit_alert(ctx, "Python failed to add_reader"); 988 nxt_python_print_exception(); 989 990 } else { 991 Py_DECREF(res); 992 993 rc = NXT_UNIT_OK; 994 } 995 996 Py_DECREF(py_port); 997 998 clean_py_ctx: 999 1000 Py_DECREF(py_ctx); 1001 1002 clean_fd: 1003 1004 Py_DECREF(fd); 1005 1006 return rc; 1007 } 1008 1009 1010 static void 1011 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 1012 { 1013 if (port->in_fd == -1) { 1014 return; 1015 } 1016 1017 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 1018 1019 if (nxt_py_shared_port == port) { 1020 nxt_py_shared_port = NULL; 1021 } 1022 } 1023 1024 1025 static void 1026 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 1027 { 1028 PyObject *res, *p; 1029 nxt_py_asgi_ctx_data_t *ctx_data; 1030 1031 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 1032 1033 ctx_data = ctx->data; 1034 1035 if (nxt_py_shared_port != NULL) { 1036 p = PyLong_FromLong(nxt_py_shared_port->in_fd); 1037 if (nxt_slow_path(p == NULL)) { 1038 nxt_unit_alert(NULL, "Python failed to create Long"); 1039 nxt_python_print_exception(); 1040 1041 } else { 1042 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 1043 p, NULL); 1044 if (nxt_slow_path(res == NULL)) { 1045 nxt_unit_alert(NULL, "Python failed to remove_reader"); 1046 nxt_python_print_exception(); 1047 1048 } else { 1049 Py_DECREF(res); 1050 } 1051 1052 Py_DECREF(p); 1053 } 1054 } 1055 1056 p = PyLong_FromLong(0); 1057 if (nxt_slow_path(p == NULL)) { 1058 nxt_unit_alert(NULL, "Python failed to create Long"); 1059 nxt_python_print_exception(); 1060 1061 } else { 1062 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1063 p, NULL); 1064 if (nxt_slow_path(res == NULL)) { 1065 nxt_unit_alert(ctx, "Python failed to set_result"); 1066 nxt_python_print_exception(); 1067 1068 } else { 1069 Py_DECREF(res); 1070 } 1071 1072 Py_DECREF(p); 1073 } 1074 } 1075 1076 1077 static void 1078 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1079 { 1080 int rc; 1081 nxt_queue_link_t *lnk; 1082 nxt_py_asgi_ctx_data_t *ctx_data; 1083 1084 ctx_data = ctx->data; 1085 1086 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1087 lnk = nxt_queue_first(&ctx_data->drain_queue); 1088 1089 rc = nxt_py_asgi_http_drain(lnk); 1090 if (rc == NXT_UNIT_AGAIN) { 1091 return; 1092 } 1093 1094 nxt_queue_remove(lnk); 1095 } 1096 } 1097 1098 1099 static PyObject * 1100 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1101 { 1102 int rc; 1103 PyObject *arg0, *arg1, *res; 1104 Py_ssize_t n; 1105 nxt_unit_ctx_t *ctx; 1106 nxt_unit_port_t *port; 1107 nxt_py_asgi_ctx_data_t *ctx_data; 1108 1109 n = PyTuple_GET_SIZE(args); 1110 1111 if (n != 2) { 1112 nxt_unit_alert(NULL, 1113 "nxt_py_asgi_port_read: invalid number of arguments %d", 1114 (int) n); 1115 1116 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 1117 } 1118 1119 arg0 = PyTuple_GET_ITEM(args, 0); 1120 if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) { 1121 return PyErr_Format(PyExc_TypeError, 1122 "the first argument is not a long"); 1123 } 1124 1125 ctx = PyLong_AsVoidPtr(arg0); 1126 1127 arg1 = PyTuple_GET_ITEM(args, 1); 1128 if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) { 1129 return PyErr_Format(PyExc_TypeError, 1130 "the second argument is not a long"); 1131 } 1132 1133 port = PyLong_AsVoidPtr(arg1); 1134 1135 rc = nxt_unit_process_port_msg(ctx, port); 1136 1137 nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc); 1138 1139 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 1140 return PyErr_Format(PyExc_RuntimeError, 1141 "error processing port %d message", port->id.id); 1142 } 1143 1144 if (rc == NXT_UNIT_OK) { 1145 ctx_data = ctx->data; 1146 1147 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, 1148 nxt_py_port_read, 1149 arg0, arg1, NULL); 1150 if (nxt_slow_path(res == NULL)) { 1151 nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'"); 1152 nxt_python_print_exception(); 1153 } 1154 1155 Py_XDECREF(res); 1156 } 1157 1158 Py_RETURN_NONE; 1159 } 1160 1161 1162 PyObject * 1163 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1164 void *data) 1165 { 1166 int i; 1167 PyObject *iter, *header, *h_iter, *name, *val, *res; 1168 1169 iter = PyObject_GetIter(headers); 1170 if (nxt_slow_path(iter == NULL)) { 1171 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1172 } 1173 1174 for (i = 0; /* void */; i++) { 1175 header = PyIter_Next(iter); 1176 if (header == NULL) { 1177 break; 1178 } 1179 1180 h_iter = PyObject_GetIter(header); 1181 if (nxt_slow_path(h_iter == NULL)) { 1182 Py_DECREF(header); 1183 Py_DECREF(iter); 1184 1185 return PyErr_Format(PyExc_TypeError, 1186 "'headers' item #%d is not an iterable", i); 1187 } 1188 1189 name = PyIter_Next(h_iter); 1190 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1191 Py_XDECREF(name); 1192 Py_DECREF(h_iter); 1193 Py_DECREF(header); 1194 Py_DECREF(iter); 1195 1196 return PyErr_Format(PyExc_TypeError, 1197 "'headers' item #%d 'name' is not a byte string", i); 1198 } 1199 1200 val = PyIter_Next(h_iter); 1201 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1202 Py_XDECREF(val); 1203 Py_DECREF(h_iter); 1204 Py_DECREF(header); 1205 Py_DECREF(iter); 1206 1207 return PyErr_Format(PyExc_TypeError, 1208 "'headers' item #%d 'value' is not a byte string", i); 1209 } 1210 1211 res = cb(data, i, name, val); 1212 1213 Py_DECREF(name); 1214 Py_DECREF(val); 1215 Py_DECREF(h_iter); 1216 Py_DECREF(header); 1217 1218 if (nxt_slow_path(res == NULL)) { 1219 Py_DECREF(iter); 1220 1221 return NULL; 1222 } 1223 1224 Py_DECREF(res); 1225 } 1226 1227 Py_DECREF(iter); 1228 1229 Py_RETURN_NONE; 1230 } 1231 1232 1233 PyObject * 1234 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1235 { 1236 nxt_py_asgi_calc_size_ctx_t *ctx; 1237 1238 ctx = data; 1239 1240 ctx->fields_count++; 1241 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1242 1243 Py_RETURN_NONE; 1244 } 1245 1246 1247 PyObject * 1248 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1249 { 1250 int rc; 1251 char *name_str, *val_str; 1252 uint32_t name_len, val_len; 1253 nxt_off_t content_length; 1254 nxt_unit_request_info_t *req; 1255 nxt_py_asgi_add_field_ctx_t *ctx; 1256 1257 name_str = PyBytes_AS_STRING(name); 1258 name_len = PyBytes_GET_SIZE(name); 1259 1260 val_str = PyBytes_AS_STRING(val); 1261 val_len = PyBytes_GET_SIZE(val); 1262 1263 ctx = data; 1264 req = ctx->req; 1265 1266 rc = nxt_unit_response_add_field(req, name_str, name_len, 1267 val_str, val_len); 1268 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1269 return PyErr_Format(PyExc_RuntimeError, 1270 "failed to add header #%d", i); 1271 } 1272 1273 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1274 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1275 if (nxt_slow_path(content_length < 0)) { 1276 nxt_unit_req_error(req, "failed to parse Content-Length " 1277 "value %.*s", (int) val_len, val_str); 1278 1279 return PyErr_Format(PyExc_ValueError, 1280 "Failed to parse Content-Length: '%.*s'", 1281 (int) val_len, val_str); 1282 } 1283 1284 ctx->content_length = content_length; 1285 } 1286 1287 Py_RETURN_NONE; 1288 } 1289 1290 1291 PyObject * 1292 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1293 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1294 { 1295 PyObject *set_result, *res; 1296 1297 if (nxt_slow_path(result == NULL)) { 1298 Py_DECREF(future); 1299 1300 return NULL; 1301 } 1302 1303 set_result = PyObject_GetAttrString(future, "set_result"); 1304 if (nxt_slow_path(set_result == NULL)) { 1305 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1306 1307 Py_CLEAR(future); 1308 1309 goto cleanup_result; 1310 } 1311 1312 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1313 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1314 1315 Py_CLEAR(future); 1316 1317 goto cleanup; 1318 } 1319 1320 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1321 result, NULL); 1322 if (nxt_slow_path(res == NULL)) { 1323 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1324 nxt_python_print_exception(); 1325 1326 Py_CLEAR(future); 1327 } 1328 1329 Py_XDECREF(res); 1330 1331 cleanup: 1332 1333 Py_DECREF(set_result); 1334 1335 cleanup_result: 1336 1337 Py_DECREF(result); 1338 1339 return future; 1340 } 1341 1342 1343 PyObject * 1344 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1345 { 1346 PyObject *msg; 1347 1348 msg = PyDict_New(); 1349 if (nxt_slow_path(msg == NULL)) { 1350 nxt_unit_req_alert(req, "Python failed to create message dict"); 1351 nxt_python_print_exception(); 1352 1353 return PyErr_Format(PyExc_RuntimeError, 1354 "failed to create message dict"); 1355 } 1356 1357 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1358 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1359 1360 Py_DECREF(msg); 1361 1362 return PyErr_Format(PyExc_RuntimeError, 1363 "failed to set 'msg.type' item"); 1364 } 1365 1366 return msg; 1367 } 1368 1369 1370 PyObject * 1371 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1372 PyObject *spec_version) 1373 { 1374 PyObject *scope, *asgi; 1375 1376 scope = PyDict_New(); 1377 if (nxt_slow_path(scope == NULL)) { 1378 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1379 nxt_python_print_exception(); 1380 1381 return PyErr_Format(PyExc_RuntimeError, 1382 "failed to create 'scope' dict"); 1383 } 1384 1385 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1386 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1387 1388 Py_DECREF(scope); 1389 1390 return PyErr_Format(PyExc_RuntimeError, 1391 "failed to set 'scope.type' item"); 1392 } 1393 1394 asgi = PyDict_New(); 1395 if (nxt_slow_path(asgi == NULL)) { 1396 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1397 nxt_python_print_exception(); 1398 1399 Py_DECREF(scope); 1400 1401 return PyErr_Format(PyExc_RuntimeError, 1402 "failed to create 'asgi' dict"); 1403 } 1404 1405 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1406 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1407 1408 Py_DECREF(asgi); 1409 Py_DECREF(scope); 1410 1411 return PyErr_Format(PyExc_RuntimeError, 1412 "failed to set 'scope.asgi' item"); 1413 } 1414 1415 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1416 nxt_py_3_0_str) == -1)) 1417 { 1418 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1419 1420 Py_DECREF(asgi); 1421 Py_DECREF(scope); 1422 1423 return PyErr_Format(PyExc_RuntimeError, 1424 "failed to set 'asgi.version' item"); 1425 } 1426 1427 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1428 spec_version) == -1)) 1429 { 1430 nxt_unit_req_alert(req, 1431 "Python failed to set 'asgi.spec_version' item"); 1432 1433 Py_DECREF(asgi); 1434 Py_DECREF(scope); 1435 1436 return PyErr_Format(PyExc_RuntimeError, 1437 "failed to set 'asgi.spec_version' item"); 1438 } 1439 1440 Py_DECREF(asgi); 1441 1442 return scope; 1443 } 1444 1445 1446 void 1447 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1448 { 1449 nxt_py_asgi_ctx_data_t *ctx_data; 1450 1451 ctx_data = req->ctx->data; 1452 1453 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1454 } 1455 1456 1457 void 1458 nxt_py_asgi_dealloc(PyObject *self) 1459 { 1460 PyObject_Del(self); 1461 } 1462 1463 1464 PyObject * 1465 nxt_py_asgi_await(PyObject *self) 1466 { 1467 Py_INCREF(self); 1468 return self; 1469 } 1470 1471 1472 PyObject * 1473 nxt_py_asgi_iter(PyObject *self) 1474 { 1475 Py_INCREF(self); 1476 return self; 1477 } 1478 1479 1480 PyObject * 1481 nxt_py_asgi_next(PyObject *self) 1482 { 1483 return NULL; 1484 } 1485 1486 1487 static void 1488 nxt_python_asgi_done(void) 1489 { 1490 nxt_py_asgi_str_done(); 1491 1492 Py_XDECREF(nxt_py_port_read); 1493 } 1494 1495 #else /* !(NXT_HAVE_ASGI) */ 1496 1497 1498 int 1499 nxt_python_asgi_check(PyObject *obj) 1500 { 1501 return 0; 1502 } 1503 1504 1505 int 1506 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1507 { 1508 nxt_unit_alert(NULL, "ASGI not implemented"); 1509 return NXT_UNIT_ERROR; 1510 } 1511 1512 1513 #endif /* NXT_HAVE_ASGI */ 1514