1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static PyObject *nxt_python_asgi_get_func(PyObject *obj); 20 static int nxt_python_asgi_ctx_data_alloc(void **pdata, int main); 21 static void nxt_python_asgi_ctx_data_free(void *data); 22 static int nxt_python_asgi_startup(void *data); 23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 24 25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 26 nxt_unit_port_t *port); 27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 28 static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req); 29 30 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 31 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 32 uint16_t port); 33 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 34 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 35 36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 37 static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 38 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, 39 nxt_unit_port_t *port); 40 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 41 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 42 43 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 44 static void nxt_python_asgi_done(void); 45 46 static PyObject *nxt_py_port_read; 47 48 static PyMethodDef nxt_py_port_read_method = 49 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 50 51 static nxt_python_proto_t nxt_py_asgi_proto = { 52 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 53 .ctx_data_free = nxt_python_asgi_ctx_data_free, 54 .startup = nxt_python_asgi_startup, 55 .run = nxt_python_asgi_run, 56 .done = nxt_python_asgi_done, 57 }; 58 59 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 60 61 62 int 63 nxt_python_asgi_check(PyObject *obj) 64 { 65 int res; 66 PyObject *func; 67 PyCodeObject *code; 68 69 func = nxt_python_asgi_get_func(obj); 70 71 if (func == NULL) { 72 return 0; 73 } 74 75 code = (PyCodeObject *) PyFunction_GET_CODE(func); 76 77 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " 78 "%d argument(s)", 79 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", 80 code->co_argcount); 81 82 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; 83 84 Py_DECREF(func); 85 86 return res; 87 } 88 89 90 static PyObject * 91 nxt_python_asgi_get_func(PyObject *obj) 92 { 93 PyObject *call; 94 95 if (PyFunction_Check(obj)) { 96 Py_INCREF(obj); 97 return obj; 98 } 99 100 if (PyMethod_Check(obj)) { 101 obj = PyMethod_GET_FUNCTION(obj); 102 103 Py_INCREF(obj); 104 return obj; 105 } 106 107 call = PyObject_GetAttrString(obj, "__call__"); 108 109 if (call == NULL) { 110 return NULL; 111 } 112 113 if (PyFunction_Check(call)) { 114 return call; 115 } 116 117 if (PyMethod_Check(call)) { 118 obj = PyMethod_GET_FUNCTION(call); 119 120 if (PyFunction_Check(obj)) { 121 Py_INCREF(obj); 122 123 } else { 124 obj = NULL; 125 } 126 127 } else { 128 obj = NULL; 129 } 130 131 Py_DECREF(call); 132 133 return obj; 134 } 135 136 137 int 138 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 139 { 140 PyObject *func; 141 nxt_int_t i; 142 PyCodeObject *code; 143 144 nxt_unit_debug(NULL, "asgi_init"); 145 146 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 147 nxt_unit_alert(NULL, "Python failed to init string objects"); 148 return NXT_UNIT_ERROR; 149 } 150 151 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 152 if (nxt_slow_path(nxt_py_port_read == NULL)) { 153 nxt_unit_alert(NULL, 154 "Python failed to initialize the 'port_read' function"); 155 return NXT_UNIT_ERROR; 156 } 157 158 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 159 return NXT_UNIT_ERROR; 160 } 161 162 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 163 return NXT_UNIT_ERROR; 164 } 165 166 for (i = 0; i < nxt_py_targets->count; i++) { 167 func = nxt_python_asgi_get_func(nxt_py_targets->target[i].application); 168 if (nxt_slow_path(func == NULL)) { 169 nxt_unit_debug(NULL, "asgi: cannot find function for callable, " 170 "unable to check for legacy mode (#%d)", 171 (int) i); 172 continue; 173 } 174 175 code = (PyCodeObject *) PyFunction_GET_CODE(func); 176 177 if ((code->co_flags & CO_COROUTINE) == 0) { 178 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " 179 "switching to legacy mode"); 180 nxt_py_targets->target[i].asgi_legacy = 1; 181 } 182 183 Py_DECREF(func); 184 } 185 186 init->callbacks.request_handler = nxt_py_asgi_request_handler; 187 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 188 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 189 init->callbacks.close_handler = nxt_py_asgi_close_handler; 190 init->callbacks.quit = nxt_py_asgi_quit; 191 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 192 init->callbacks.add_port = nxt_py_asgi_add_port; 193 init->callbacks.remove_port = nxt_py_asgi_remove_port; 194 195 *proto = nxt_py_asgi_proto; 196 197 return NXT_UNIT_OK; 198 } 199 200 201 static int 202 nxt_python_asgi_ctx_data_alloc(void **pdata, int main) 203 { 204 uint32_t i; 205 PyObject *asyncio, *loop, *event_loop, *obj; 206 const char *event_loop_func; 207 nxt_py_asgi_ctx_data_t *ctx_data; 208 209 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 210 if (nxt_slow_path(ctx_data == NULL)) { 211 nxt_unit_alert(NULL, "Failed to allocate context data"); 212 return NXT_UNIT_ERROR; 213 } 214 215 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 216 217 nxt_queue_init(&ctx_data->drain_queue); 218 219 struct { 220 const char *key; 221 PyObject **handler; 222 223 } handlers[] = { 224 { "create_task", &ctx_data->loop_create_task }, 225 { "add_reader", &ctx_data->loop_add_reader }, 226 { "remove_reader", &ctx_data->loop_remove_reader }, 227 { "call_soon", &ctx_data->loop_call_soon }, 228 { "run_until_complete", &ctx_data->loop_run_until_complete }, 229 { "create_future", &ctx_data->loop_create_future }, 230 }; 231 232 loop = NULL; 233 234 asyncio = PyImport_ImportModule("asyncio"); 235 if (nxt_slow_path(asyncio == NULL)) { 236 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 237 nxt_python_print_exception(); 238 goto fail; 239 } 240 241 event_loop_func = main ? "get_event_loop" : "new_event_loop"; 242 243 event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 244 event_loop_func); 245 if (nxt_slow_path(event_loop == NULL)) { 246 nxt_unit_alert(NULL, 247 "Python failed to get '%s' from module 'asyncio'", 248 event_loop_func); 249 goto fail; 250 } 251 252 if (nxt_slow_path(PyCallable_Check(event_loop) == 0)) { 253 nxt_unit_alert(NULL, 254 "'asyncio.%s' is not a callable object", 255 event_loop_func); 256 goto fail; 257 } 258 259 loop = PyObject_CallObject(event_loop, NULL); 260 if (nxt_slow_path(loop == NULL)) { 261 nxt_unit_alert(NULL, "Python failed to call 'asyncio.%s'", 262 event_loop_func); 263 goto fail; 264 } 265 266 for (i = 0; i < nxt_nitems(handlers); i++) { 267 obj = PyObject_GetAttrString(loop, handlers[i].key); 268 if (nxt_slow_path(obj == NULL)) { 269 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 270 handlers[i].key); 271 goto fail; 272 } 273 274 *handlers[i].handler = obj; 275 276 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 277 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 278 handlers[i].key); 279 goto fail; 280 } 281 } 282 283 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 284 if (nxt_slow_path(obj == NULL)) { 285 nxt_unit_alert(NULL, "Python failed to create Future "); 286 nxt_python_print_exception(); 287 goto fail; 288 } 289 290 ctx_data->quit_future = obj; 291 292 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 293 if (nxt_slow_path(obj == NULL)) { 294 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 295 goto fail; 296 } 297 298 ctx_data->quit_future_set_result = obj; 299 300 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 301 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 302 goto fail; 303 } 304 305 Py_DECREF(loop); 306 Py_DECREF(asyncio); 307 308 *pdata = ctx_data; 309 310 return NXT_UNIT_OK; 311 312 fail: 313 314 nxt_python_asgi_ctx_data_free(ctx_data); 315 316 Py_XDECREF(loop); 317 Py_XDECREF(asyncio); 318 319 return NXT_UNIT_ERROR; 320 } 321 322 323 static void 324 nxt_python_asgi_ctx_data_free(void *data) 325 { 326 nxt_py_asgi_ctx_data_t *ctx_data; 327 328 ctx_data = data; 329 330 Py_XDECREF(ctx_data->loop_run_until_complete); 331 Py_XDECREF(ctx_data->loop_create_future); 332 Py_XDECREF(ctx_data->loop_create_task); 333 Py_XDECREF(ctx_data->loop_call_soon); 334 Py_XDECREF(ctx_data->loop_add_reader); 335 Py_XDECREF(ctx_data->loop_remove_reader); 336 Py_XDECREF(ctx_data->quit_future); 337 Py_XDECREF(ctx_data->quit_future_set_result); 338 339 nxt_unit_free(NULL, ctx_data); 340 } 341 342 343 static int 344 nxt_python_asgi_startup(void *data) 345 { 346 return nxt_py_asgi_lifespan_startup(data); 347 } 348 349 350 static int 351 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 352 { 353 PyObject *res; 354 nxt_py_asgi_ctx_data_t *ctx_data; 355 356 ctx_data = ctx->data; 357 358 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 359 ctx_data->quit_future, NULL); 360 if (nxt_slow_path(res == NULL)) { 361 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 362 nxt_python_print_exception(); 363 364 return NXT_UNIT_ERROR; 365 } 366 367 Py_DECREF(res); 368 369 nxt_py_asgi_lifespan_shutdown(ctx); 370 371 return NXT_UNIT_OK; 372 } 373 374 375 static void 376 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 377 { 378 PyObject *res, *fd; 379 nxt_py_asgi_ctx_data_t *ctx_data; 380 381 if (port == NULL || port->in_fd == -1) { 382 return; 383 } 384 385 ctx_data = ctx->data; 386 387 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 388 389 fd = PyLong_FromLong(port->in_fd); 390 if (nxt_slow_path(fd == NULL)) { 391 nxt_unit_alert(ctx, "Python failed to create Long object"); 392 nxt_python_print_exception(); 393 394 return; 395 } 396 397 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 398 if (nxt_slow_path(res == NULL)) { 399 nxt_unit_alert(ctx, "Python failed to remove_reader"); 400 nxt_python_print_exception(); 401 402 } else { 403 Py_DECREF(res); 404 } 405 406 Py_DECREF(fd); 407 } 408 409 410 static void 411 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 412 { 413 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 414 PyObject *stage2; 415 nxt_python_target_t *target; 416 nxt_py_asgi_ctx_data_t *ctx_data; 417 418 if (req->request->websocket_handshake) { 419 asgi = nxt_py_asgi_websocket_create(req); 420 421 } else { 422 asgi = nxt_py_asgi_http_create(req); 423 } 424 425 if (nxt_slow_path(asgi == NULL)) { 426 nxt_unit_req_alert(req, "Python failed to create asgi object"); 427 nxt_unit_request_done(req, NXT_UNIT_ERROR); 428 429 return; 430 } 431 432 receive = PyObject_GetAttrString(asgi, "receive"); 433 if (nxt_slow_path(receive == NULL)) { 434 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 435 nxt_unit_request_done(req, NXT_UNIT_ERROR); 436 437 goto release_asgi; 438 } 439 440 send = PyObject_GetAttrString(asgi, "send"); 441 if (nxt_slow_path(receive == NULL)) { 442 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 443 nxt_unit_request_done(req, NXT_UNIT_ERROR); 444 445 goto release_receive; 446 } 447 448 done = PyObject_GetAttrString(asgi, "_done"); 449 if (nxt_slow_path(receive == NULL)) { 450 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 451 nxt_unit_request_done(req, NXT_UNIT_ERROR); 452 453 goto release_send; 454 } 455 456 scope = nxt_py_asgi_create_http_scope(req); 457 if (nxt_slow_path(scope == NULL)) { 458 nxt_unit_request_done(req, NXT_UNIT_ERROR); 459 460 goto release_done; 461 } 462 463 req->data = asgi; 464 target = &nxt_py_targets->target[req->request->app_target]; 465 466 if (!target->asgi_legacy) { 467 nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); 468 469 res = PyObject_CallFunctionObjArgs(target->application, 470 scope, receive, send, NULL); 471 472 } else { 473 nxt_unit_req_debug(req, "Python call legacy application"); 474 475 res = PyObject_CallFunctionObjArgs(target->application, scope, NULL); 476 477 if (nxt_slow_path(res == NULL)) { 478 nxt_unit_req_error(req, "Python failed to call legacy app stage1"); 479 nxt_python_print_exception(); 480 nxt_unit_request_done(req, NXT_UNIT_ERROR); 481 482 goto release_scope; 483 } 484 485 if (nxt_slow_path(PyCallable_Check(res) == 0)) { 486 nxt_unit_req_error(req, 487 "Legacy ASGI application returns not a callable"); 488 nxt_unit_request_done(req, NXT_UNIT_ERROR); 489 490 Py_DECREF(res); 491 492 goto release_scope; 493 } 494 495 stage2 = res; 496 497 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); 498 499 Py_DECREF(stage2); 500 } 501 502 if (nxt_slow_path(res == NULL)) { 503 nxt_unit_req_error(req, "Python failed to call the application"); 504 nxt_python_print_exception(); 505 nxt_unit_request_done(req, NXT_UNIT_ERROR); 506 507 goto release_scope; 508 } 509 510 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 511 nxt_unit_req_error(req, "Application result type is not a coroutine"); 512 nxt_unit_request_done(req, NXT_UNIT_ERROR); 513 514 Py_DECREF(res); 515 516 goto release_scope; 517 } 518 519 ctx_data = req->ctx->data; 520 521 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 522 if (nxt_slow_path(task == NULL)) { 523 nxt_unit_req_error(req, "Python failed to call the create_task"); 524 nxt_python_print_exception(); 525 nxt_unit_request_done(req, NXT_UNIT_ERROR); 526 527 Py_DECREF(res); 528 529 goto release_scope; 530 } 531 532 Py_DECREF(res); 533 534 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 535 NULL); 536 if (nxt_slow_path(res == NULL)) { 537 nxt_unit_req_error(req, 538 "Python failed to call 'task.add_done_callback'"); 539 nxt_python_print_exception(); 540 nxt_unit_request_done(req, NXT_UNIT_ERROR); 541 542 goto release_task; 543 } 544 545 Py_DECREF(res); 546 release_task: 547 Py_DECREF(task); 548 release_scope: 549 Py_DECREF(scope); 550 release_done: 551 Py_DECREF(done); 552 release_send: 553 Py_DECREF(send); 554 release_receive: 555 Py_DECREF(receive); 556 release_asgi: 557 Py_DECREF(asgi); 558 } 559 560 561 static void 562 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req) 563 { 564 if (req->request->websocket_handshake) { 565 nxt_py_asgi_websocket_close_handler(req); 566 567 } else { 568 nxt_py_asgi_http_close_handler(req); 569 } 570 } 571 572 573 static PyObject * 574 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 575 { 576 char *p, *target, *query; 577 uint32_t target_length, i; 578 PyObject *scope, *v, *type, *scheme; 579 PyObject *headers, *header; 580 nxt_unit_field_t *f; 581 nxt_unit_request_t *r; 582 583 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 584 585 #define SET_ITEM(dict, key, value) \ 586 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 587 == -1)) \ 588 { \ 589 nxt_unit_req_alert(req, "Python failed to set '" \ 590 #dict "." #key "' item"); \ 591 goto fail; \ 592 } 593 594 v = NULL; 595 headers = NULL; 596 597 r = req->request; 598 599 if (r->websocket_handshake) { 600 type = nxt_py_websocket_str; 601 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 602 603 } else { 604 type = nxt_py_http_str; 605 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 606 } 607 608 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 609 if (nxt_slow_path(scope == NULL)) { 610 return NULL; 611 } 612 613 p = nxt_unit_sptr_get(&r->version); 614 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 615 : nxt_py_1_0_str) 616 SET_ITEM(scope, scheme, scheme) 617 618 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 619 r->method_length); 620 if (nxt_slow_path(v == NULL)) { 621 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 622 goto fail; 623 } 624 625 SET_ITEM(scope, method, v) 626 Py_DECREF(v); 627 628 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 629 "replace"); 630 if (nxt_slow_path(v == NULL)) { 631 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 632 goto fail; 633 } 634 635 SET_ITEM(scope, path, v) 636 Py_DECREF(v); 637 638 target = nxt_unit_sptr_get(&r->target); 639 query = nxt_unit_sptr_get(&r->query); 640 641 if (r->query.offset != 0) { 642 target_length = query - target - 1; 643 644 } else { 645 target_length = r->target_length; 646 } 647 648 v = PyBytes_FromStringAndSize(target, target_length); 649 if (nxt_slow_path(v == NULL)) { 650 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 651 goto fail; 652 } 653 654 SET_ITEM(scope, raw_path, v) 655 Py_DECREF(v); 656 657 v = PyBytes_FromStringAndSize(query, r->query_length); 658 if (nxt_slow_path(v == NULL)) { 659 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 660 goto fail; 661 } 662 663 SET_ITEM(scope, query_string, v) 664 Py_DECREF(v); 665 666 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 667 if (nxt_slow_path(v == NULL)) { 668 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 669 goto fail; 670 } 671 672 SET_ITEM(scope, client, v) 673 Py_DECREF(v); 674 675 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 676 if (nxt_slow_path(v == NULL)) { 677 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 678 goto fail; 679 } 680 681 SET_ITEM(scope, server, v) 682 Py_DECREF(v); 683 684 v = NULL; 685 686 headers = PyTuple_New(r->fields_count); 687 if (nxt_slow_path(headers == NULL)) { 688 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 689 goto fail; 690 } 691 692 for (i = 0; i < r->fields_count; i++) { 693 f = r->fields + i; 694 695 header = nxt_py_asgi_create_header(f); 696 if (nxt_slow_path(header == NULL)) { 697 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 698 goto fail; 699 } 700 701 PyTuple_SET_ITEM(headers, i, header); 702 703 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 704 && f->name_length == ws_protocol.length 705 && f->value_length > 0 706 && r->websocket_handshake) 707 { 708 v = nxt_py_asgi_create_subprotocols(f); 709 if (nxt_slow_path(v == NULL)) { 710 nxt_unit_req_alert(req, "Failed to create subprotocols"); 711 goto fail; 712 } 713 714 SET_ITEM(scope, subprotocols, v); 715 Py_DECREF(v); 716 } 717 } 718 719 SET_ITEM(scope, headers, headers) 720 Py_DECREF(headers); 721 722 return scope; 723 724 fail: 725 726 Py_XDECREF(v); 727 Py_XDECREF(headers); 728 Py_DECREF(scope); 729 730 return NULL; 731 732 #undef SET_ITEM 733 } 734 735 736 static PyObject * 737 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 738 { 739 char *p, *s; 740 PyObject *pair, *v; 741 742 pair = PyTuple_New(2); 743 if (nxt_slow_path(pair == NULL)) { 744 return NULL; 745 } 746 747 p = nxt_unit_sptr_get(sptr); 748 s = memchr(p, ':', len); 749 750 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 751 if (nxt_slow_path(v == NULL)) { 752 Py_DECREF(pair); 753 754 return NULL; 755 } 756 757 PyTuple_SET_ITEM(pair, 0, v); 758 759 if (s != NULL) { 760 p += len; 761 v = PyLong_FromString(s + 1, &p, 10); 762 763 } else { 764 v = PyLong_FromLong(port); 765 } 766 767 if (nxt_slow_path(v == NULL)) { 768 Py_DECREF(pair); 769 770 return NULL; 771 } 772 773 PyTuple_SET_ITEM(pair, 1, v); 774 775 return pair; 776 } 777 778 779 static PyObject * 780 nxt_py_asgi_create_header(nxt_unit_field_t *f) 781 { 782 char c, *name; 783 uint8_t pos; 784 PyObject *header, *v; 785 786 header = PyTuple_New(2); 787 if (nxt_slow_path(header == NULL)) { 788 return NULL; 789 } 790 791 name = nxt_unit_sptr_get(&f->name); 792 793 for (pos = 0; pos < f->name_length; pos++) { 794 c = name[pos]; 795 if (c >= 'A' && c <= 'Z') { 796 name[pos] = (c | 0x20); 797 } 798 } 799 800 v = PyBytes_FromStringAndSize(name, f->name_length); 801 if (nxt_slow_path(v == NULL)) { 802 Py_DECREF(header); 803 804 return NULL; 805 } 806 807 PyTuple_SET_ITEM(header, 0, v); 808 809 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 810 f->value_length); 811 if (nxt_slow_path(v == NULL)) { 812 Py_DECREF(header); 813 814 return NULL; 815 } 816 817 PyTuple_SET_ITEM(header, 1, v); 818 819 return header; 820 } 821 822 823 static PyObject * 824 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 825 { 826 char *v; 827 uint32_t i, n, start; 828 PyObject *res, *proto; 829 830 v = nxt_unit_sptr_get(&f->value); 831 n = 1; 832 833 for (i = 0; i < f->value_length; i++) { 834 if (v[i] == ',') { 835 n++; 836 } 837 } 838 839 res = PyTuple_New(n); 840 if (nxt_slow_path(res == NULL)) { 841 return NULL; 842 } 843 844 n = 0; 845 start = 0; 846 847 for (i = 0; i < f->value_length; ) { 848 if (v[i] != ',') { 849 i++; 850 851 continue; 852 } 853 854 if (i - start > 0) { 855 proto = PyString_FromStringAndSize(v + start, i - start); 856 if (nxt_slow_path(proto == NULL)) { 857 goto fail; 858 } 859 860 PyTuple_SET_ITEM(res, n, proto); 861 862 n++; 863 } 864 865 do { 866 i++; 867 } while (i < f->value_length && v[i] == ' '); 868 869 start = i; 870 } 871 872 if (i - start > 0) { 873 proto = PyString_FromStringAndSize(v + start, i - start); 874 if (nxt_slow_path(proto == NULL)) { 875 goto fail; 876 } 877 878 PyTuple_SET_ITEM(res, n, proto); 879 } 880 881 return res; 882 883 fail: 884 885 Py_DECREF(res); 886 887 return NULL; 888 } 889 890 891 static int 892 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 893 { 894 int nb; 895 896 if (port->in_fd == -1) { 897 return NXT_UNIT_OK; 898 } 899 900 nb = 1; 901 902 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 903 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 904 port->in_fd, strerror(errno), errno); 905 906 return NXT_UNIT_ERROR; 907 } 908 909 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 910 911 return nxt_py_asgi_add_reader(ctx, port); 912 } 913 914 915 static int 916 nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 917 { 918 int rc; 919 PyObject *res, *fd, *py_ctx, *py_port; 920 nxt_py_asgi_ctx_data_t *ctx_data; 921 922 nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port); 923 924 ctx_data = ctx->data; 925 926 fd = PyLong_FromLong(port->in_fd); 927 if (nxt_slow_path(fd == NULL)) { 928 nxt_unit_alert(ctx, "Python failed to create fd"); 929 nxt_python_print_exception(); 930 931 return NXT_UNIT_ERROR; 932 } 933 934 rc = NXT_UNIT_ERROR; 935 936 py_ctx = PyLong_FromVoidPtr(ctx); 937 if (nxt_slow_path(py_ctx == NULL)) { 938 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 939 nxt_python_print_exception(); 940 941 goto clean_fd; 942 } 943 944 py_port = PyLong_FromVoidPtr(port); 945 if (nxt_slow_path(py_port == NULL)) { 946 nxt_unit_alert(ctx, "Python failed to create py_port"); 947 nxt_python_print_exception(); 948 949 goto clean_py_ctx; 950 } 951 952 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 953 fd, nxt_py_port_read, 954 py_ctx, py_port, NULL); 955 if (nxt_slow_path(res == NULL)) { 956 nxt_unit_alert(ctx, "Python failed to add_reader"); 957 nxt_python_print_exception(); 958 959 } else { 960 Py_DECREF(res); 961 962 rc = NXT_UNIT_OK; 963 } 964 965 Py_DECREF(py_port); 966 967 clean_py_ctx: 968 969 Py_DECREF(py_ctx); 970 971 clean_fd: 972 973 Py_DECREF(fd); 974 975 return rc; 976 } 977 978 979 static void 980 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, 981 nxt_unit_port_t *port) 982 { 983 if (port->in_fd == -1 || ctx == NULL) { 984 return; 985 } 986 987 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 988 989 nxt_py_asgi_remove_reader(ctx, port); 990 } 991 992 993 static void 994 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 995 { 996 PyObject *res, *p; 997 nxt_py_asgi_ctx_data_t *ctx_data; 998 999 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 1000 1001 ctx_data = ctx->data; 1002 1003 p = PyLong_FromLong(0); 1004 if (nxt_slow_path(p == NULL)) { 1005 nxt_unit_alert(NULL, "Python failed to create Long"); 1006 nxt_python_print_exception(); 1007 1008 } else { 1009 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1010 p, NULL); 1011 if (nxt_slow_path(res == NULL)) { 1012 nxt_unit_alert(ctx, "Python failed to set_result"); 1013 nxt_python_print_exception(); 1014 1015 } else { 1016 Py_DECREF(res); 1017 } 1018 1019 Py_DECREF(p); 1020 } 1021 } 1022 1023 1024 static void 1025 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1026 { 1027 int rc; 1028 nxt_queue_link_t *lnk; 1029 nxt_py_asgi_ctx_data_t *ctx_data; 1030 1031 ctx_data = ctx->data; 1032 1033 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1034 lnk = nxt_queue_first(&ctx_data->drain_queue); 1035 1036 rc = nxt_py_asgi_http_drain(lnk); 1037 if (rc == NXT_UNIT_AGAIN) { 1038 return; 1039 } 1040 1041 nxt_queue_remove(lnk); 1042 } 1043 } 1044 1045 1046 static PyObject * 1047 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1048 { 1049 int rc; 1050 PyObject *arg0, *arg1, *res; 1051 Py_ssize_t n; 1052 nxt_unit_ctx_t *ctx; 1053 nxt_unit_port_t *port; 1054 nxt_py_asgi_ctx_data_t *ctx_data; 1055 1056 n = PyTuple_GET_SIZE(args); 1057 1058 if (n != 2) { 1059 nxt_unit_alert(NULL, 1060 "nxt_py_asgi_port_read: invalid number of arguments %d", 1061 (int) n); 1062 1063 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 1064 } 1065 1066 arg0 = PyTuple_GET_ITEM(args, 0); 1067 if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) { 1068 return PyErr_Format(PyExc_TypeError, 1069 "the first argument is not a long"); 1070 } 1071 1072 ctx = PyLong_AsVoidPtr(arg0); 1073 1074 arg1 = PyTuple_GET_ITEM(args, 1); 1075 if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) { 1076 return PyErr_Format(PyExc_TypeError, 1077 "the second argument is not a long"); 1078 } 1079 1080 port = PyLong_AsVoidPtr(arg1); 1081 1082 rc = nxt_unit_process_port_msg(ctx, port); 1083 1084 nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc); 1085 1086 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 1087 return PyErr_Format(PyExc_RuntimeError, 1088 "error processing port %d message", port->id.id); 1089 } 1090 1091 if (rc == NXT_UNIT_OK) { 1092 ctx_data = ctx->data; 1093 1094 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, 1095 nxt_py_port_read, 1096 arg0, arg1, NULL); 1097 if (nxt_slow_path(res == NULL)) { 1098 nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'"); 1099 nxt_python_print_exception(); 1100 } 1101 1102 Py_XDECREF(res); 1103 } 1104 1105 Py_RETURN_NONE; 1106 } 1107 1108 1109 PyObject * 1110 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1111 void *data) 1112 { 1113 int i; 1114 PyObject *iter, *header, *h_iter, *name, *val, *res; 1115 1116 iter = PyObject_GetIter(headers); 1117 if (nxt_slow_path(iter == NULL)) { 1118 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1119 } 1120 1121 for (i = 0; /* void */; i++) { 1122 header = PyIter_Next(iter); 1123 if (header == NULL) { 1124 break; 1125 } 1126 1127 h_iter = PyObject_GetIter(header); 1128 if (nxt_slow_path(h_iter == NULL)) { 1129 Py_DECREF(header); 1130 Py_DECREF(iter); 1131 1132 return PyErr_Format(PyExc_TypeError, 1133 "'headers' item #%d is not an iterable", i); 1134 } 1135 1136 name = PyIter_Next(h_iter); 1137 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1138 Py_XDECREF(name); 1139 Py_DECREF(h_iter); 1140 Py_DECREF(header); 1141 Py_DECREF(iter); 1142 1143 return PyErr_Format(PyExc_TypeError, 1144 "'headers' item #%d 'name' is not a byte string", i); 1145 } 1146 1147 val = PyIter_Next(h_iter); 1148 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1149 Py_XDECREF(val); 1150 Py_DECREF(h_iter); 1151 Py_DECREF(header); 1152 Py_DECREF(iter); 1153 1154 return PyErr_Format(PyExc_TypeError, 1155 "'headers' item #%d 'value' is not a byte string", i); 1156 } 1157 1158 res = cb(data, i, name, val); 1159 1160 Py_DECREF(name); 1161 Py_DECREF(val); 1162 Py_DECREF(h_iter); 1163 Py_DECREF(header); 1164 1165 if (nxt_slow_path(res == NULL)) { 1166 Py_DECREF(iter); 1167 1168 return NULL; 1169 } 1170 1171 Py_DECREF(res); 1172 } 1173 1174 Py_DECREF(iter); 1175 1176 Py_RETURN_NONE; 1177 } 1178 1179 1180 PyObject * 1181 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1182 { 1183 nxt_py_asgi_calc_size_ctx_t *ctx; 1184 1185 ctx = data; 1186 1187 ctx->fields_count++; 1188 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1189 1190 Py_RETURN_NONE; 1191 } 1192 1193 1194 PyObject * 1195 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1196 { 1197 int rc; 1198 char *name_str, *val_str; 1199 uint32_t name_len, val_len; 1200 nxt_off_t content_length; 1201 nxt_unit_request_info_t *req; 1202 nxt_py_asgi_add_field_ctx_t *ctx; 1203 1204 name_str = PyBytes_AS_STRING(name); 1205 name_len = PyBytes_GET_SIZE(name); 1206 1207 val_str = PyBytes_AS_STRING(val); 1208 val_len = PyBytes_GET_SIZE(val); 1209 1210 ctx = data; 1211 req = ctx->req; 1212 1213 rc = nxt_unit_response_add_field(req, name_str, name_len, 1214 val_str, val_len); 1215 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1216 return PyErr_Format(PyExc_RuntimeError, 1217 "failed to add header #%d", i); 1218 } 1219 1220 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1221 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1222 if (nxt_slow_path(content_length < 0)) { 1223 nxt_unit_req_error(req, "failed to parse Content-Length " 1224 "value %.*s", (int) val_len, val_str); 1225 1226 return PyErr_Format(PyExc_ValueError, 1227 "Failed to parse Content-Length: '%.*s'", 1228 (int) val_len, val_str); 1229 } 1230 1231 ctx->content_length = content_length; 1232 } 1233 1234 Py_RETURN_NONE; 1235 } 1236 1237 1238 PyObject * 1239 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1240 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1241 { 1242 PyObject *set_result, *res; 1243 1244 if (nxt_slow_path(result == NULL)) { 1245 Py_DECREF(future); 1246 1247 return NULL; 1248 } 1249 1250 set_result = PyObject_GetAttrString(future, "set_result"); 1251 if (nxt_slow_path(set_result == NULL)) { 1252 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1253 1254 Py_CLEAR(future); 1255 1256 goto cleanup_result; 1257 } 1258 1259 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1260 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1261 1262 Py_CLEAR(future); 1263 1264 goto cleanup; 1265 } 1266 1267 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1268 result, NULL); 1269 if (nxt_slow_path(res == NULL)) { 1270 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1271 nxt_python_print_exception(); 1272 1273 Py_CLEAR(future); 1274 } 1275 1276 Py_XDECREF(res); 1277 1278 cleanup: 1279 1280 Py_DECREF(set_result); 1281 1282 cleanup_result: 1283 1284 Py_DECREF(result); 1285 1286 return future; 1287 } 1288 1289 1290 PyObject * 1291 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1292 { 1293 PyObject *msg; 1294 1295 msg = PyDict_New(); 1296 if (nxt_slow_path(msg == NULL)) { 1297 nxt_unit_req_alert(req, "Python failed to create message dict"); 1298 nxt_python_print_exception(); 1299 1300 return PyErr_Format(PyExc_RuntimeError, 1301 "failed to create message dict"); 1302 } 1303 1304 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1305 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1306 1307 Py_DECREF(msg); 1308 1309 return PyErr_Format(PyExc_RuntimeError, 1310 "failed to set 'msg.type' item"); 1311 } 1312 1313 return msg; 1314 } 1315 1316 1317 PyObject * 1318 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1319 PyObject *spec_version) 1320 { 1321 PyObject *scope, *asgi; 1322 1323 scope = PyDict_New(); 1324 if (nxt_slow_path(scope == NULL)) { 1325 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1326 nxt_python_print_exception(); 1327 1328 return PyErr_Format(PyExc_RuntimeError, 1329 "failed to create 'scope' dict"); 1330 } 1331 1332 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1333 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1334 1335 Py_DECREF(scope); 1336 1337 return PyErr_Format(PyExc_RuntimeError, 1338 "failed to set 'scope.type' item"); 1339 } 1340 1341 asgi = PyDict_New(); 1342 if (nxt_slow_path(asgi == NULL)) { 1343 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1344 nxt_python_print_exception(); 1345 1346 Py_DECREF(scope); 1347 1348 return PyErr_Format(PyExc_RuntimeError, 1349 "failed to create 'asgi' dict"); 1350 } 1351 1352 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1353 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1354 1355 Py_DECREF(asgi); 1356 Py_DECREF(scope); 1357 1358 return PyErr_Format(PyExc_RuntimeError, 1359 "failed to set 'scope.asgi' item"); 1360 } 1361 1362 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1363 nxt_py_3_0_str) == -1)) 1364 { 1365 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1366 1367 Py_DECREF(asgi); 1368 Py_DECREF(scope); 1369 1370 return PyErr_Format(PyExc_RuntimeError, 1371 "failed to set 'asgi.version' item"); 1372 } 1373 1374 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1375 spec_version) == -1)) 1376 { 1377 nxt_unit_req_alert(req, 1378 "Python failed to set 'asgi.spec_version' item"); 1379 1380 Py_DECREF(asgi); 1381 Py_DECREF(scope); 1382 1383 return PyErr_Format(PyExc_RuntimeError, 1384 "failed to set 'asgi.spec_version' item"); 1385 } 1386 1387 Py_DECREF(asgi); 1388 1389 return scope; 1390 } 1391 1392 1393 void 1394 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1395 { 1396 nxt_py_asgi_ctx_data_t *ctx_data; 1397 1398 ctx_data = req->ctx->data; 1399 1400 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1401 } 1402 1403 1404 void 1405 nxt_py_asgi_dealloc(PyObject *self) 1406 { 1407 PyObject_Del(self); 1408 } 1409 1410 1411 PyObject * 1412 nxt_py_asgi_await(PyObject *self) 1413 { 1414 Py_INCREF(self); 1415 return self; 1416 } 1417 1418 1419 PyObject * 1420 nxt_py_asgi_iter(PyObject *self) 1421 { 1422 Py_INCREF(self); 1423 return self; 1424 } 1425 1426 1427 PyObject * 1428 nxt_py_asgi_next(PyObject *self) 1429 { 1430 return NULL; 1431 } 1432 1433 1434 static void 1435 nxt_python_asgi_done(void) 1436 { 1437 nxt_py_asgi_str_done(); 1438 1439 Py_XDECREF(nxt_py_port_read); 1440 } 1441 1442 #else /* !(NXT_HAVE_ASGI) */ 1443 1444 1445 int 1446 nxt_python_asgi_check(PyObject *obj) 1447 { 1448 return 0; 1449 } 1450 1451 1452 int 1453 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1454 { 1455 nxt_unit_alert(NULL, "ASGI not implemented"); 1456 return NXT_UNIT_ERROR; 1457 } 1458 1459 1460 #endif /* NXT_HAVE_ASGI */ 1461