1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static PyObject *nxt_python_asgi_get_func(PyObject *obj); 20 static int nxt_python_asgi_ctx_data_alloc(void **pdata, int main); 21 static void nxt_python_asgi_ctx_data_free(void *data); 22 static int nxt_python_asgi_startup(void *data); 23 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 24 25 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 26 nxt_unit_port_t *port); 27 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 28 static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req); 29 30 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 31 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 32 uint16_t port); 33 static PyObject *nxt_py_asgi_create_ip_address(nxt_unit_sptr_t *sptr, 34 uint8_t len, uint16_t port); 35 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 36 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 37 38 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 39 static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 40 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, 41 nxt_unit_port_t *port); 42 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 43 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 44 45 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 46 static void nxt_python_asgi_done(void); 47 48 static PyObject *nxt_py_port_read; 49 50 static PyMethodDef nxt_py_port_read_method = 51 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 52 53 static nxt_python_proto_t nxt_py_asgi_proto = { 54 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 55 .ctx_data_free = nxt_python_asgi_ctx_data_free, 56 .startup = nxt_python_asgi_startup, 57 .run = nxt_python_asgi_run, 58 .done = nxt_python_asgi_done, 59 }; 60 61 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 62 63 64 int 65 nxt_python_asgi_check(PyObject *obj) 66 { 67 int res; 68 PyObject *func; 69 PyCodeObject *code; 70 71 func = nxt_python_asgi_get_func(obj); 72 73 if (func == NULL) { 74 return 0; 75 } 76 77 code = (PyCodeObject *) PyFunction_GET_CODE(func); 78 79 nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " 80 "%d argument(s)", 81 (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", 82 code->co_argcount); 83 84 res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; 85 86 Py_DECREF(func); 87 88 return res; 89 } 90 91 92 static PyObject * 93 nxt_python_asgi_get_func(PyObject *obj) 94 { 95 PyObject *call; 96 97 if (PyFunction_Check(obj)) { 98 Py_INCREF(obj); 99 return obj; 100 } 101 102 if (PyMethod_Check(obj)) { 103 obj = PyMethod_GET_FUNCTION(obj); 104 105 Py_INCREF(obj); 106 return obj; 107 } 108 109 call = PyObject_GetAttrString(obj, "__call__"); 110 111 if (call == NULL) { 112 return NULL; 113 } 114 115 if (PyFunction_Check(call)) { 116 return call; 117 } 118 119 if (PyMethod_Check(call)) { 120 obj = PyMethod_GET_FUNCTION(call); 121 122 if (PyFunction_Check(obj)) { 123 Py_INCREF(obj); 124 125 } else { 126 obj = NULL; 127 } 128 129 } else { 130 obj = NULL; 131 } 132 133 Py_DECREF(call); 134 135 return obj; 136 } 137 138 139 int 140 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 141 { 142 PyObject *func; 143 nxt_int_t i; 144 PyCodeObject *code; 145 146 nxt_unit_debug(NULL, "asgi_init"); 147 148 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 149 nxt_unit_alert(NULL, "Python failed to init string objects"); 150 return NXT_UNIT_ERROR; 151 } 152 153 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 154 if (nxt_slow_path(nxt_py_port_read == NULL)) { 155 nxt_unit_alert(NULL, 156 "Python failed to initialize the 'port_read' function"); 157 return NXT_UNIT_ERROR; 158 } 159 160 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 161 return NXT_UNIT_ERROR; 162 } 163 164 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 165 return NXT_UNIT_ERROR; 166 } 167 168 for (i = 0; i < nxt_py_targets->count; i++) { 169 func = nxt_python_asgi_get_func(nxt_py_targets->target[i].application); 170 if (nxt_slow_path(func == NULL)) { 171 nxt_unit_debug(NULL, "asgi: cannot find function for callable, " 172 "unable to check for legacy mode (#%d)", 173 (int) i); 174 continue; 175 } 176 177 code = (PyCodeObject *) PyFunction_GET_CODE(func); 178 179 if ((code->co_flags & CO_COROUTINE) == 0) { 180 nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " 181 "switching to legacy mode"); 182 nxt_py_targets->target[i].asgi_legacy = 1; 183 } 184 185 Py_DECREF(func); 186 } 187 188 init->callbacks.request_handler = nxt_py_asgi_request_handler; 189 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 190 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 191 init->callbacks.close_handler = nxt_py_asgi_close_handler; 192 init->callbacks.quit = nxt_py_asgi_quit; 193 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 194 init->callbacks.add_port = nxt_py_asgi_add_port; 195 init->callbacks.remove_port = nxt_py_asgi_remove_port; 196 197 *proto = nxt_py_asgi_proto; 198 199 return NXT_UNIT_OK; 200 } 201 202 203 static int 204 nxt_python_asgi_ctx_data_alloc(void **pdata, int main) 205 { 206 uint32_t i; 207 PyObject *asyncio, *loop, *event_loop, *obj; 208 const char *event_loop_func; 209 nxt_py_asgi_ctx_data_t *ctx_data; 210 211 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 212 if (nxt_slow_path(ctx_data == NULL)) { 213 nxt_unit_alert(NULL, "Failed to allocate context data"); 214 return NXT_UNIT_ERROR; 215 } 216 217 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 218 219 nxt_queue_init(&ctx_data->drain_queue); 220 221 struct { 222 const char *key; 223 PyObject **handler; 224 225 } handlers[] = { 226 { "create_task", &ctx_data->loop_create_task }, 227 { "add_reader", &ctx_data->loop_add_reader }, 228 { "remove_reader", &ctx_data->loop_remove_reader }, 229 { "call_soon", &ctx_data->loop_call_soon }, 230 { "run_until_complete", &ctx_data->loop_run_until_complete }, 231 { "create_future", &ctx_data->loop_create_future }, 232 }; 233 234 loop = NULL; 235 236 asyncio = PyImport_ImportModule("asyncio"); 237 if (nxt_slow_path(asyncio == NULL)) { 238 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 239 nxt_python_print_exception(); 240 goto fail; 241 } 242 243 event_loop_func = main ? "get_event_loop" : "new_event_loop"; 244 245 event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 246 event_loop_func); 247 if (nxt_slow_path(event_loop == NULL)) { 248 nxt_unit_alert(NULL, 249 "Python failed to get '%s' from module 'asyncio'", 250 event_loop_func); 251 goto fail; 252 } 253 254 if (nxt_slow_path(PyCallable_Check(event_loop) == 0)) { 255 nxt_unit_alert(NULL, 256 "'asyncio.%s' is not a callable object", 257 event_loop_func); 258 goto fail; 259 } 260 261 loop = PyObject_CallObject(event_loop, NULL); 262 if (nxt_slow_path(loop == NULL)) { 263 nxt_unit_alert(NULL, "Python failed to call 'asyncio.%s'", 264 event_loop_func); 265 goto fail; 266 } 267 268 for (i = 0; i < nxt_nitems(handlers); i++) { 269 obj = PyObject_GetAttrString(loop, handlers[i].key); 270 if (nxt_slow_path(obj == NULL)) { 271 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 272 handlers[i].key); 273 goto fail; 274 } 275 276 *handlers[i].handler = obj; 277 278 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 279 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 280 handlers[i].key); 281 goto fail; 282 } 283 } 284 285 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 286 if (nxt_slow_path(obj == NULL)) { 287 nxt_unit_alert(NULL, "Python failed to create Future "); 288 nxt_python_print_exception(); 289 goto fail; 290 } 291 292 ctx_data->quit_future = obj; 293 294 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 295 if (nxt_slow_path(obj == NULL)) { 296 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 297 goto fail; 298 } 299 300 ctx_data->quit_future_set_result = obj; 301 302 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 303 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 304 goto fail; 305 } 306 307 Py_DECREF(loop); 308 Py_DECREF(asyncio); 309 310 *pdata = ctx_data; 311 312 return NXT_UNIT_OK; 313 314 fail: 315 316 nxt_python_asgi_ctx_data_free(ctx_data); 317 318 Py_XDECREF(loop); 319 Py_XDECREF(asyncio); 320 321 return NXT_UNIT_ERROR; 322 } 323 324 325 static void 326 nxt_python_asgi_ctx_data_free(void *data) 327 { 328 nxt_py_asgi_ctx_data_t *ctx_data; 329 330 ctx_data = data; 331 332 Py_XDECREF(ctx_data->loop_run_until_complete); 333 Py_XDECREF(ctx_data->loop_create_future); 334 Py_XDECREF(ctx_data->loop_create_task); 335 Py_XDECREF(ctx_data->loop_call_soon); 336 Py_XDECREF(ctx_data->loop_add_reader); 337 Py_XDECREF(ctx_data->loop_remove_reader); 338 Py_XDECREF(ctx_data->quit_future); 339 Py_XDECREF(ctx_data->quit_future_set_result); 340 341 nxt_unit_free(NULL, ctx_data); 342 } 343 344 345 static int 346 nxt_python_asgi_startup(void *data) 347 { 348 return nxt_py_asgi_lifespan_startup(data); 349 } 350 351 352 static int 353 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 354 { 355 PyObject *res; 356 nxt_py_asgi_ctx_data_t *ctx_data; 357 358 ctx_data = ctx->data; 359 360 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 361 ctx_data->quit_future, NULL); 362 if (nxt_slow_path(res == NULL)) { 363 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 364 nxt_python_print_exception(); 365 366 return NXT_UNIT_ERROR; 367 } 368 369 Py_DECREF(res); 370 371 nxt_py_asgi_lifespan_shutdown(ctx); 372 373 return NXT_UNIT_OK; 374 } 375 376 377 static void 378 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 379 { 380 PyObject *res, *fd; 381 nxt_py_asgi_ctx_data_t *ctx_data; 382 383 if (port == NULL || port->in_fd == -1) { 384 return; 385 } 386 387 ctx_data = ctx->data; 388 389 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 390 391 fd = PyLong_FromLong(port->in_fd); 392 if (nxt_slow_path(fd == NULL)) { 393 nxt_unit_alert(ctx, "Python failed to create Long object"); 394 nxt_python_print_exception(); 395 396 return; 397 } 398 399 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 400 if (nxt_slow_path(res == NULL)) { 401 nxt_unit_alert(ctx, "Python failed to remove_reader"); 402 nxt_python_print_exception(); 403 404 } else { 405 Py_DECREF(res); 406 } 407 408 Py_DECREF(fd); 409 } 410 411 412 static void 413 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 414 { 415 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 416 PyObject *stage2; 417 nxt_python_target_t *target; 418 nxt_py_asgi_ctx_data_t *ctx_data; 419 420 if (req->request->websocket_handshake) { 421 asgi = nxt_py_asgi_websocket_create(req); 422 423 } else { 424 asgi = nxt_py_asgi_http_create(req); 425 } 426 427 if (nxt_slow_path(asgi == NULL)) { 428 nxt_unit_req_alert(req, "Python failed to create asgi object"); 429 nxt_unit_request_done(req, NXT_UNIT_ERROR); 430 431 return; 432 } 433 434 receive = PyObject_GetAttrString(asgi, "receive"); 435 if (nxt_slow_path(receive == NULL)) { 436 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 437 nxt_unit_request_done(req, NXT_UNIT_ERROR); 438 439 goto release_asgi; 440 } 441 442 send = PyObject_GetAttrString(asgi, "send"); 443 if (nxt_slow_path(receive == NULL)) { 444 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 445 nxt_unit_request_done(req, NXT_UNIT_ERROR); 446 447 goto release_receive; 448 } 449 450 done = PyObject_GetAttrString(asgi, "_done"); 451 if (nxt_slow_path(receive == NULL)) { 452 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 453 nxt_unit_request_done(req, NXT_UNIT_ERROR); 454 455 goto release_send; 456 } 457 458 scope = nxt_py_asgi_create_http_scope(req); 459 if (nxt_slow_path(scope == NULL)) { 460 nxt_unit_request_done(req, NXT_UNIT_ERROR); 461 462 goto release_done; 463 } 464 465 req->data = asgi; 466 target = &nxt_py_targets->target[req->request->app_target]; 467 468 if (!target->asgi_legacy) { 469 nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); 470 471 res = PyObject_CallFunctionObjArgs(target->application, 472 scope, receive, send, NULL); 473 474 } else { 475 nxt_unit_req_debug(req, "Python call legacy application"); 476 477 res = PyObject_CallFunctionObjArgs(target->application, scope, NULL); 478 479 if (nxt_slow_path(res == NULL)) { 480 nxt_unit_req_error(req, "Python failed to call legacy app stage1"); 481 nxt_python_print_exception(); 482 nxt_unit_request_done(req, NXT_UNIT_ERROR); 483 484 goto release_scope; 485 } 486 487 if (nxt_slow_path(PyCallable_Check(res) == 0)) { 488 nxt_unit_req_error(req, 489 "Legacy ASGI application returns not a callable"); 490 nxt_unit_request_done(req, NXT_UNIT_ERROR); 491 492 Py_DECREF(res); 493 494 goto release_scope; 495 } 496 497 stage2 = res; 498 499 res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); 500 501 Py_DECREF(stage2); 502 } 503 504 if (nxt_slow_path(res == NULL)) { 505 nxt_unit_req_error(req, "Python failed to call the application"); 506 nxt_python_print_exception(); 507 nxt_unit_request_done(req, NXT_UNIT_ERROR); 508 509 goto release_scope; 510 } 511 512 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 513 nxt_unit_req_error(req, "Application result type is not a coroutine"); 514 nxt_unit_request_done(req, NXT_UNIT_ERROR); 515 516 Py_DECREF(res); 517 518 goto release_scope; 519 } 520 521 ctx_data = req->ctx->data; 522 523 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 524 if (nxt_slow_path(task == NULL)) { 525 nxt_unit_req_error(req, "Python failed to call the create_task"); 526 nxt_python_print_exception(); 527 nxt_unit_request_done(req, NXT_UNIT_ERROR); 528 529 Py_DECREF(res); 530 531 goto release_scope; 532 } 533 534 Py_DECREF(res); 535 536 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 537 NULL); 538 if (nxt_slow_path(res == NULL)) { 539 nxt_unit_req_error(req, 540 "Python failed to call 'task.add_done_callback'"); 541 nxt_python_print_exception(); 542 nxt_unit_request_done(req, NXT_UNIT_ERROR); 543 544 goto release_task; 545 } 546 547 Py_DECREF(res); 548 release_task: 549 Py_DECREF(task); 550 release_scope: 551 Py_DECREF(scope); 552 release_done: 553 Py_DECREF(done); 554 release_send: 555 Py_DECREF(send); 556 release_receive: 557 Py_DECREF(receive); 558 release_asgi: 559 Py_DECREF(asgi); 560 } 561 562 563 static void 564 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req) 565 { 566 if (req->request->websocket_handshake) { 567 nxt_py_asgi_websocket_close_handler(req); 568 569 } else { 570 nxt_py_asgi_http_close_handler(req); 571 } 572 } 573 574 575 static PyObject * 576 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 577 { 578 char *p, *target, *query; 579 uint32_t target_length, i; 580 PyObject *scope, *v, *type, *scheme; 581 PyObject *headers, *header; 582 nxt_unit_field_t *f; 583 nxt_unit_request_t *r; 584 585 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 586 587 #define SET_ITEM(dict, key, value) \ 588 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 589 == -1)) \ 590 { \ 591 nxt_unit_req_alert(req, "Python failed to set '" \ 592 #dict "." #key "' item"); \ 593 goto fail; \ 594 } 595 596 v = NULL; 597 headers = NULL; 598 599 r = req->request; 600 601 if (r->websocket_handshake) { 602 type = nxt_py_websocket_str; 603 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 604 605 } else { 606 type = nxt_py_http_str; 607 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 608 } 609 610 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 611 if (nxt_slow_path(scope == NULL)) { 612 return NULL; 613 } 614 615 p = nxt_unit_sptr_get(&r->version); 616 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 617 : nxt_py_1_0_str) 618 SET_ITEM(scope, scheme, scheme) 619 620 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 621 r->method_length); 622 if (nxt_slow_path(v == NULL)) { 623 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 624 goto fail; 625 } 626 627 SET_ITEM(scope, method, v) 628 Py_DECREF(v); 629 630 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 631 "replace"); 632 if (nxt_slow_path(v == NULL)) { 633 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 634 goto fail; 635 } 636 637 SET_ITEM(scope, path, v) 638 Py_DECREF(v); 639 640 target = nxt_unit_sptr_get(&r->target); 641 query = nxt_unit_sptr_get(&r->query); 642 643 if (r->query.offset != 0) { 644 target_length = query - target - 1; 645 646 } else { 647 target_length = r->target_length; 648 } 649 650 v = PyBytes_FromStringAndSize(target, target_length); 651 if (nxt_slow_path(v == NULL)) { 652 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 653 goto fail; 654 } 655 656 SET_ITEM(scope, raw_path, v) 657 Py_DECREF(v); 658 659 v = PyBytes_FromStringAndSize(query, r->query_length); 660 if (nxt_slow_path(v == NULL)) { 661 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 662 goto fail; 663 } 664 665 SET_ITEM(scope, query_string, v) 666 Py_DECREF(v); 667 668 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 669 if (nxt_slow_path(v == NULL)) { 670 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 671 goto fail; 672 } 673 674 SET_ITEM(scope, client, v) 675 Py_DECREF(v); 676 677 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 678 if (nxt_slow_path(v == NULL)) { 679 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 680 goto fail; 681 } 682 683 SET_ITEM(scope, server, v) 684 Py_DECREF(v); 685 686 v = NULL; 687 688 headers = PyTuple_New(r->fields_count); 689 if (nxt_slow_path(headers == NULL)) { 690 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 691 goto fail; 692 } 693 694 for (i = 0; i < r->fields_count; i++) { 695 f = r->fields + i; 696 697 header = nxt_py_asgi_create_header(f); 698 if (nxt_slow_path(header == NULL)) { 699 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 700 goto fail; 701 } 702 703 PyTuple_SET_ITEM(headers, i, header); 704 705 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 706 && f->name_length == ws_protocol.length 707 && f->value_length > 0 708 && r->websocket_handshake) 709 { 710 v = nxt_py_asgi_create_subprotocols(f); 711 if (nxt_slow_path(v == NULL)) { 712 nxt_unit_req_alert(req, "Failed to create subprotocols"); 713 goto fail; 714 } 715 716 SET_ITEM(scope, subprotocols, v); 717 Py_DECREF(v); 718 } 719 } 720 721 SET_ITEM(scope, headers, headers) 722 Py_DECREF(headers); 723 724 return scope; 725 726 fail: 727 728 Py_XDECREF(v); 729 Py_XDECREF(headers); 730 Py_DECREF(scope); 731 732 return NULL; 733 734 #undef SET_ITEM 735 } 736 737 738 static PyObject * 739 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 740 { 741 size_t prefix_len; 742 nxt_str_t addr; 743 PyObject *pair, *v; 744 745 addr.length = len; 746 addr.start = nxt_unit_sptr_get(sptr); 747 748 prefix_len = nxt_length("unix:"); 749 if (!nxt_str_start(&addr, "unix:", prefix_len)) { 750 return nxt_py_asgi_create_ip_address(sptr, len, port); 751 } 752 753 #if NXT_HAVE_UNIX_DOMAIN 754 pair = PyTuple_New(2); 755 if (nxt_slow_path(pair == NULL)) { 756 return NULL; 757 } 758 759 addr.start += prefix_len; 760 addr.length -= prefix_len; 761 762 v = PyString_FromStringAndSize((const char *) addr.start, addr.length); 763 if (nxt_slow_path(v == NULL)) { 764 Py_DECREF(pair); 765 766 return NULL; 767 } 768 769 PyTuple_SET_ITEM(pair, 0, v); 770 PyTuple_SET_ITEM(pair, 1, Py_None); 771 772 return pair; 773 774 #else 775 return NULL; 776 #endif 777 } 778 779 780 static PyObject * 781 nxt_py_asgi_create_ip_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 782 { 783 char *p, *s; 784 PyObject *pair, *v; 785 786 pair = PyTuple_New(2); 787 if (nxt_slow_path(pair == NULL)) { 788 return NULL; 789 } 790 791 p = nxt_unit_sptr_get(sptr); 792 s = memchr(p, ':', len); 793 794 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 795 if (nxt_slow_path(v == NULL)) { 796 Py_DECREF(pair); 797 798 return NULL; 799 } 800 801 PyTuple_SET_ITEM(pair, 0, v); 802 803 if (s != NULL) { 804 p += len; 805 v = PyLong_FromString(s + 1, &p, 10); 806 807 } else { 808 v = PyLong_FromLong(port); 809 } 810 811 if (nxt_slow_path(v == NULL)) { 812 Py_DECREF(pair); 813 814 return NULL; 815 } 816 817 PyTuple_SET_ITEM(pair, 1, v); 818 819 return pair; 820 } 821 822 823 static PyObject * 824 nxt_py_asgi_create_header(nxt_unit_field_t *f) 825 { 826 char c, *name; 827 uint8_t pos; 828 PyObject *header, *v; 829 830 header = PyTuple_New(2); 831 if (nxt_slow_path(header == NULL)) { 832 return NULL; 833 } 834 835 name = nxt_unit_sptr_get(&f->name); 836 837 for (pos = 0; pos < f->name_length; pos++) { 838 c = name[pos]; 839 if (c >= 'A' && c <= 'Z') { 840 name[pos] = (c | 0x20); 841 } 842 } 843 844 v = PyBytes_FromStringAndSize(name, f->name_length); 845 if (nxt_slow_path(v == NULL)) { 846 Py_DECREF(header); 847 848 return NULL; 849 } 850 851 PyTuple_SET_ITEM(header, 0, v); 852 853 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 854 f->value_length); 855 if (nxt_slow_path(v == NULL)) { 856 Py_DECREF(header); 857 858 return NULL; 859 } 860 861 PyTuple_SET_ITEM(header, 1, v); 862 863 return header; 864 } 865 866 867 static PyObject * 868 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 869 { 870 char *v; 871 uint32_t i, n, start; 872 PyObject *res, *proto; 873 874 v = nxt_unit_sptr_get(&f->value); 875 n = 1; 876 877 for (i = 0; i < f->value_length; i++) { 878 if (v[i] == ',') { 879 n++; 880 } 881 } 882 883 res = PyTuple_New(n); 884 if (nxt_slow_path(res == NULL)) { 885 return NULL; 886 } 887 888 n = 0; 889 start = 0; 890 891 for (i = 0; i < f->value_length; ) { 892 if (v[i] != ',') { 893 i++; 894 895 continue; 896 } 897 898 if (i - start > 0) { 899 proto = PyString_FromStringAndSize(v + start, i - start); 900 if (nxt_slow_path(proto == NULL)) { 901 goto fail; 902 } 903 904 PyTuple_SET_ITEM(res, n, proto); 905 906 n++; 907 } 908 909 do { 910 i++; 911 } while (i < f->value_length && v[i] == ' '); 912 913 start = i; 914 } 915 916 if (i - start > 0) { 917 proto = PyString_FromStringAndSize(v + start, i - start); 918 if (nxt_slow_path(proto == NULL)) { 919 goto fail; 920 } 921 922 PyTuple_SET_ITEM(res, n, proto); 923 } 924 925 return res; 926 927 fail: 928 929 Py_DECREF(res); 930 931 return NULL; 932 } 933 934 935 static int 936 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 937 { 938 int nb; 939 940 if (port->in_fd == -1) { 941 return NXT_UNIT_OK; 942 } 943 944 nb = 1; 945 946 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 947 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 948 port->in_fd, strerror(errno), errno); 949 950 return NXT_UNIT_ERROR; 951 } 952 953 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 954 955 return nxt_py_asgi_add_reader(ctx, port); 956 } 957 958 959 static int 960 nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 961 { 962 int rc; 963 PyObject *res, *fd, *py_ctx, *py_port; 964 nxt_py_asgi_ctx_data_t *ctx_data; 965 966 nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port); 967 968 ctx_data = ctx->data; 969 970 fd = PyLong_FromLong(port->in_fd); 971 if (nxt_slow_path(fd == NULL)) { 972 nxt_unit_alert(ctx, "Python failed to create fd"); 973 nxt_python_print_exception(); 974 975 return NXT_UNIT_ERROR; 976 } 977 978 rc = NXT_UNIT_ERROR; 979 980 py_ctx = PyLong_FromVoidPtr(ctx); 981 if (nxt_slow_path(py_ctx == NULL)) { 982 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 983 nxt_python_print_exception(); 984 985 goto clean_fd; 986 } 987 988 py_port = PyLong_FromVoidPtr(port); 989 if (nxt_slow_path(py_port == NULL)) { 990 nxt_unit_alert(ctx, "Python failed to create py_port"); 991 nxt_python_print_exception(); 992 993 goto clean_py_ctx; 994 } 995 996 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 997 fd, nxt_py_port_read, 998 py_ctx, py_port, NULL); 999 if (nxt_slow_path(res == NULL)) { 1000 nxt_unit_alert(ctx, "Python failed to add_reader"); 1001 nxt_python_print_exception(); 1002 1003 } else { 1004 Py_DECREF(res); 1005 1006 rc = NXT_UNIT_OK; 1007 } 1008 1009 Py_DECREF(py_port); 1010 1011 clean_py_ctx: 1012 1013 Py_DECREF(py_ctx); 1014 1015 clean_fd: 1016 1017 Py_DECREF(fd); 1018 1019 return rc; 1020 } 1021 1022 1023 static void 1024 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, 1025 nxt_unit_port_t *port) 1026 { 1027 if (port->in_fd == -1 || ctx == NULL) { 1028 return; 1029 } 1030 1031 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 1032 1033 nxt_py_asgi_remove_reader(ctx, port); 1034 } 1035 1036 1037 static void 1038 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 1039 { 1040 PyObject *res, *p; 1041 nxt_py_asgi_ctx_data_t *ctx_data; 1042 1043 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 1044 1045 ctx_data = ctx->data; 1046 1047 p = PyLong_FromLong(0); 1048 if (nxt_slow_path(p == NULL)) { 1049 nxt_unit_alert(NULL, "Python failed to create Long"); 1050 nxt_python_print_exception(); 1051 1052 } else { 1053 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1054 p, NULL); 1055 if (nxt_slow_path(res == NULL)) { 1056 nxt_unit_alert(ctx, "Python failed to set_result"); 1057 nxt_python_print_exception(); 1058 1059 } else { 1060 Py_DECREF(res); 1061 } 1062 1063 Py_DECREF(p); 1064 } 1065 } 1066 1067 1068 static void 1069 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1070 { 1071 int rc; 1072 nxt_queue_link_t *lnk; 1073 nxt_py_asgi_ctx_data_t *ctx_data; 1074 1075 ctx_data = ctx->data; 1076 1077 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1078 lnk = nxt_queue_first(&ctx_data->drain_queue); 1079 1080 rc = nxt_py_asgi_http_drain(lnk); 1081 if (rc == NXT_UNIT_AGAIN) { 1082 return; 1083 } 1084 1085 nxt_queue_remove(lnk); 1086 } 1087 } 1088 1089 1090 static PyObject * 1091 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1092 { 1093 int rc; 1094 PyObject *arg0, *arg1, *res; 1095 Py_ssize_t n; 1096 nxt_unit_ctx_t *ctx; 1097 nxt_unit_port_t *port; 1098 nxt_py_asgi_ctx_data_t *ctx_data; 1099 1100 n = PyTuple_GET_SIZE(args); 1101 1102 if (n != 2) { 1103 nxt_unit_alert(NULL, 1104 "nxt_py_asgi_port_read: invalid number of arguments %d", 1105 (int) n); 1106 1107 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 1108 } 1109 1110 arg0 = PyTuple_GET_ITEM(args, 0); 1111 if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) { 1112 return PyErr_Format(PyExc_TypeError, 1113 "the first argument is not a long"); 1114 } 1115 1116 ctx = PyLong_AsVoidPtr(arg0); 1117 1118 arg1 = PyTuple_GET_ITEM(args, 1); 1119 if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) { 1120 return PyErr_Format(PyExc_TypeError, 1121 "the second argument is not a long"); 1122 } 1123 1124 port = PyLong_AsVoidPtr(arg1); 1125 1126 rc = nxt_unit_process_port_msg(ctx, port); 1127 1128 nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc); 1129 1130 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 1131 return PyErr_Format(PyExc_RuntimeError, 1132 "error processing port %d message", port->id.id); 1133 } 1134 1135 if (rc == NXT_UNIT_OK) { 1136 ctx_data = ctx->data; 1137 1138 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, 1139 nxt_py_port_read, 1140 arg0, arg1, NULL); 1141 if (nxt_slow_path(res == NULL)) { 1142 nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'"); 1143 nxt_python_print_exception(); 1144 } 1145 1146 Py_XDECREF(res); 1147 } 1148 1149 Py_RETURN_NONE; 1150 } 1151 1152 1153 PyObject * 1154 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1155 void *data) 1156 { 1157 int i; 1158 PyObject *iter, *header, *h_iter, *name, *val, *res; 1159 1160 iter = PyObject_GetIter(headers); 1161 if (nxt_slow_path(iter == NULL)) { 1162 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1163 } 1164 1165 for (i = 0; /* void */; i++) { 1166 header = PyIter_Next(iter); 1167 if (header == NULL) { 1168 break; 1169 } 1170 1171 h_iter = PyObject_GetIter(header); 1172 if (nxt_slow_path(h_iter == NULL)) { 1173 Py_DECREF(header); 1174 Py_DECREF(iter); 1175 1176 return PyErr_Format(PyExc_TypeError, 1177 "'headers' item #%d is not an iterable", i); 1178 } 1179 1180 name = PyIter_Next(h_iter); 1181 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1182 Py_XDECREF(name); 1183 Py_DECREF(h_iter); 1184 Py_DECREF(header); 1185 Py_DECREF(iter); 1186 1187 return PyErr_Format(PyExc_TypeError, 1188 "'headers' item #%d 'name' is not a byte string", i); 1189 } 1190 1191 val = PyIter_Next(h_iter); 1192 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1193 Py_XDECREF(val); 1194 Py_DECREF(h_iter); 1195 Py_DECREF(header); 1196 Py_DECREF(iter); 1197 1198 return PyErr_Format(PyExc_TypeError, 1199 "'headers' item #%d 'value' is not a byte string", i); 1200 } 1201 1202 res = cb(data, i, name, val); 1203 1204 Py_DECREF(name); 1205 Py_DECREF(val); 1206 Py_DECREF(h_iter); 1207 Py_DECREF(header); 1208 1209 if (nxt_slow_path(res == NULL)) { 1210 Py_DECREF(iter); 1211 1212 return NULL; 1213 } 1214 1215 Py_DECREF(res); 1216 } 1217 1218 Py_DECREF(iter); 1219 1220 Py_RETURN_NONE; 1221 } 1222 1223 1224 PyObject * 1225 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1226 { 1227 nxt_py_asgi_calc_size_ctx_t *ctx; 1228 1229 ctx = data; 1230 1231 ctx->fields_count++; 1232 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1233 1234 Py_RETURN_NONE; 1235 } 1236 1237 1238 PyObject * 1239 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1240 { 1241 int rc; 1242 char *name_str, *val_str; 1243 uint32_t name_len, val_len; 1244 nxt_off_t content_length; 1245 nxt_unit_request_info_t *req; 1246 nxt_py_asgi_add_field_ctx_t *ctx; 1247 1248 name_str = PyBytes_AS_STRING(name); 1249 name_len = PyBytes_GET_SIZE(name); 1250 1251 val_str = PyBytes_AS_STRING(val); 1252 val_len = PyBytes_GET_SIZE(val); 1253 1254 ctx = data; 1255 req = ctx->req; 1256 1257 rc = nxt_unit_response_add_field(req, name_str, name_len, 1258 val_str, val_len); 1259 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1260 return PyErr_Format(PyExc_RuntimeError, 1261 "failed to add header #%d", i); 1262 } 1263 1264 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1265 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1266 if (nxt_slow_path(content_length < 0)) { 1267 nxt_unit_req_error(req, "failed to parse Content-Length " 1268 "value %.*s", (int) val_len, val_str); 1269 1270 return PyErr_Format(PyExc_ValueError, 1271 "Failed to parse Content-Length: '%.*s'", 1272 (int) val_len, val_str); 1273 } 1274 1275 ctx->content_length = content_length; 1276 } 1277 1278 Py_RETURN_NONE; 1279 } 1280 1281 1282 PyObject * 1283 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1284 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1285 { 1286 PyObject *set_result, *res; 1287 1288 if (nxt_slow_path(result == NULL)) { 1289 Py_DECREF(future); 1290 1291 return NULL; 1292 } 1293 1294 set_result = PyObject_GetAttrString(future, "set_result"); 1295 if (nxt_slow_path(set_result == NULL)) { 1296 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1297 1298 Py_CLEAR(future); 1299 1300 goto cleanup_result; 1301 } 1302 1303 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1304 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1305 1306 Py_CLEAR(future); 1307 1308 goto cleanup; 1309 } 1310 1311 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1312 result, NULL); 1313 if (nxt_slow_path(res == NULL)) { 1314 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1315 nxt_python_print_exception(); 1316 1317 Py_CLEAR(future); 1318 } 1319 1320 Py_XDECREF(res); 1321 1322 cleanup: 1323 1324 Py_DECREF(set_result); 1325 1326 cleanup_result: 1327 1328 Py_DECREF(result); 1329 1330 return future; 1331 } 1332 1333 1334 PyObject * 1335 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1336 { 1337 PyObject *msg; 1338 1339 msg = PyDict_New(); 1340 if (nxt_slow_path(msg == NULL)) { 1341 nxt_unit_req_alert(req, "Python failed to create message dict"); 1342 nxt_python_print_exception(); 1343 1344 return PyErr_Format(PyExc_RuntimeError, 1345 "failed to create message dict"); 1346 } 1347 1348 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1349 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1350 1351 Py_DECREF(msg); 1352 1353 return PyErr_Format(PyExc_RuntimeError, 1354 "failed to set 'msg.type' item"); 1355 } 1356 1357 return msg; 1358 } 1359 1360 1361 PyObject * 1362 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1363 PyObject *spec_version) 1364 { 1365 PyObject *scope, *asgi; 1366 1367 scope = PyDict_New(); 1368 if (nxt_slow_path(scope == NULL)) { 1369 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1370 nxt_python_print_exception(); 1371 1372 return PyErr_Format(PyExc_RuntimeError, 1373 "failed to create 'scope' dict"); 1374 } 1375 1376 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1377 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1378 1379 Py_DECREF(scope); 1380 1381 return PyErr_Format(PyExc_RuntimeError, 1382 "failed to set 'scope.type' item"); 1383 } 1384 1385 asgi = PyDict_New(); 1386 if (nxt_slow_path(asgi == NULL)) { 1387 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1388 nxt_python_print_exception(); 1389 1390 Py_DECREF(scope); 1391 1392 return PyErr_Format(PyExc_RuntimeError, 1393 "failed to create 'asgi' dict"); 1394 } 1395 1396 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1397 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1398 1399 Py_DECREF(asgi); 1400 Py_DECREF(scope); 1401 1402 return PyErr_Format(PyExc_RuntimeError, 1403 "failed to set 'scope.asgi' item"); 1404 } 1405 1406 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1407 nxt_py_3_0_str) == -1)) 1408 { 1409 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1410 1411 Py_DECREF(asgi); 1412 Py_DECREF(scope); 1413 1414 return PyErr_Format(PyExc_RuntimeError, 1415 "failed to set 'asgi.version' item"); 1416 } 1417 1418 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1419 spec_version) == -1)) 1420 { 1421 nxt_unit_req_alert(req, 1422 "Python failed to set 'asgi.spec_version' item"); 1423 1424 Py_DECREF(asgi); 1425 Py_DECREF(scope); 1426 1427 return PyErr_Format(PyExc_RuntimeError, 1428 "failed to set 'asgi.spec_version' item"); 1429 } 1430 1431 Py_DECREF(asgi); 1432 1433 return scope; 1434 } 1435 1436 1437 void 1438 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1439 { 1440 nxt_py_asgi_ctx_data_t *ctx_data; 1441 1442 ctx_data = req->ctx->data; 1443 1444 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1445 } 1446 1447 1448 void 1449 nxt_py_asgi_dealloc(PyObject *self) 1450 { 1451 PyObject_Del(self); 1452 } 1453 1454 1455 PyObject * 1456 nxt_py_asgi_await(PyObject *self) 1457 { 1458 Py_INCREF(self); 1459 return self; 1460 } 1461 1462 1463 PyObject * 1464 nxt_py_asgi_iter(PyObject *self) 1465 { 1466 Py_INCREF(self); 1467 return self; 1468 } 1469 1470 1471 PyObject * 1472 nxt_py_asgi_next(PyObject *self) 1473 { 1474 return NULL; 1475 } 1476 1477 1478 static void 1479 nxt_python_asgi_done(void) 1480 { 1481 nxt_py_asgi_str_done(); 1482 1483 Py_XDECREF(nxt_py_port_read); 1484 } 1485 1486 #else /* !(NXT_HAVE_ASGI) */ 1487 1488 1489 int 1490 nxt_python_asgi_check(PyObject *obj) 1491 { 1492 return 0; 1493 } 1494 1495 1496 int 1497 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1498 { 1499 nxt_unit_alert(NULL, "ASGI not implemented"); 1500 return NXT_UNIT_ERROR; 1501 } 1502 1503 1504 #endif /* NXT_HAVE_ASGI */ 1505