1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 20 21 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 22 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 23 uint16_t port); 24 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 25 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 26 27 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 28 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 29 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 30 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 31 32 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 33 34 35 PyObject *nxt_py_loop_run_until_complete; 36 PyObject *nxt_py_loop_create_future; 37 PyObject *nxt_py_loop_create_task; 38 39 nxt_queue_t nxt_py_asgi_drain_queue; 40 41 static PyObject *nxt_py_loop_call_soon; 42 static PyObject *nxt_py_quit_future; 43 static PyObject *nxt_py_quit_future_set_result; 44 static PyObject *nxt_py_loop_add_reader; 45 static PyObject *nxt_py_loop_remove_reader; 46 static PyObject *nxt_py_port_read; 47 48 static PyMethodDef nxt_py_port_read_method = 49 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 50 51 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 52 53 54 int 55 nxt_python_asgi_check(PyObject *obj) 56 { 57 int res; 58 PyObject *call; 59 PyCodeObject *code; 60 61 if (PyFunction_Check(obj)) { 62 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 63 64 return (code->co_flags & CO_COROUTINE) != 0; 65 } 66 67 if (PyMethod_Check(obj)) { 68 obj = PyMethod_GET_FUNCTION(obj); 69 70 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 71 72 return (code->co_flags & CO_COROUTINE) != 0; 73 } 74 75 call = PyObject_GetAttrString(obj, "__call__"); 76 77 if (call == NULL) { 78 return 0; 79 } 80 81 if (PyFunction_Check(call)) { 82 code = (PyCodeObject *) PyFunction_GET_CODE(call); 83 84 res = (code->co_flags & CO_COROUTINE) != 0; 85 86 } else { 87 if (PyMethod_Check(call)) { 88 obj = PyMethod_GET_FUNCTION(call); 89 90 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 91 92 res = (code->co_flags & CO_COROUTINE) != 0; 93 94 } else { 95 res = 0; 96 } 97 } 98 99 Py_DECREF(call); 100 101 return res; 102 } 103 104 105 nxt_int_t 106 nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) 107 { 108 PyObject *asyncio, *loop, *get_event_loop; 109 nxt_int_t rc; 110 111 nxt_debug(task, "asgi_init"); 112 113 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) { 114 nxt_alert(task, "Python failed to init string objects"); 115 return NXT_ERROR; 116 } 117 118 asyncio = PyImport_ImportModule("asyncio"); 119 if (nxt_slow_path(asyncio == NULL)) { 120 nxt_alert(task, "Python failed to import module 'asyncio'"); 121 nxt_python_print_exception(); 122 return NXT_ERROR; 123 } 124 125 loop = NULL; 126 get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 127 "get_event_loop"); 128 if (nxt_slow_path(get_event_loop == NULL)) { 129 nxt_alert(task, 130 "Python failed to get 'get_event_loop' from module 'asyncio'"); 131 goto fail; 132 } 133 134 if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) { 135 nxt_alert(task, "'asyncio.get_event_loop' is not a callable object"); 136 goto fail; 137 } 138 139 loop = PyObject_CallObject(get_event_loop, NULL); 140 if (nxt_slow_path(loop == NULL)) { 141 nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'"); 142 goto fail; 143 } 144 145 nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task"); 146 if (nxt_slow_path(nxt_py_loop_create_task == NULL)) { 147 nxt_alert(task, "Python failed to get 'loop.create_task'"); 148 goto fail; 149 } 150 151 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) { 152 nxt_alert(task, "'loop.create_task' is not a callable object"); 153 goto fail; 154 } 155 156 nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader"); 157 if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) { 158 nxt_alert(task, "Python failed to get 'loop.add_reader'"); 159 goto fail; 160 } 161 162 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) { 163 nxt_alert(task, "'loop.add_reader' is not a callable object"); 164 goto fail; 165 } 166 167 nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader"); 168 if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) { 169 nxt_alert(task, "Python failed to get 'loop.remove_reader'"); 170 goto fail; 171 } 172 173 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) { 174 nxt_alert(task, "'loop.remove_reader' is not a callable object"); 175 goto fail; 176 } 177 178 nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon"); 179 if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) { 180 nxt_alert(task, "Python failed to get 'loop.call_soon'"); 181 goto fail; 182 } 183 184 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) { 185 nxt_alert(task, "'loop.call_soon' is not a callable object"); 186 goto fail; 187 } 188 189 nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop, 190 "run_until_complete"); 191 if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) { 192 nxt_alert(task, "Python failed to get 'loop.run_until_complete'"); 193 goto fail; 194 } 195 196 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) { 197 nxt_alert(task, "'loop.run_until_complete' is not a callable object"); 198 goto fail; 199 } 200 201 nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future"); 202 if (nxt_slow_path(nxt_py_loop_create_future == NULL)) { 203 nxt_alert(task, "Python failed to get 'loop.create_future'"); 204 goto fail; 205 } 206 207 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) { 208 nxt_alert(task, "'loop.create_future' is not a callable object"); 209 goto fail; 210 } 211 212 nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL); 213 if (nxt_slow_path(nxt_py_quit_future == NULL)) { 214 nxt_alert(task, "Python failed to create Future "); 215 nxt_python_print_exception(); 216 goto fail; 217 } 218 219 nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future, 220 "set_result"); 221 if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) { 222 nxt_alert(task, "Python failed to get 'future.set_result'"); 223 goto fail; 224 } 225 226 if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) { 227 nxt_alert(task, "'future.set_result' is not a callable object"); 228 goto fail; 229 } 230 231 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 232 if (nxt_slow_path(nxt_py_port_read == NULL)) { 233 nxt_alert(task, "Python failed to initialize the 'port_read' function"); 234 goto fail; 235 } 236 237 nxt_queue_init(&nxt_py_asgi_drain_queue); 238 239 if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) { 240 goto fail; 241 } 242 243 if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) { 244 goto fail; 245 } 246 247 rc = nxt_py_asgi_lifespan_startup(task); 248 if (nxt_slow_path(rc == NXT_ERROR)) { 249 goto fail; 250 } 251 252 init->callbacks.request_handler = nxt_py_asgi_request_handler; 253 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 254 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 255 init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; 256 init->callbacks.quit = nxt_py_asgi_quit; 257 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 258 init->callbacks.add_port = nxt_py_asgi_add_port; 259 init->callbacks.remove_port = nxt_py_asgi_remove_port; 260 261 Py_DECREF(loop); 262 Py_DECREF(asyncio); 263 264 return NXT_OK; 265 266 fail: 267 268 Py_XDECREF(loop); 269 Py_DECREF(asyncio); 270 271 return NXT_ERROR; 272 } 273 274 275 nxt_int_t 276 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 277 { 278 PyObject *res; 279 280 res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, 281 nxt_py_quit_future, NULL); 282 if (nxt_slow_path(res == NULL)) { 283 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 284 nxt_python_print_exception(); 285 286 return NXT_ERROR; 287 } 288 289 Py_DECREF(res); 290 291 nxt_py_asgi_lifespan_shutdown(); 292 293 return NXT_OK; 294 } 295 296 297 static void 298 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 299 { 300 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 301 302 if (req->request->websocket_handshake) { 303 asgi = nxt_py_asgi_websocket_create(req); 304 305 } else { 306 asgi = nxt_py_asgi_http_create(req); 307 } 308 309 if (nxt_slow_path(asgi == NULL)) { 310 nxt_unit_req_alert(req, "Python failed to create asgi object"); 311 nxt_unit_request_done(req, NXT_UNIT_ERROR); 312 313 return; 314 } 315 316 receive = PyObject_GetAttrString(asgi, "receive"); 317 if (nxt_slow_path(receive == NULL)) { 318 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 319 nxt_unit_request_done(req, NXT_UNIT_ERROR); 320 321 goto release_asgi; 322 } 323 324 send = PyObject_GetAttrString(asgi, "send"); 325 if (nxt_slow_path(receive == NULL)) { 326 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 327 nxt_unit_request_done(req, NXT_UNIT_ERROR); 328 329 goto release_receive; 330 } 331 332 done = PyObject_GetAttrString(asgi, "_done"); 333 if (nxt_slow_path(receive == NULL)) { 334 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 335 nxt_unit_request_done(req, NXT_UNIT_ERROR); 336 337 goto release_send; 338 } 339 340 scope = nxt_py_asgi_create_http_scope(req); 341 if (nxt_slow_path(scope == NULL)) { 342 nxt_unit_request_done(req, NXT_UNIT_ERROR); 343 344 goto release_done; 345 } 346 347 req->data = asgi; 348 349 res = PyObject_CallFunctionObjArgs(nxt_py_application, 350 scope, receive, send, NULL); 351 if (nxt_slow_path(res == NULL)) { 352 nxt_unit_req_error(req, "Python failed to call the application"); 353 nxt_python_print_exception(); 354 nxt_unit_request_done(req, NXT_UNIT_ERROR); 355 356 goto release_scope; 357 } 358 359 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 360 nxt_unit_req_error(req, "Application result type is not a coroutine"); 361 nxt_unit_request_done(req, NXT_UNIT_ERROR); 362 363 Py_DECREF(res); 364 365 goto release_scope; 366 } 367 368 task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); 369 if (nxt_slow_path(task == NULL)) { 370 nxt_unit_req_error(req, "Python failed to call the create_task"); 371 nxt_python_print_exception(); 372 nxt_unit_request_done(req, NXT_UNIT_ERROR); 373 374 Py_DECREF(res); 375 376 goto release_scope; 377 } 378 379 Py_DECREF(res); 380 381 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 382 NULL); 383 if (nxt_slow_path(res == NULL)) { 384 nxt_unit_req_error(req, 385 "Python failed to call 'task.add_done_callback'"); 386 nxt_python_print_exception(); 387 nxt_unit_request_done(req, NXT_UNIT_ERROR); 388 389 goto release_task; 390 } 391 392 Py_DECREF(res); 393 release_task: 394 Py_DECREF(task); 395 release_scope: 396 Py_DECREF(scope); 397 release_done: 398 Py_DECREF(done); 399 release_send: 400 Py_DECREF(send); 401 release_receive: 402 Py_DECREF(receive); 403 release_asgi: 404 Py_DECREF(asgi); 405 } 406 407 408 static PyObject * 409 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 410 { 411 char *p, *target, *query; 412 uint32_t target_length, i; 413 PyObject *scope, *v, *type, *scheme; 414 PyObject *headers, *header; 415 nxt_unit_field_t *f; 416 nxt_unit_request_t *r; 417 418 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 419 420 #define SET_ITEM(dict, key, value) \ 421 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 422 == -1)) \ 423 { \ 424 nxt_unit_req_alert(req, "Python failed to set '" \ 425 #dict "." #key "' item"); \ 426 goto fail; \ 427 } 428 429 v = NULL; 430 headers = NULL; 431 432 r = req->request; 433 434 if (r->websocket_handshake) { 435 type = nxt_py_websocket_str; 436 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 437 438 } else { 439 type = nxt_py_http_str; 440 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 441 } 442 443 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 444 if (nxt_slow_path(scope == NULL)) { 445 return NULL; 446 } 447 448 p = nxt_unit_sptr_get(&r->version); 449 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 450 : nxt_py_1_0_str) 451 SET_ITEM(scope, scheme, scheme) 452 453 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 454 r->method_length); 455 if (nxt_slow_path(v == NULL)) { 456 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 457 goto fail; 458 } 459 460 SET_ITEM(scope, method, v) 461 Py_DECREF(v); 462 463 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 464 "replace"); 465 if (nxt_slow_path(v == NULL)) { 466 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 467 goto fail; 468 } 469 470 SET_ITEM(scope, path, v) 471 Py_DECREF(v); 472 473 target = nxt_unit_sptr_get(&r->target); 474 query = nxt_unit_sptr_get(&r->query); 475 476 if (r->query.offset != 0) { 477 target_length = query - target - 1; 478 479 } else { 480 target_length = r->target_length; 481 } 482 483 v = PyBytes_FromStringAndSize(target, target_length); 484 if (nxt_slow_path(v == NULL)) { 485 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 486 goto fail; 487 } 488 489 SET_ITEM(scope, raw_path, v) 490 Py_DECREF(v); 491 492 v = PyBytes_FromStringAndSize(query, r->query_length); 493 if (nxt_slow_path(v == NULL)) { 494 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 495 goto fail; 496 } 497 498 SET_ITEM(scope, query_string, v) 499 Py_DECREF(v); 500 501 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 502 if (nxt_slow_path(v == NULL)) { 503 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 504 goto fail; 505 } 506 507 SET_ITEM(scope, client, v) 508 Py_DECREF(v); 509 510 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 511 if (nxt_slow_path(v == NULL)) { 512 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 513 goto fail; 514 } 515 516 SET_ITEM(scope, server, v) 517 Py_DECREF(v); 518 519 v = NULL; 520 521 headers = PyTuple_New(r->fields_count); 522 if (nxt_slow_path(headers == NULL)) { 523 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 524 goto fail; 525 } 526 527 for (i = 0; i < r->fields_count; i++) { 528 f = r->fields + i; 529 530 header = nxt_py_asgi_create_header(f); 531 if (nxt_slow_path(header == NULL)) { 532 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 533 goto fail; 534 } 535 536 PyTuple_SET_ITEM(headers, i, header); 537 538 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 539 && f->name_length == ws_protocol.length 540 && f->value_length > 0 541 && r->websocket_handshake) 542 { 543 v = nxt_py_asgi_create_subprotocols(f); 544 if (nxt_slow_path(v == NULL)) { 545 nxt_unit_req_alert(req, "Failed to create subprotocols"); 546 goto fail; 547 } 548 549 SET_ITEM(scope, subprotocols, v); 550 Py_DECREF(v); 551 } 552 } 553 554 SET_ITEM(scope, headers, headers) 555 Py_DECREF(headers); 556 557 return scope; 558 559 fail: 560 561 Py_XDECREF(v); 562 Py_XDECREF(headers); 563 Py_DECREF(scope); 564 565 return NULL; 566 567 #undef SET_ITEM 568 } 569 570 571 static PyObject * 572 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 573 { 574 char *p, *s; 575 PyObject *pair, *v; 576 577 pair = PyTuple_New(2); 578 if (nxt_slow_path(pair == NULL)) { 579 return NULL; 580 } 581 582 p = nxt_unit_sptr_get(sptr); 583 s = memchr(p, ':', len); 584 585 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 586 if (nxt_slow_path(v == NULL)) { 587 Py_DECREF(pair); 588 589 return NULL; 590 } 591 592 PyTuple_SET_ITEM(pair, 0, v); 593 594 if (s != NULL) { 595 p += len; 596 v = PyLong_FromString(s + 1, &p, 10); 597 598 } else { 599 v = PyLong_FromLong(port); 600 } 601 602 if (nxt_slow_path(v == NULL)) { 603 Py_DECREF(pair); 604 605 return NULL; 606 } 607 608 PyTuple_SET_ITEM(pair, 1, v); 609 610 return pair; 611 } 612 613 614 static PyObject * 615 nxt_py_asgi_create_header(nxt_unit_field_t *f) 616 { 617 char c, *name; 618 uint8_t pos; 619 PyObject *header, *v; 620 621 header = PyTuple_New(2); 622 if (nxt_slow_path(header == NULL)) { 623 return NULL; 624 } 625 626 name = nxt_unit_sptr_get(&f->name); 627 628 for (pos = 0; pos < f->name_length; pos++) { 629 c = name[pos]; 630 if (c >= 'A' && c <= 'Z') { 631 name[pos] = (c | 0x20); 632 } 633 } 634 635 v = PyBytes_FromStringAndSize(name, f->name_length); 636 if (nxt_slow_path(v == NULL)) { 637 Py_DECREF(header); 638 639 return NULL; 640 } 641 642 PyTuple_SET_ITEM(header, 0, v); 643 644 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 645 f->value_length); 646 if (nxt_slow_path(v == NULL)) { 647 Py_DECREF(header); 648 649 return NULL; 650 } 651 652 PyTuple_SET_ITEM(header, 1, v); 653 654 return header; 655 } 656 657 658 static PyObject * 659 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 660 { 661 char *v; 662 uint32_t i, n, start; 663 PyObject *res, *proto; 664 665 v = nxt_unit_sptr_get(&f->value); 666 n = 1; 667 668 for (i = 0; i < f->value_length; i++) { 669 if (v[i] == ',') { 670 n++; 671 } 672 } 673 674 res = PyTuple_New(n); 675 if (nxt_slow_path(res == NULL)) { 676 return NULL; 677 } 678 679 n = 0; 680 start = 0; 681 682 for (i = 0; i < f->value_length; ) { 683 if (v[i] != ',') { 684 i++; 685 686 continue; 687 } 688 689 if (i - start > 0) { 690 proto = PyString_FromStringAndSize(v + start, i - start); 691 if (nxt_slow_path(proto == NULL)) { 692 goto fail; 693 } 694 695 PyTuple_SET_ITEM(res, n, proto); 696 697 n++; 698 } 699 700 do { 701 i++; 702 } while (i < f->value_length && v[i] == ' '); 703 704 start = i; 705 } 706 707 if (i - start > 0) { 708 proto = PyString_FromStringAndSize(v + start, i - start); 709 if (nxt_slow_path(proto == NULL)) { 710 goto fail; 711 } 712 713 PyTuple_SET_ITEM(res, n, proto); 714 } 715 716 return res; 717 718 fail: 719 720 Py_DECREF(res); 721 722 return NULL; 723 } 724 725 726 static int 727 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 728 { 729 int nb; 730 PyObject *res; 731 732 if (port->in_fd == -1) { 733 return NXT_UNIT_OK; 734 } 735 736 nb = 1; 737 738 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 739 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 740 port->in_fd, strerror(errno), errno); 741 742 return NXT_UNIT_ERROR; 743 } 744 745 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 746 747 res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader, 748 PyLong_FromLong(port->in_fd), 749 nxt_py_port_read, 750 PyLong_FromVoidPtr(ctx), 751 PyLong_FromVoidPtr(port), NULL); 752 if (nxt_slow_path(res == NULL)) { 753 nxt_unit_alert(ctx, "Python failed to add_reader"); 754 755 return NXT_UNIT_ERROR; 756 } 757 758 Py_DECREF(res); 759 760 return NXT_UNIT_OK; 761 } 762 763 764 static void 765 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 766 { 767 PyObject *res; 768 769 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 770 771 if (port->in_fd == -1) { 772 return; 773 } 774 775 res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader, 776 PyLong_FromLong(port->in_fd), NULL); 777 if (nxt_slow_path(res == NULL)) { 778 nxt_unit_alert(NULL, "Python failed to remove_reader"); 779 } 780 781 Py_DECREF(res); 782 } 783 784 785 static void 786 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 787 { 788 PyObject *res; 789 790 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 791 792 res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result, 793 PyLong_FromLong(0), NULL); 794 if (nxt_slow_path(res == NULL)) { 795 nxt_unit_alert(ctx, "Python failed to set_result"); 796 } 797 798 Py_DECREF(res); 799 } 800 801 802 static void 803 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 804 { 805 int rc; 806 nxt_queue_link_t *lnk; 807 808 while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) { 809 lnk = nxt_queue_first(&nxt_py_asgi_drain_queue); 810 811 rc = nxt_py_asgi_http_drain(lnk); 812 if (rc == NXT_UNIT_AGAIN) { 813 break; 814 } 815 816 nxt_queue_remove(lnk); 817 } 818 } 819 820 821 static PyObject * 822 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 823 { 824 int rc; 825 PyObject *arg; 826 Py_ssize_t n; 827 nxt_unit_ctx_t *ctx; 828 nxt_unit_port_t *port; 829 830 n = PyTuple_GET_SIZE(args); 831 832 if (n != 2) { 833 nxt_unit_alert(NULL, 834 "nxt_py_asgi_port_read: invalid number of arguments %d", 835 (int) n); 836 837 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 838 } 839 840 arg = PyTuple_GET_ITEM(args, 0); 841 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 842 return PyErr_Format(PyExc_TypeError, 843 "the first argument is not a long"); 844 } 845 846 ctx = PyLong_AsVoidPtr(arg); 847 848 arg = PyTuple_GET_ITEM(args, 1); 849 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 850 return PyErr_Format(PyExc_TypeError, 851 "the second argument is not a long"); 852 } 853 854 port = PyLong_AsVoidPtr(arg); 855 856 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); 857 858 rc = nxt_unit_process_port_msg(ctx, port); 859 860 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 861 return PyErr_Format(PyExc_RuntimeError, 862 "error processing port message"); 863 } 864 865 Py_RETURN_NONE; 866 } 867 868 869 PyObject * 870 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 871 void *data) 872 { 873 int i; 874 PyObject *iter, *header, *h_iter, *name, *val, *res; 875 876 iter = PyObject_GetIter(headers); 877 if (nxt_slow_path(iter == NULL)) { 878 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 879 } 880 881 for (i = 0; /* void */; i++) { 882 header = PyIter_Next(iter); 883 if (header == NULL) { 884 break; 885 } 886 887 h_iter = PyObject_GetIter(header); 888 if (nxt_slow_path(h_iter == NULL)) { 889 Py_DECREF(header); 890 Py_DECREF(iter); 891 892 return PyErr_Format(PyExc_TypeError, 893 "'headers' item #%d is not an iterable", i); 894 } 895 896 name = PyIter_Next(h_iter); 897 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 898 Py_XDECREF(name); 899 Py_DECREF(h_iter); 900 Py_DECREF(header); 901 Py_DECREF(iter); 902 903 return PyErr_Format(PyExc_TypeError, 904 "'headers' item #%d 'name' is not a byte string", i); 905 } 906 907 val = PyIter_Next(h_iter); 908 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 909 Py_XDECREF(val); 910 Py_DECREF(h_iter); 911 Py_DECREF(header); 912 Py_DECREF(iter); 913 914 return PyErr_Format(PyExc_TypeError, 915 "'headers' item #%d 'value' is not a byte string", i); 916 } 917 918 res = cb(data, i, name, val); 919 920 Py_DECREF(name); 921 Py_DECREF(val); 922 Py_DECREF(h_iter); 923 Py_DECREF(header); 924 925 if (nxt_slow_path(res == NULL)) { 926 Py_DECREF(iter); 927 928 return NULL; 929 } 930 931 Py_DECREF(res); 932 } 933 934 Py_DECREF(iter); 935 936 Py_RETURN_NONE; 937 } 938 939 940 PyObject * 941 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 942 { 943 nxt_py_asgi_calc_size_ctx_t *ctx; 944 945 ctx = data; 946 947 ctx->fields_count++; 948 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 949 950 Py_RETURN_NONE; 951 } 952 953 954 PyObject * 955 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 956 { 957 int rc; 958 char *name_str, *val_str; 959 uint32_t name_len, val_len; 960 nxt_off_t content_length; 961 nxt_unit_request_info_t *req; 962 nxt_py_asgi_add_field_ctx_t *ctx; 963 964 name_str = PyBytes_AS_STRING(name); 965 name_len = PyBytes_GET_SIZE(name); 966 967 val_str = PyBytes_AS_STRING(val); 968 val_len = PyBytes_GET_SIZE(val); 969 970 ctx = data; 971 req = ctx->req; 972 973 rc = nxt_unit_response_add_field(req, name_str, name_len, 974 val_str, val_len); 975 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 976 return PyErr_Format(PyExc_RuntimeError, 977 "failed to add header #%d", i); 978 } 979 980 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 981 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 982 if (nxt_slow_path(content_length < 0)) { 983 nxt_unit_req_error(req, "failed to parse Content-Length " 984 "value %.*s", (int) val_len, val_str); 985 986 return PyErr_Format(PyExc_ValueError, 987 "Failed to parse Content-Length: '%.*s'", 988 (int) val_len, val_str); 989 } 990 991 ctx->content_length = content_length; 992 } 993 994 Py_RETURN_NONE; 995 } 996 997 998 PyObject * 999 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, 1000 PyObject *result) 1001 { 1002 PyObject *set_result, *res; 1003 1004 if (nxt_slow_path(result == NULL)) { 1005 Py_DECREF(future); 1006 1007 return NULL; 1008 } 1009 1010 set_result = PyObject_GetAttrString(future, "set_result"); 1011 if (nxt_slow_path(set_result == NULL)) { 1012 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1013 1014 Py_CLEAR(future); 1015 1016 goto cleanup; 1017 } 1018 1019 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1020 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1021 1022 Py_CLEAR(future); 1023 1024 goto cleanup; 1025 } 1026 1027 res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result, 1028 result, NULL); 1029 if (nxt_slow_path(res == NULL)) { 1030 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1031 nxt_python_print_exception(); 1032 1033 Py_CLEAR(future); 1034 } 1035 1036 Py_XDECREF(res); 1037 1038 cleanup: 1039 1040 Py_DECREF(set_result); 1041 Py_DECREF(result); 1042 1043 return future; 1044 } 1045 1046 1047 PyObject * 1048 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1049 { 1050 PyObject *msg; 1051 1052 msg = PyDict_New(); 1053 if (nxt_slow_path(msg == NULL)) { 1054 nxt_unit_req_alert(req, "Python failed to create message dict"); 1055 nxt_python_print_exception(); 1056 1057 return PyErr_Format(PyExc_RuntimeError, 1058 "failed to create message dict"); 1059 } 1060 1061 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1062 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1063 1064 Py_DECREF(msg); 1065 1066 return PyErr_Format(PyExc_RuntimeError, 1067 "failed to set 'msg.type' item"); 1068 } 1069 1070 return msg; 1071 } 1072 1073 1074 PyObject * 1075 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1076 PyObject *spec_version) 1077 { 1078 PyObject *scope, *asgi; 1079 1080 scope = PyDict_New(); 1081 if (nxt_slow_path(scope == NULL)) { 1082 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1083 nxt_python_print_exception(); 1084 1085 return PyErr_Format(PyExc_RuntimeError, 1086 "failed to create 'scope' dict"); 1087 } 1088 1089 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1090 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1091 1092 Py_DECREF(scope); 1093 1094 return PyErr_Format(PyExc_RuntimeError, 1095 "failed to set 'scope.type' item"); 1096 } 1097 1098 asgi = PyDict_New(); 1099 if (nxt_slow_path(asgi == NULL)) { 1100 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1101 nxt_python_print_exception(); 1102 1103 Py_DECREF(scope); 1104 1105 return PyErr_Format(PyExc_RuntimeError, 1106 "failed to create 'asgi' dict"); 1107 } 1108 1109 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1110 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1111 1112 Py_DECREF(asgi); 1113 Py_DECREF(scope); 1114 1115 return PyErr_Format(PyExc_RuntimeError, 1116 "failed to set 'scope.asgi' item"); 1117 } 1118 1119 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1120 nxt_py_3_0_str) == -1)) 1121 { 1122 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1123 1124 Py_DECREF(asgi); 1125 Py_DECREF(scope); 1126 1127 return PyErr_Format(PyExc_RuntimeError, 1128 "failed to set 'asgi.version' item"); 1129 } 1130 1131 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1132 spec_version) == -1)) 1133 { 1134 nxt_unit_req_alert(req, 1135 "Python failed to set 'asgi.spec_version' item"); 1136 1137 Py_DECREF(asgi); 1138 Py_DECREF(scope); 1139 1140 return PyErr_Format(PyExc_RuntimeError, 1141 "failed to set 'asgi.spec_version' item"); 1142 } 1143 1144 Py_DECREF(asgi); 1145 1146 return scope; 1147 } 1148 1149 1150 void 1151 nxt_py_asgi_dealloc(PyObject *self) 1152 { 1153 PyObject_Del(self); 1154 } 1155 1156 1157 PyObject * 1158 nxt_py_asgi_await(PyObject *self) 1159 { 1160 Py_INCREF(self); 1161 return self; 1162 } 1163 1164 1165 PyObject * 1166 nxt_py_asgi_iter(PyObject *self) 1167 { 1168 Py_INCREF(self); 1169 return self; 1170 } 1171 1172 1173 PyObject * 1174 nxt_py_asgi_next(PyObject *self) 1175 { 1176 return NULL; 1177 } 1178 1179 1180 void 1181 nxt_python_asgi_done(void) 1182 { 1183 nxt_py_asgi_str_done(); 1184 1185 Py_XDECREF(nxt_py_quit_future); 1186 Py_XDECREF(nxt_py_quit_future_set_result); 1187 Py_XDECREF(nxt_py_loop_run_until_complete); 1188 Py_XDECREF(nxt_py_loop_create_future); 1189 Py_XDECREF(nxt_py_loop_create_task); 1190 Py_XDECREF(nxt_py_loop_call_soon); 1191 Py_XDECREF(nxt_py_loop_add_reader); 1192 Py_XDECREF(nxt_py_loop_remove_reader); 1193 Py_XDECREF(nxt_py_port_read); 1194 } 1195 1196 #else /* !(NXT_HAVE_ASGI) */ 1197 1198 1199 int 1200 nxt_python_asgi_check(PyObject *obj) 1201 { 1202 return 0; 1203 } 1204 1205 1206 nxt_int_t 1207 nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) 1208 { 1209 nxt_alert(task, "ASGI not implemented"); 1210 return NXT_ERROR; 1211 } 1212 1213 1214 nxt_int_t 1215 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 1216 { 1217 nxt_unit_alert(ctx, "ASGI not implemented"); 1218 return NXT_ERROR; 1219 } 1220 1221 1222 void 1223 nxt_python_asgi_done(void) 1224 { 1225 } 1226 1227 #endif /* NXT_HAVE_ASGI */ 1228