1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static int nxt_python_asgi_ctx_data_alloc(void **pdata); 20 static void nxt_python_asgi_ctx_data_free(void *data); 21 static int nxt_python_asgi_startup(void *data); 22 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 23 24 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 25 nxt_unit_port_t *port); 26 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 27 28 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 29 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 30 uint16_t port); 31 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 32 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 33 34 static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); 35 36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 37 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 38 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 39 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 40 41 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 42 static void nxt_python_asgi_done(void); 43 44 45 static PyObject *nxt_py_port_read; 46 static nxt_unit_port_t *nxt_py_shared_port; 47 48 static PyMethodDef nxt_py_port_read_method = 49 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 50 51 static nxt_python_proto_t nxt_py_asgi_proto = { 52 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 53 .ctx_data_free = nxt_python_asgi_ctx_data_free, 54 .startup = nxt_python_asgi_startup, 55 .run = nxt_python_asgi_run, 56 .ready = nxt_python_asgi_ready, 57 .done = nxt_python_asgi_done, 58 }; 59 60 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 61 62 63 int 64 nxt_python_asgi_check(PyObject *obj) 65 { 66 int res; 67 PyObject *call; 68 PyCodeObject *code; 69 70 if (PyFunction_Check(obj)) { 71 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 72 73 return (code->co_flags & CO_COROUTINE) != 0; 74 } 75 76 if (PyMethod_Check(obj)) { 77 obj = PyMethod_GET_FUNCTION(obj); 78 79 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 80 81 return (code->co_flags & CO_COROUTINE) != 0; 82 } 83 84 call = PyObject_GetAttrString(obj, "__call__"); 85 86 if (call == NULL) { 87 return 0; 88 } 89 90 if (PyFunction_Check(call)) { 91 code = (PyCodeObject *) PyFunction_GET_CODE(call); 92 93 res = (code->co_flags & CO_COROUTINE) != 0; 94 95 } else { 96 if (PyMethod_Check(call)) { 97 obj = PyMethod_GET_FUNCTION(call); 98 99 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 100 101 res = (code->co_flags & CO_COROUTINE) != 0; 102 103 } else { 104 res = 0; 105 } 106 } 107 108 Py_DECREF(call); 109 110 return res; 111 } 112 113 114 int 115 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 116 { 117 nxt_unit_debug(NULL, "asgi_init"); 118 119 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 120 nxt_unit_alert(NULL, "Python failed to init string objects"); 121 return NXT_UNIT_ERROR; 122 } 123 124 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 125 if (nxt_slow_path(nxt_py_port_read == NULL)) { 126 nxt_unit_alert(NULL, 127 "Python failed to initialize the 'port_read' function"); 128 return NXT_UNIT_ERROR; 129 } 130 131 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 132 return NXT_UNIT_ERROR; 133 } 134 135 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 136 return NXT_UNIT_ERROR; 137 } 138 139 init->callbacks.request_handler = nxt_py_asgi_request_handler; 140 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 141 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 142 init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; 143 init->callbacks.quit = nxt_py_asgi_quit; 144 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 145 init->callbacks.add_port = nxt_py_asgi_add_port; 146 init->callbacks.remove_port = nxt_py_asgi_remove_port; 147 148 *proto = nxt_py_asgi_proto; 149 150 return NXT_UNIT_OK; 151 } 152 153 154 static int 155 nxt_python_asgi_ctx_data_alloc(void **pdata) 156 { 157 uint32_t i; 158 PyObject *asyncio, *loop, *new_event_loop, *obj; 159 nxt_py_asgi_ctx_data_t *ctx_data; 160 161 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 162 if (nxt_slow_path(ctx_data == NULL)) { 163 nxt_unit_alert(NULL, "Failed to allocate context data"); 164 return NXT_UNIT_ERROR; 165 } 166 167 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 168 169 nxt_queue_init(&ctx_data->drain_queue); 170 171 struct { 172 const char *key; 173 PyObject **handler; 174 175 } handlers[] = { 176 { "create_task", &ctx_data->loop_create_task }, 177 { "add_reader", &ctx_data->loop_add_reader }, 178 { "remove_reader", &ctx_data->loop_remove_reader }, 179 { "call_soon", &ctx_data->loop_call_soon }, 180 { "run_until_complete", &ctx_data->loop_run_until_complete }, 181 { "create_future", &ctx_data->loop_create_future }, 182 }; 183 184 loop = NULL; 185 186 asyncio = PyImport_ImportModule("asyncio"); 187 if (nxt_slow_path(asyncio == NULL)) { 188 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 189 nxt_python_print_exception(); 190 goto fail; 191 } 192 193 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 194 "new_event_loop"); 195 if (nxt_slow_path(new_event_loop == NULL)) { 196 nxt_unit_alert(NULL, 197 "Python failed to get 'new_event_loop' from module 'asyncio'"); 198 goto fail; 199 } 200 201 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { 202 nxt_unit_alert(NULL, 203 "'asyncio.new_event_loop' is not a callable object"); 204 goto fail; 205 } 206 207 loop = PyObject_CallObject(new_event_loop, NULL); 208 if (nxt_slow_path(loop == NULL)) { 209 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); 210 goto fail; 211 } 212 213 for (i = 0; i < nxt_nitems(handlers); i++) { 214 obj = PyObject_GetAttrString(loop, handlers[i].key); 215 if (nxt_slow_path(obj == NULL)) { 216 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 217 handlers[i].key); 218 goto fail; 219 } 220 221 *handlers[i].handler = obj; 222 223 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 224 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 225 handlers[i].key); 226 goto fail; 227 } 228 } 229 230 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 231 if (nxt_slow_path(obj == NULL)) { 232 nxt_unit_alert(NULL, "Python failed to create Future "); 233 nxt_python_print_exception(); 234 goto fail; 235 } 236 237 ctx_data->quit_future = obj; 238 239 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 240 if (nxt_slow_path(obj == NULL)) { 241 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 242 goto fail; 243 } 244 245 ctx_data->quit_future_set_result = obj; 246 247 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 248 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 249 goto fail; 250 } 251 252 Py_DECREF(loop); 253 Py_DECREF(asyncio); 254 255 *pdata = ctx_data; 256 257 return NXT_UNIT_OK; 258 259 fail: 260 261 nxt_python_asgi_ctx_data_free(ctx_data); 262 263 Py_XDECREF(loop); 264 Py_XDECREF(asyncio); 265 266 return NXT_UNIT_ERROR; 267 } 268 269 270 static void 271 nxt_python_asgi_ctx_data_free(void *data) 272 { 273 nxt_py_asgi_ctx_data_t *ctx_data; 274 275 ctx_data = data; 276 277 Py_XDECREF(ctx_data->loop_run_until_complete); 278 Py_XDECREF(ctx_data->loop_create_future); 279 Py_XDECREF(ctx_data->loop_create_task); 280 Py_XDECREF(ctx_data->loop_call_soon); 281 Py_XDECREF(ctx_data->loop_add_reader); 282 Py_XDECREF(ctx_data->loop_remove_reader); 283 Py_XDECREF(ctx_data->quit_future); 284 Py_XDECREF(ctx_data->quit_future_set_result); 285 286 nxt_unit_free(NULL, ctx_data); 287 } 288 289 290 static int 291 nxt_python_asgi_startup(void *data) 292 { 293 return nxt_py_asgi_lifespan_startup(data); 294 } 295 296 297 static int 298 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 299 { 300 PyObject *res; 301 nxt_py_asgi_ctx_data_t *ctx_data; 302 303 ctx_data = ctx->data; 304 305 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 306 ctx_data->quit_future, NULL); 307 if (nxt_slow_path(res == NULL)) { 308 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 309 nxt_python_print_exception(); 310 311 return NXT_UNIT_ERROR; 312 } 313 314 Py_DECREF(res); 315 316 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); 317 nxt_py_asgi_remove_reader(ctx, ctx_data->port); 318 319 if (ctx_data->port != NULL) { 320 ctx_data->port->data = NULL; 321 ctx_data->port = NULL; 322 } 323 324 nxt_py_asgi_lifespan_shutdown(ctx); 325 326 return NXT_UNIT_OK; 327 } 328 329 330 static void 331 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 332 { 333 PyObject *res; 334 nxt_py_asgi_ctx_data_t *ctx_data; 335 336 if (port == NULL || port->in_fd == -1) { 337 return; 338 } 339 340 ctx_data = ctx->data; 341 342 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 343 344 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 345 PyLong_FromLong(port->in_fd), NULL); 346 if (nxt_slow_path(res == NULL)) { 347 nxt_unit_alert(ctx, "Python failed to remove_reader"); 348 nxt_python_print_exception(); 349 350 return; 351 } 352 353 Py_DECREF(res); 354 } 355 356 357 static void 358 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 359 { 360 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 361 nxt_py_asgi_ctx_data_t *ctx_data; 362 363 if (req->request->websocket_handshake) { 364 asgi = nxt_py_asgi_websocket_create(req); 365 366 } else { 367 asgi = nxt_py_asgi_http_create(req); 368 } 369 370 if (nxt_slow_path(asgi == NULL)) { 371 nxt_unit_req_alert(req, "Python failed to create asgi object"); 372 nxt_unit_request_done(req, NXT_UNIT_ERROR); 373 374 return; 375 } 376 377 receive = PyObject_GetAttrString(asgi, "receive"); 378 if (nxt_slow_path(receive == NULL)) { 379 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 380 nxt_unit_request_done(req, NXT_UNIT_ERROR); 381 382 goto release_asgi; 383 } 384 385 send = PyObject_GetAttrString(asgi, "send"); 386 if (nxt_slow_path(receive == NULL)) { 387 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 388 nxt_unit_request_done(req, NXT_UNIT_ERROR); 389 390 goto release_receive; 391 } 392 393 done = PyObject_GetAttrString(asgi, "_done"); 394 if (nxt_slow_path(receive == NULL)) { 395 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 396 nxt_unit_request_done(req, NXT_UNIT_ERROR); 397 398 goto release_send; 399 } 400 401 scope = nxt_py_asgi_create_http_scope(req); 402 if (nxt_slow_path(scope == NULL)) { 403 nxt_unit_request_done(req, NXT_UNIT_ERROR); 404 405 goto release_done; 406 } 407 408 req->data = asgi; 409 410 res = PyObject_CallFunctionObjArgs(nxt_py_application, 411 scope, receive, send, NULL); 412 if (nxt_slow_path(res == NULL)) { 413 nxt_unit_req_error(req, "Python failed to call the application"); 414 nxt_python_print_exception(); 415 nxt_unit_request_done(req, NXT_UNIT_ERROR); 416 417 goto release_scope; 418 } 419 420 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 421 nxt_unit_req_error(req, "Application result type is not a coroutine"); 422 nxt_unit_request_done(req, NXT_UNIT_ERROR); 423 424 Py_DECREF(res); 425 426 goto release_scope; 427 } 428 429 ctx_data = req->ctx->data; 430 431 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 432 if (nxt_slow_path(task == NULL)) { 433 nxt_unit_req_error(req, "Python failed to call the create_task"); 434 nxt_python_print_exception(); 435 nxt_unit_request_done(req, NXT_UNIT_ERROR); 436 437 Py_DECREF(res); 438 439 goto release_scope; 440 } 441 442 Py_DECREF(res); 443 444 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 445 NULL); 446 if (nxt_slow_path(res == NULL)) { 447 nxt_unit_req_error(req, 448 "Python failed to call 'task.add_done_callback'"); 449 nxt_python_print_exception(); 450 nxt_unit_request_done(req, NXT_UNIT_ERROR); 451 452 goto release_task; 453 } 454 455 Py_DECREF(res); 456 release_task: 457 Py_DECREF(task); 458 release_scope: 459 Py_DECREF(scope); 460 release_done: 461 Py_DECREF(done); 462 release_send: 463 Py_DECREF(send); 464 release_receive: 465 Py_DECREF(receive); 466 release_asgi: 467 Py_DECREF(asgi); 468 } 469 470 471 static PyObject * 472 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 473 { 474 char *p, *target, *query; 475 uint32_t target_length, i; 476 PyObject *scope, *v, *type, *scheme; 477 PyObject *headers, *header; 478 nxt_unit_field_t *f; 479 nxt_unit_request_t *r; 480 481 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 482 483 #define SET_ITEM(dict, key, value) \ 484 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 485 == -1)) \ 486 { \ 487 nxt_unit_req_alert(req, "Python failed to set '" \ 488 #dict "." #key "' item"); \ 489 goto fail; \ 490 } 491 492 v = NULL; 493 headers = NULL; 494 495 r = req->request; 496 497 if (r->websocket_handshake) { 498 type = nxt_py_websocket_str; 499 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 500 501 } else { 502 type = nxt_py_http_str; 503 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 504 } 505 506 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 507 if (nxt_slow_path(scope == NULL)) { 508 return NULL; 509 } 510 511 p = nxt_unit_sptr_get(&r->version); 512 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 513 : nxt_py_1_0_str) 514 SET_ITEM(scope, scheme, scheme) 515 516 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 517 r->method_length); 518 if (nxt_slow_path(v == NULL)) { 519 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 520 goto fail; 521 } 522 523 SET_ITEM(scope, method, v) 524 Py_DECREF(v); 525 526 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 527 "replace"); 528 if (nxt_slow_path(v == NULL)) { 529 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 530 goto fail; 531 } 532 533 SET_ITEM(scope, path, v) 534 Py_DECREF(v); 535 536 target = nxt_unit_sptr_get(&r->target); 537 query = nxt_unit_sptr_get(&r->query); 538 539 if (r->query.offset != 0) { 540 target_length = query - target - 1; 541 542 } else { 543 target_length = r->target_length; 544 } 545 546 v = PyBytes_FromStringAndSize(target, target_length); 547 if (nxt_slow_path(v == NULL)) { 548 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 549 goto fail; 550 } 551 552 SET_ITEM(scope, raw_path, v) 553 Py_DECREF(v); 554 555 v = PyBytes_FromStringAndSize(query, r->query_length); 556 if (nxt_slow_path(v == NULL)) { 557 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 558 goto fail; 559 } 560 561 SET_ITEM(scope, query_string, v) 562 Py_DECREF(v); 563 564 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 565 if (nxt_slow_path(v == NULL)) { 566 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 567 goto fail; 568 } 569 570 SET_ITEM(scope, client, v) 571 Py_DECREF(v); 572 573 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 574 if (nxt_slow_path(v == NULL)) { 575 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 576 goto fail; 577 } 578 579 SET_ITEM(scope, server, v) 580 Py_DECREF(v); 581 582 v = NULL; 583 584 headers = PyTuple_New(r->fields_count); 585 if (nxt_slow_path(headers == NULL)) { 586 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 587 goto fail; 588 } 589 590 for (i = 0; i < r->fields_count; i++) { 591 f = r->fields + i; 592 593 header = nxt_py_asgi_create_header(f); 594 if (nxt_slow_path(header == NULL)) { 595 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 596 goto fail; 597 } 598 599 PyTuple_SET_ITEM(headers, i, header); 600 601 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 602 && f->name_length == ws_protocol.length 603 && f->value_length > 0 604 && r->websocket_handshake) 605 { 606 v = nxt_py_asgi_create_subprotocols(f); 607 if (nxt_slow_path(v == NULL)) { 608 nxt_unit_req_alert(req, "Failed to create subprotocols"); 609 goto fail; 610 } 611 612 SET_ITEM(scope, subprotocols, v); 613 Py_DECREF(v); 614 } 615 } 616 617 SET_ITEM(scope, headers, headers) 618 Py_DECREF(headers); 619 620 return scope; 621 622 fail: 623 624 Py_XDECREF(v); 625 Py_XDECREF(headers); 626 Py_DECREF(scope); 627 628 return NULL; 629 630 #undef SET_ITEM 631 } 632 633 634 static PyObject * 635 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 636 { 637 char *p, *s; 638 PyObject *pair, *v; 639 640 pair = PyTuple_New(2); 641 if (nxt_slow_path(pair == NULL)) { 642 return NULL; 643 } 644 645 p = nxt_unit_sptr_get(sptr); 646 s = memchr(p, ':', len); 647 648 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 649 if (nxt_slow_path(v == NULL)) { 650 Py_DECREF(pair); 651 652 return NULL; 653 } 654 655 PyTuple_SET_ITEM(pair, 0, v); 656 657 if (s != NULL) { 658 p += len; 659 v = PyLong_FromString(s + 1, &p, 10); 660 661 } else { 662 v = PyLong_FromLong(port); 663 } 664 665 if (nxt_slow_path(v == NULL)) { 666 Py_DECREF(pair); 667 668 return NULL; 669 } 670 671 PyTuple_SET_ITEM(pair, 1, v); 672 673 return pair; 674 } 675 676 677 static PyObject * 678 nxt_py_asgi_create_header(nxt_unit_field_t *f) 679 { 680 char c, *name; 681 uint8_t pos; 682 PyObject *header, *v; 683 684 header = PyTuple_New(2); 685 if (nxt_slow_path(header == NULL)) { 686 return NULL; 687 } 688 689 name = nxt_unit_sptr_get(&f->name); 690 691 for (pos = 0; pos < f->name_length; pos++) { 692 c = name[pos]; 693 if (c >= 'A' && c <= 'Z') { 694 name[pos] = (c | 0x20); 695 } 696 } 697 698 v = PyBytes_FromStringAndSize(name, f->name_length); 699 if (nxt_slow_path(v == NULL)) { 700 Py_DECREF(header); 701 702 return NULL; 703 } 704 705 PyTuple_SET_ITEM(header, 0, v); 706 707 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 708 f->value_length); 709 if (nxt_slow_path(v == NULL)) { 710 Py_DECREF(header); 711 712 return NULL; 713 } 714 715 PyTuple_SET_ITEM(header, 1, v); 716 717 return header; 718 } 719 720 721 static PyObject * 722 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 723 { 724 char *v; 725 uint32_t i, n, start; 726 PyObject *res, *proto; 727 728 v = nxt_unit_sptr_get(&f->value); 729 n = 1; 730 731 for (i = 0; i < f->value_length; i++) { 732 if (v[i] == ',') { 733 n++; 734 } 735 } 736 737 res = PyTuple_New(n); 738 if (nxt_slow_path(res == NULL)) { 739 return NULL; 740 } 741 742 n = 0; 743 start = 0; 744 745 for (i = 0; i < f->value_length; ) { 746 if (v[i] != ',') { 747 i++; 748 749 continue; 750 } 751 752 if (i - start > 0) { 753 proto = PyString_FromStringAndSize(v + start, i - start); 754 if (nxt_slow_path(proto == NULL)) { 755 goto fail; 756 } 757 758 PyTuple_SET_ITEM(res, n, proto); 759 760 n++; 761 } 762 763 do { 764 i++; 765 } while (i < f->value_length && v[i] == ' '); 766 767 start = i; 768 } 769 770 if (i - start > 0) { 771 proto = PyString_FromStringAndSize(v + start, i - start); 772 if (nxt_slow_path(proto == NULL)) { 773 goto fail; 774 } 775 776 PyTuple_SET_ITEM(res, n, proto); 777 } 778 779 return res; 780 781 fail: 782 783 Py_DECREF(res); 784 785 return NULL; 786 } 787 788 789 static int 790 nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) 791 { 792 PyObject *res; 793 nxt_unit_port_t *port; 794 nxt_py_asgi_ctx_data_t *ctx_data; 795 796 if (nxt_slow_path(nxt_py_shared_port == NULL)) { 797 return NXT_UNIT_ERROR; 798 } 799 800 port = nxt_py_shared_port; 801 802 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); 803 804 ctx_data = ctx->data; 805 806 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 807 PyLong_FromLong(port->in_fd), 808 nxt_py_port_read, 809 PyLong_FromVoidPtr(ctx), 810 PyLong_FromVoidPtr(port), NULL); 811 if (nxt_slow_path(res == NULL)) { 812 nxt_unit_alert(ctx, "Python failed to add_reader"); 813 nxt_python_print_exception(); 814 815 return NXT_UNIT_ERROR; 816 } 817 818 Py_DECREF(res); 819 820 return NXT_UNIT_OK; 821 } 822 823 824 static int 825 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 826 { 827 int nb; 828 PyObject *res; 829 nxt_py_asgi_ctx_data_t *ctx_data; 830 831 if (port->in_fd == -1) { 832 return NXT_UNIT_OK; 833 } 834 835 nb = 1; 836 837 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 838 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 839 port->in_fd, strerror(errno), errno); 840 841 return NXT_UNIT_ERROR; 842 } 843 844 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 845 846 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { 847 nxt_py_shared_port = port; 848 849 return NXT_UNIT_OK; 850 } 851 852 ctx_data = ctx->data; 853 854 ctx_data->port = port; 855 port->data = ctx_data; 856 857 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 858 PyLong_FromLong(port->in_fd), 859 nxt_py_port_read, 860 PyLong_FromVoidPtr(ctx), 861 PyLong_FromVoidPtr(port), NULL); 862 if (nxt_slow_path(res == NULL)) { 863 nxt_unit_alert(ctx, "Python failed to add_reader"); 864 nxt_python_print_exception(); 865 866 return NXT_UNIT_ERROR; 867 } 868 869 Py_DECREF(res); 870 871 return NXT_UNIT_OK; 872 } 873 874 875 static void 876 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 877 { 878 if (port->in_fd == -1) { 879 return; 880 } 881 882 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 883 884 if (nxt_py_shared_port == port) { 885 nxt_py_shared_port = NULL; 886 } 887 } 888 889 890 static void 891 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 892 { 893 PyObject *res; 894 nxt_py_asgi_ctx_data_t *ctx_data; 895 896 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 897 898 ctx_data = ctx->data; 899 900 if (nxt_py_shared_port != NULL) { 901 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 902 PyLong_FromLong(nxt_py_shared_port->in_fd), NULL); 903 if (nxt_slow_path(res == NULL)) { 904 nxt_unit_alert(NULL, "Python failed to remove_reader"); 905 nxt_python_print_exception(); 906 907 } else { 908 Py_DECREF(res); 909 } 910 } 911 912 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 913 PyLong_FromLong(0), NULL); 914 if (nxt_slow_path(res == NULL)) { 915 nxt_unit_alert(ctx, "Python failed to set_result"); 916 nxt_python_print_exception(); 917 918 } else { 919 Py_DECREF(res); 920 } 921 } 922 923 924 static void 925 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 926 { 927 int rc; 928 nxt_queue_link_t *lnk; 929 nxt_py_asgi_ctx_data_t *ctx_data; 930 931 ctx_data = ctx->data; 932 933 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 934 lnk = nxt_queue_first(&ctx_data->drain_queue); 935 936 rc = nxt_py_asgi_http_drain(lnk); 937 if (rc == NXT_UNIT_AGAIN) { 938 return; 939 } 940 941 nxt_queue_remove(lnk); 942 } 943 } 944 945 946 static PyObject * 947 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 948 { 949 int rc; 950 PyObject *arg; 951 Py_ssize_t n; 952 nxt_unit_ctx_t *ctx; 953 nxt_unit_port_t *port; 954 955 n = PyTuple_GET_SIZE(args); 956 957 if (n != 2) { 958 nxt_unit_alert(NULL, 959 "nxt_py_asgi_port_read: invalid number of arguments %d", 960 (int) n); 961 962 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 963 } 964 965 arg = PyTuple_GET_ITEM(args, 0); 966 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 967 return PyErr_Format(PyExc_TypeError, 968 "the first argument is not a long"); 969 } 970 971 ctx = PyLong_AsVoidPtr(arg); 972 973 arg = PyTuple_GET_ITEM(args, 1); 974 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 975 return PyErr_Format(PyExc_TypeError, 976 "the second argument is not a long"); 977 } 978 979 port = PyLong_AsVoidPtr(arg); 980 981 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); 982 983 rc = nxt_unit_process_port_msg(ctx, port); 984 985 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 986 return PyErr_Format(PyExc_RuntimeError, 987 "error processing port %d message", port->id.id); 988 } 989 990 Py_RETURN_NONE; 991 } 992 993 994 PyObject * 995 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 996 void *data) 997 { 998 int i; 999 PyObject *iter, *header, *h_iter, *name, *val, *res; 1000 1001 iter = PyObject_GetIter(headers); 1002 if (nxt_slow_path(iter == NULL)) { 1003 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1004 } 1005 1006 for (i = 0; /* void */; i++) { 1007 header = PyIter_Next(iter); 1008 if (header == NULL) { 1009 break; 1010 } 1011 1012 h_iter = PyObject_GetIter(header); 1013 if (nxt_slow_path(h_iter == NULL)) { 1014 Py_DECREF(header); 1015 Py_DECREF(iter); 1016 1017 return PyErr_Format(PyExc_TypeError, 1018 "'headers' item #%d is not an iterable", i); 1019 } 1020 1021 name = PyIter_Next(h_iter); 1022 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1023 Py_XDECREF(name); 1024 Py_DECREF(h_iter); 1025 Py_DECREF(header); 1026 Py_DECREF(iter); 1027 1028 return PyErr_Format(PyExc_TypeError, 1029 "'headers' item #%d 'name' is not a byte string", i); 1030 } 1031 1032 val = PyIter_Next(h_iter); 1033 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1034 Py_XDECREF(val); 1035 Py_DECREF(h_iter); 1036 Py_DECREF(header); 1037 Py_DECREF(iter); 1038 1039 return PyErr_Format(PyExc_TypeError, 1040 "'headers' item #%d 'value' is not a byte string", i); 1041 } 1042 1043 res = cb(data, i, name, val); 1044 1045 Py_DECREF(name); 1046 Py_DECREF(val); 1047 Py_DECREF(h_iter); 1048 Py_DECREF(header); 1049 1050 if (nxt_slow_path(res == NULL)) { 1051 Py_DECREF(iter); 1052 1053 return NULL; 1054 } 1055 1056 Py_DECREF(res); 1057 } 1058 1059 Py_DECREF(iter); 1060 1061 Py_RETURN_NONE; 1062 } 1063 1064 1065 PyObject * 1066 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1067 { 1068 nxt_py_asgi_calc_size_ctx_t *ctx; 1069 1070 ctx = data; 1071 1072 ctx->fields_count++; 1073 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1074 1075 Py_RETURN_NONE; 1076 } 1077 1078 1079 PyObject * 1080 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1081 { 1082 int rc; 1083 char *name_str, *val_str; 1084 uint32_t name_len, val_len; 1085 nxt_off_t content_length; 1086 nxt_unit_request_info_t *req; 1087 nxt_py_asgi_add_field_ctx_t *ctx; 1088 1089 name_str = PyBytes_AS_STRING(name); 1090 name_len = PyBytes_GET_SIZE(name); 1091 1092 val_str = PyBytes_AS_STRING(val); 1093 val_len = PyBytes_GET_SIZE(val); 1094 1095 ctx = data; 1096 req = ctx->req; 1097 1098 rc = nxt_unit_response_add_field(req, name_str, name_len, 1099 val_str, val_len); 1100 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1101 return PyErr_Format(PyExc_RuntimeError, 1102 "failed to add header #%d", i); 1103 } 1104 1105 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1106 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1107 if (nxt_slow_path(content_length < 0)) { 1108 nxt_unit_req_error(req, "failed to parse Content-Length " 1109 "value %.*s", (int) val_len, val_str); 1110 1111 return PyErr_Format(PyExc_ValueError, 1112 "Failed to parse Content-Length: '%.*s'", 1113 (int) val_len, val_str); 1114 } 1115 1116 ctx->content_length = content_length; 1117 } 1118 1119 Py_RETURN_NONE; 1120 } 1121 1122 1123 PyObject * 1124 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1125 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1126 { 1127 PyObject *set_result, *res; 1128 1129 if (nxt_slow_path(result == NULL)) { 1130 Py_DECREF(future); 1131 1132 return NULL; 1133 } 1134 1135 set_result = PyObject_GetAttrString(future, "set_result"); 1136 if (nxt_slow_path(set_result == NULL)) { 1137 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1138 1139 Py_CLEAR(future); 1140 1141 goto cleanup_result; 1142 } 1143 1144 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1145 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1146 1147 Py_CLEAR(future); 1148 1149 goto cleanup; 1150 } 1151 1152 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1153 result, NULL); 1154 if (nxt_slow_path(res == NULL)) { 1155 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1156 nxt_python_print_exception(); 1157 1158 Py_CLEAR(future); 1159 } 1160 1161 Py_XDECREF(res); 1162 1163 cleanup: 1164 1165 Py_DECREF(set_result); 1166 1167 cleanup_result: 1168 1169 Py_DECREF(result); 1170 1171 return future; 1172 } 1173 1174 1175 PyObject * 1176 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1177 { 1178 PyObject *msg; 1179 1180 msg = PyDict_New(); 1181 if (nxt_slow_path(msg == NULL)) { 1182 nxt_unit_req_alert(req, "Python failed to create message dict"); 1183 nxt_python_print_exception(); 1184 1185 return PyErr_Format(PyExc_RuntimeError, 1186 "failed to create message dict"); 1187 } 1188 1189 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1190 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1191 1192 Py_DECREF(msg); 1193 1194 return PyErr_Format(PyExc_RuntimeError, 1195 "failed to set 'msg.type' item"); 1196 } 1197 1198 return msg; 1199 } 1200 1201 1202 PyObject * 1203 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1204 PyObject *spec_version) 1205 { 1206 PyObject *scope, *asgi; 1207 1208 scope = PyDict_New(); 1209 if (nxt_slow_path(scope == NULL)) { 1210 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1211 nxt_python_print_exception(); 1212 1213 return PyErr_Format(PyExc_RuntimeError, 1214 "failed to create 'scope' dict"); 1215 } 1216 1217 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1218 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1219 1220 Py_DECREF(scope); 1221 1222 return PyErr_Format(PyExc_RuntimeError, 1223 "failed to set 'scope.type' item"); 1224 } 1225 1226 asgi = PyDict_New(); 1227 if (nxt_slow_path(asgi == NULL)) { 1228 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1229 nxt_python_print_exception(); 1230 1231 Py_DECREF(scope); 1232 1233 return PyErr_Format(PyExc_RuntimeError, 1234 "failed to create 'asgi' dict"); 1235 } 1236 1237 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1238 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1239 1240 Py_DECREF(asgi); 1241 Py_DECREF(scope); 1242 1243 return PyErr_Format(PyExc_RuntimeError, 1244 "failed to set 'scope.asgi' item"); 1245 } 1246 1247 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1248 nxt_py_3_0_str) == -1)) 1249 { 1250 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1251 1252 Py_DECREF(asgi); 1253 Py_DECREF(scope); 1254 1255 return PyErr_Format(PyExc_RuntimeError, 1256 "failed to set 'asgi.version' item"); 1257 } 1258 1259 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1260 spec_version) == -1)) 1261 { 1262 nxt_unit_req_alert(req, 1263 "Python failed to set 'asgi.spec_version' item"); 1264 1265 Py_DECREF(asgi); 1266 Py_DECREF(scope); 1267 1268 return PyErr_Format(PyExc_RuntimeError, 1269 "failed to set 'asgi.spec_version' item"); 1270 } 1271 1272 Py_DECREF(asgi); 1273 1274 return scope; 1275 } 1276 1277 1278 void 1279 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1280 { 1281 nxt_py_asgi_ctx_data_t *ctx_data; 1282 1283 ctx_data = req->ctx->data; 1284 1285 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1286 } 1287 1288 1289 void 1290 nxt_py_asgi_dealloc(PyObject *self) 1291 { 1292 PyObject_Del(self); 1293 } 1294 1295 1296 PyObject * 1297 nxt_py_asgi_await(PyObject *self) 1298 { 1299 Py_INCREF(self); 1300 return self; 1301 } 1302 1303 1304 PyObject * 1305 nxt_py_asgi_iter(PyObject *self) 1306 { 1307 Py_INCREF(self); 1308 return self; 1309 } 1310 1311 1312 PyObject * 1313 nxt_py_asgi_next(PyObject *self) 1314 { 1315 return NULL; 1316 } 1317 1318 1319 static void 1320 nxt_python_asgi_done(void) 1321 { 1322 nxt_py_asgi_str_done(); 1323 1324 Py_XDECREF(nxt_py_port_read); 1325 } 1326 1327 #else /* !(NXT_HAVE_ASGI) */ 1328 1329 1330 int 1331 nxt_python_asgi_check(PyObject *obj) 1332 { 1333 return 0; 1334 } 1335 1336 1337 int 1338 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1339 { 1340 nxt_unit_alert(NULL, "ASGI not implemented"); 1341 return NXT_UNIT_ERROR; 1342 } 1343 1344 1345 #endif /* NXT_HAVE_ASGI */ 1346