1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <nxt_unit_response.h> 15 #include <python/nxt_python_asgi.h> 16 #include <python/nxt_python_asgi_str.h> 17 18 19 static int nxt_python_asgi_ctx_data_alloc(void **pdata); 20 static void nxt_python_asgi_ctx_data_free(void *data); 21 static int nxt_python_asgi_startup(void *data); 22 static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); 23 24 static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, 25 nxt_unit_port_t *port); 26 static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); 27 28 static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); 29 static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, 30 uint16_t port); 31 static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); 32 static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); 33 34 static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); 35 36 static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 37 static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); 38 static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); 39 static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); 40 41 static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); 42 static void nxt_python_asgi_done(void); 43 44 45 static PyObject *nxt_py_port_read; 46 static nxt_unit_port_t *nxt_py_shared_port; 47 48 static PyMethodDef nxt_py_port_read_method = 49 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; 50 51 static nxt_python_proto_t nxt_py_asgi_proto = { 52 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, 53 .ctx_data_free = nxt_python_asgi_ctx_data_free, 54 .startup = nxt_python_asgi_startup, 55 .run = nxt_python_asgi_run, 56 .ready = nxt_python_asgi_ready, 57 .done = nxt_python_asgi_done, 58 }; 59 60 #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A 61 62 63 int 64 nxt_python_asgi_check(PyObject *obj) 65 { 66 int res; 67 PyObject *call; 68 PyCodeObject *code; 69 70 if (PyFunction_Check(obj)) { 71 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 72 73 return (code->co_flags & CO_COROUTINE) != 0; 74 } 75 76 if (PyMethod_Check(obj)) { 77 obj = PyMethod_GET_FUNCTION(obj); 78 79 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 80 81 return (code->co_flags & CO_COROUTINE) != 0; 82 } 83 84 call = PyObject_GetAttrString(obj, "__call__"); 85 86 if (call == NULL) { 87 return 0; 88 } 89 90 if (PyFunction_Check(call)) { 91 code = (PyCodeObject *) PyFunction_GET_CODE(call); 92 93 res = (code->co_flags & CO_COROUTINE) != 0; 94 95 } else { 96 if (PyMethod_Check(call)) { 97 obj = PyMethod_GET_FUNCTION(call); 98 99 code = (PyCodeObject *) PyFunction_GET_CODE(obj); 100 101 res = (code->co_flags & CO_COROUTINE) != 0; 102 103 } else { 104 res = 0; 105 } 106 } 107 108 Py_DECREF(call); 109 110 return res; 111 } 112 113 114 int 115 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 116 { 117 nxt_unit_debug(NULL, "asgi_init"); 118 119 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { 120 nxt_unit_alert(NULL, "Python failed to init string objects"); 121 return NXT_UNIT_ERROR; 122 } 123 124 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); 125 if (nxt_slow_path(nxt_py_port_read == NULL)) { 126 nxt_unit_alert(NULL, 127 "Python failed to initialize the 'port_read' function"); 128 return NXT_UNIT_ERROR; 129 } 130 131 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { 132 return NXT_UNIT_ERROR; 133 } 134 135 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { 136 return NXT_UNIT_ERROR; 137 } 138 139 init->callbacks.request_handler = nxt_py_asgi_request_handler; 140 init->callbacks.data_handler = nxt_py_asgi_http_data_handler; 141 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; 142 init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; 143 init->callbacks.quit = nxt_py_asgi_quit; 144 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; 145 init->callbacks.add_port = nxt_py_asgi_add_port; 146 init->callbacks.remove_port = nxt_py_asgi_remove_port; 147 148 *proto = nxt_py_asgi_proto; 149 150 return NXT_UNIT_OK; 151 } 152 153 154 static int 155 nxt_python_asgi_ctx_data_alloc(void **pdata) 156 { 157 uint32_t i; 158 PyObject *asyncio, *loop, *new_event_loop, *obj; 159 nxt_py_asgi_ctx_data_t *ctx_data; 160 161 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); 162 if (nxt_slow_path(ctx_data == NULL)) { 163 nxt_unit_alert(NULL, "Failed to allocate context data"); 164 return NXT_UNIT_ERROR; 165 } 166 167 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); 168 169 nxt_queue_init(&ctx_data->drain_queue); 170 171 struct { 172 const char *key; 173 PyObject **handler; 174 175 } handlers[] = { 176 { "create_task", &ctx_data->loop_create_task }, 177 { "add_reader", &ctx_data->loop_add_reader }, 178 { "remove_reader", &ctx_data->loop_remove_reader }, 179 { "call_soon", &ctx_data->loop_call_soon }, 180 { "run_until_complete", &ctx_data->loop_run_until_complete }, 181 { "create_future", &ctx_data->loop_create_future }, 182 }; 183 184 loop = NULL; 185 186 asyncio = PyImport_ImportModule("asyncio"); 187 if (nxt_slow_path(asyncio == NULL)) { 188 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); 189 nxt_python_print_exception(); 190 goto fail; 191 } 192 193 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), 194 "new_event_loop"); 195 if (nxt_slow_path(new_event_loop == NULL)) { 196 nxt_unit_alert(NULL, 197 "Python failed to get 'new_event_loop' from module 'asyncio'"); 198 goto fail; 199 } 200 201 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { 202 nxt_unit_alert(NULL, 203 "'asyncio.new_event_loop' is not a callable object"); 204 goto fail; 205 } 206 207 loop = PyObject_CallObject(new_event_loop, NULL); 208 if (nxt_slow_path(loop == NULL)) { 209 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); 210 goto fail; 211 } 212 213 for (i = 0; i < nxt_nitems(handlers); i++) { 214 obj = PyObject_GetAttrString(loop, handlers[i].key); 215 if (nxt_slow_path(obj == NULL)) { 216 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", 217 handlers[i].key); 218 goto fail; 219 } 220 221 *handlers[i].handler = obj; 222 223 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 224 nxt_unit_alert(NULL, "'loop.%s' is not a callable object", 225 handlers[i].key); 226 goto fail; 227 } 228 } 229 230 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); 231 if (nxt_slow_path(obj == NULL)) { 232 nxt_unit_alert(NULL, "Python failed to create Future "); 233 nxt_python_print_exception(); 234 goto fail; 235 } 236 237 ctx_data->quit_future = obj; 238 239 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); 240 if (nxt_slow_path(obj == NULL)) { 241 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); 242 goto fail; 243 } 244 245 ctx_data->quit_future_set_result = obj; 246 247 if (nxt_slow_path(PyCallable_Check(obj) == 0)) { 248 nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); 249 goto fail; 250 } 251 252 Py_DECREF(loop); 253 Py_DECREF(asyncio); 254 255 *pdata = ctx_data; 256 257 return NXT_UNIT_OK; 258 259 fail: 260 261 nxt_python_asgi_ctx_data_free(ctx_data); 262 263 Py_XDECREF(loop); 264 Py_XDECREF(asyncio); 265 266 return NXT_UNIT_ERROR; 267 } 268 269 270 static void 271 nxt_python_asgi_ctx_data_free(void *data) 272 { 273 nxt_py_asgi_ctx_data_t *ctx_data; 274 275 ctx_data = data; 276 277 Py_XDECREF(ctx_data->loop_run_until_complete); 278 Py_XDECREF(ctx_data->loop_create_future); 279 Py_XDECREF(ctx_data->loop_create_task); 280 Py_XDECREF(ctx_data->loop_call_soon); 281 Py_XDECREF(ctx_data->loop_add_reader); 282 Py_XDECREF(ctx_data->loop_remove_reader); 283 Py_XDECREF(ctx_data->quit_future); 284 Py_XDECREF(ctx_data->quit_future_set_result); 285 286 nxt_unit_free(NULL, ctx_data); 287 } 288 289 290 static int 291 nxt_python_asgi_startup(void *data) 292 { 293 return nxt_py_asgi_lifespan_startup(data); 294 } 295 296 297 static int 298 nxt_python_asgi_run(nxt_unit_ctx_t *ctx) 299 { 300 PyObject *res; 301 nxt_py_asgi_ctx_data_t *ctx_data; 302 303 ctx_data = ctx->data; 304 305 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, 306 ctx_data->quit_future, NULL); 307 if (nxt_slow_path(res == NULL)) { 308 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); 309 nxt_python_print_exception(); 310 311 return NXT_UNIT_ERROR; 312 } 313 314 Py_DECREF(res); 315 316 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); 317 nxt_py_asgi_remove_reader(ctx, ctx_data->port); 318 319 if (ctx_data->port != NULL) { 320 ctx_data->port->data = NULL; 321 ctx_data->port = NULL; 322 } 323 324 nxt_py_asgi_lifespan_shutdown(ctx); 325 326 return NXT_UNIT_OK; 327 } 328 329 330 static void 331 nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 332 { 333 PyObject *res, *fd; 334 nxt_py_asgi_ctx_data_t *ctx_data; 335 336 if (port == NULL || port->in_fd == -1) { 337 return; 338 } 339 340 ctx_data = ctx->data; 341 342 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); 343 344 fd = PyLong_FromLong(port->in_fd); 345 if (nxt_slow_path(fd == NULL)) { 346 nxt_unit_alert(ctx, "Python failed to create Long object"); 347 nxt_python_print_exception(); 348 349 return; 350 } 351 352 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); 353 if (nxt_slow_path(res == NULL)) { 354 nxt_unit_alert(ctx, "Python failed to remove_reader"); 355 nxt_python_print_exception(); 356 357 } else { 358 Py_DECREF(res); 359 } 360 361 Py_DECREF(fd); 362 } 363 364 365 static void 366 nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) 367 { 368 PyObject *scope, *res, *task, *receive, *send, *done, *asgi; 369 nxt_py_asgi_ctx_data_t *ctx_data; 370 371 if (req->request->websocket_handshake) { 372 asgi = nxt_py_asgi_websocket_create(req); 373 374 } else { 375 asgi = nxt_py_asgi_http_create(req); 376 } 377 378 if (nxt_slow_path(asgi == NULL)) { 379 nxt_unit_req_alert(req, "Python failed to create asgi object"); 380 nxt_unit_request_done(req, NXT_UNIT_ERROR); 381 382 return; 383 } 384 385 receive = PyObject_GetAttrString(asgi, "receive"); 386 if (nxt_slow_path(receive == NULL)) { 387 nxt_unit_req_alert(req, "Python failed to get 'receive' method"); 388 nxt_unit_request_done(req, NXT_UNIT_ERROR); 389 390 goto release_asgi; 391 } 392 393 send = PyObject_GetAttrString(asgi, "send"); 394 if (nxt_slow_path(receive == NULL)) { 395 nxt_unit_req_alert(req, "Python failed to get 'send' method"); 396 nxt_unit_request_done(req, NXT_UNIT_ERROR); 397 398 goto release_receive; 399 } 400 401 done = PyObject_GetAttrString(asgi, "_done"); 402 if (nxt_slow_path(receive == NULL)) { 403 nxt_unit_req_alert(req, "Python failed to get '_done' method"); 404 nxt_unit_request_done(req, NXT_UNIT_ERROR); 405 406 goto release_send; 407 } 408 409 scope = nxt_py_asgi_create_http_scope(req); 410 if (nxt_slow_path(scope == NULL)) { 411 nxt_unit_request_done(req, NXT_UNIT_ERROR); 412 413 goto release_done; 414 } 415 416 req->data = asgi; 417 418 res = PyObject_CallFunctionObjArgs(nxt_py_application, 419 scope, receive, send, NULL); 420 if (nxt_slow_path(res == NULL)) { 421 nxt_unit_req_error(req, "Python failed to call the application"); 422 nxt_python_print_exception(); 423 nxt_unit_request_done(req, NXT_UNIT_ERROR); 424 425 goto release_scope; 426 } 427 428 if (nxt_slow_path(!PyCoro_CheckExact(res))) { 429 nxt_unit_req_error(req, "Application result type is not a coroutine"); 430 nxt_unit_request_done(req, NXT_UNIT_ERROR); 431 432 Py_DECREF(res); 433 434 goto release_scope; 435 } 436 437 ctx_data = req->ctx->data; 438 439 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); 440 if (nxt_slow_path(task == NULL)) { 441 nxt_unit_req_error(req, "Python failed to call the create_task"); 442 nxt_python_print_exception(); 443 nxt_unit_request_done(req, NXT_UNIT_ERROR); 444 445 Py_DECREF(res); 446 447 goto release_scope; 448 } 449 450 Py_DECREF(res); 451 452 res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, 453 NULL); 454 if (nxt_slow_path(res == NULL)) { 455 nxt_unit_req_error(req, 456 "Python failed to call 'task.add_done_callback'"); 457 nxt_python_print_exception(); 458 nxt_unit_request_done(req, NXT_UNIT_ERROR); 459 460 goto release_task; 461 } 462 463 Py_DECREF(res); 464 release_task: 465 Py_DECREF(task); 466 release_scope: 467 Py_DECREF(scope); 468 release_done: 469 Py_DECREF(done); 470 release_send: 471 Py_DECREF(send); 472 release_receive: 473 Py_DECREF(receive); 474 release_asgi: 475 Py_DECREF(asgi); 476 } 477 478 479 static PyObject * 480 nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) 481 { 482 char *p, *target, *query; 483 uint32_t target_length, i; 484 PyObject *scope, *v, *type, *scheme; 485 PyObject *headers, *header; 486 nxt_unit_field_t *f; 487 nxt_unit_request_t *r; 488 489 static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); 490 491 #define SET_ITEM(dict, key, value) \ 492 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 493 == -1)) \ 494 { \ 495 nxt_unit_req_alert(req, "Python failed to set '" \ 496 #dict "." #key "' item"); \ 497 goto fail; \ 498 } 499 500 v = NULL; 501 headers = NULL; 502 503 r = req->request; 504 505 if (r->websocket_handshake) { 506 type = nxt_py_websocket_str; 507 scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; 508 509 } else { 510 type = nxt_py_http_str; 511 scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; 512 } 513 514 scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); 515 if (nxt_slow_path(scope == NULL)) { 516 return NULL; 517 } 518 519 p = nxt_unit_sptr_get(&r->version); 520 SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str 521 : nxt_py_1_0_str) 522 SET_ITEM(scope, scheme, scheme) 523 524 v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), 525 r->method_length); 526 if (nxt_slow_path(v == NULL)) { 527 nxt_unit_req_alert(req, "Python failed to create 'method' string"); 528 goto fail; 529 } 530 531 SET_ITEM(scope, method, v) 532 Py_DECREF(v); 533 534 v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, 535 "replace"); 536 if (nxt_slow_path(v == NULL)) { 537 nxt_unit_req_alert(req, "Python failed to create 'path' string"); 538 goto fail; 539 } 540 541 SET_ITEM(scope, path, v) 542 Py_DECREF(v); 543 544 target = nxt_unit_sptr_get(&r->target); 545 query = nxt_unit_sptr_get(&r->query); 546 547 if (r->query.offset != 0) { 548 target_length = query - target - 1; 549 550 } else { 551 target_length = r->target_length; 552 } 553 554 v = PyBytes_FromStringAndSize(target, target_length); 555 if (nxt_slow_path(v == NULL)) { 556 nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); 557 goto fail; 558 } 559 560 SET_ITEM(scope, raw_path, v) 561 Py_DECREF(v); 562 563 v = PyBytes_FromStringAndSize(query, r->query_length); 564 if (nxt_slow_path(v == NULL)) { 565 nxt_unit_req_alert(req, "Python failed to create 'query' string"); 566 goto fail; 567 } 568 569 SET_ITEM(scope, query_string, v) 570 Py_DECREF(v); 571 572 v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); 573 if (nxt_slow_path(v == NULL)) { 574 nxt_unit_req_alert(req, "Python failed to create 'client' pair"); 575 goto fail; 576 } 577 578 SET_ITEM(scope, client, v) 579 Py_DECREF(v); 580 581 v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); 582 if (nxt_slow_path(v == NULL)) { 583 nxt_unit_req_alert(req, "Python failed to create 'server' pair"); 584 goto fail; 585 } 586 587 SET_ITEM(scope, server, v) 588 Py_DECREF(v); 589 590 v = NULL; 591 592 headers = PyTuple_New(r->fields_count); 593 if (nxt_slow_path(headers == NULL)) { 594 nxt_unit_req_alert(req, "Python failed to create 'headers' object"); 595 goto fail; 596 } 597 598 for (i = 0; i < r->fields_count; i++) { 599 f = r->fields + i; 600 601 header = nxt_py_asgi_create_header(f); 602 if (nxt_slow_path(header == NULL)) { 603 nxt_unit_req_alert(req, "Python failed to create 'header' pair"); 604 goto fail; 605 } 606 607 PyTuple_SET_ITEM(headers, i, header); 608 609 if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL 610 && f->name_length == ws_protocol.length 611 && f->value_length > 0 612 && r->websocket_handshake) 613 { 614 v = nxt_py_asgi_create_subprotocols(f); 615 if (nxt_slow_path(v == NULL)) { 616 nxt_unit_req_alert(req, "Failed to create subprotocols"); 617 goto fail; 618 } 619 620 SET_ITEM(scope, subprotocols, v); 621 Py_DECREF(v); 622 } 623 } 624 625 SET_ITEM(scope, headers, headers) 626 Py_DECREF(headers); 627 628 return scope; 629 630 fail: 631 632 Py_XDECREF(v); 633 Py_XDECREF(headers); 634 Py_DECREF(scope); 635 636 return NULL; 637 638 #undef SET_ITEM 639 } 640 641 642 static PyObject * 643 nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) 644 { 645 char *p, *s; 646 PyObject *pair, *v; 647 648 pair = PyTuple_New(2); 649 if (nxt_slow_path(pair == NULL)) { 650 return NULL; 651 } 652 653 p = nxt_unit_sptr_get(sptr); 654 s = memchr(p, ':', len); 655 656 v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); 657 if (nxt_slow_path(v == NULL)) { 658 Py_DECREF(pair); 659 660 return NULL; 661 } 662 663 PyTuple_SET_ITEM(pair, 0, v); 664 665 if (s != NULL) { 666 p += len; 667 v = PyLong_FromString(s + 1, &p, 10); 668 669 } else { 670 v = PyLong_FromLong(port); 671 } 672 673 if (nxt_slow_path(v == NULL)) { 674 Py_DECREF(pair); 675 676 return NULL; 677 } 678 679 PyTuple_SET_ITEM(pair, 1, v); 680 681 return pair; 682 } 683 684 685 static PyObject * 686 nxt_py_asgi_create_header(nxt_unit_field_t *f) 687 { 688 char c, *name; 689 uint8_t pos; 690 PyObject *header, *v; 691 692 header = PyTuple_New(2); 693 if (nxt_slow_path(header == NULL)) { 694 return NULL; 695 } 696 697 name = nxt_unit_sptr_get(&f->name); 698 699 for (pos = 0; pos < f->name_length; pos++) { 700 c = name[pos]; 701 if (c >= 'A' && c <= 'Z') { 702 name[pos] = (c | 0x20); 703 } 704 } 705 706 v = PyBytes_FromStringAndSize(name, f->name_length); 707 if (nxt_slow_path(v == NULL)) { 708 Py_DECREF(header); 709 710 return NULL; 711 } 712 713 PyTuple_SET_ITEM(header, 0, v); 714 715 v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), 716 f->value_length); 717 if (nxt_slow_path(v == NULL)) { 718 Py_DECREF(header); 719 720 return NULL; 721 } 722 723 PyTuple_SET_ITEM(header, 1, v); 724 725 return header; 726 } 727 728 729 static PyObject * 730 nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) 731 { 732 char *v; 733 uint32_t i, n, start; 734 PyObject *res, *proto; 735 736 v = nxt_unit_sptr_get(&f->value); 737 n = 1; 738 739 for (i = 0; i < f->value_length; i++) { 740 if (v[i] == ',') { 741 n++; 742 } 743 } 744 745 res = PyTuple_New(n); 746 if (nxt_slow_path(res == NULL)) { 747 return NULL; 748 } 749 750 n = 0; 751 start = 0; 752 753 for (i = 0; i < f->value_length; ) { 754 if (v[i] != ',') { 755 i++; 756 757 continue; 758 } 759 760 if (i - start > 0) { 761 proto = PyString_FromStringAndSize(v + start, i - start); 762 if (nxt_slow_path(proto == NULL)) { 763 goto fail; 764 } 765 766 PyTuple_SET_ITEM(res, n, proto); 767 768 n++; 769 } 770 771 do { 772 i++; 773 } while (i < f->value_length && v[i] == ' '); 774 775 start = i; 776 } 777 778 if (i - start > 0) { 779 proto = PyString_FromStringAndSize(v + start, i - start); 780 if (nxt_slow_path(proto == NULL)) { 781 goto fail; 782 } 783 784 PyTuple_SET_ITEM(res, n, proto); 785 } 786 787 return res; 788 789 fail: 790 791 Py_DECREF(res); 792 793 return NULL; 794 } 795 796 797 static int 798 nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) 799 { 800 int rc; 801 PyObject *res, *fd, *py_ctx, *py_port; 802 nxt_unit_port_t *port; 803 nxt_py_asgi_ctx_data_t *ctx_data; 804 805 if (nxt_slow_path(nxt_py_shared_port == NULL)) { 806 return NXT_UNIT_ERROR; 807 } 808 809 port = nxt_py_shared_port; 810 811 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); 812 813 ctx_data = ctx->data; 814 815 rc = NXT_UNIT_ERROR; 816 817 fd = PyLong_FromLong(port->in_fd); 818 if (nxt_slow_path(fd == NULL)) { 819 nxt_unit_alert(ctx, "Python failed to create fd"); 820 nxt_python_print_exception(); 821 822 return rc; 823 } 824 825 py_ctx = PyLong_FromVoidPtr(ctx); 826 if (nxt_slow_path(py_ctx == NULL)) { 827 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 828 nxt_python_print_exception(); 829 830 goto clean_fd; 831 } 832 833 py_port = PyLong_FromVoidPtr(port); 834 if (nxt_slow_path(py_port == NULL)) { 835 nxt_unit_alert(ctx, "Python failed to create py_port"); 836 nxt_python_print_exception(); 837 838 goto clean_py_ctx; 839 } 840 841 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 842 fd, nxt_py_port_read, 843 py_ctx, py_port, NULL); 844 if (nxt_slow_path(res == NULL)) { 845 nxt_unit_alert(ctx, "Python failed to add_reader"); 846 nxt_python_print_exception(); 847 848 } else { 849 Py_DECREF(res); 850 851 rc = NXT_UNIT_OK; 852 } 853 854 Py_DECREF(py_port); 855 856 clean_py_ctx: 857 858 Py_DECREF(py_ctx); 859 860 clean_fd: 861 862 Py_DECREF(fd); 863 864 return rc; 865 } 866 867 868 static int 869 nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 870 { 871 int nb, rc; 872 PyObject *res, *fd, *py_ctx, *py_port; 873 nxt_py_asgi_ctx_data_t *ctx_data; 874 875 if (port->in_fd == -1) { 876 return NXT_UNIT_OK; 877 } 878 879 nb = 1; 880 881 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { 882 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 883 port->in_fd, strerror(errno), errno); 884 885 return NXT_UNIT_ERROR; 886 } 887 888 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); 889 890 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { 891 nxt_py_shared_port = port; 892 893 return NXT_UNIT_OK; 894 } 895 896 ctx_data = ctx->data; 897 898 ctx_data->port = port; 899 port->data = ctx_data; 900 901 rc = NXT_UNIT_ERROR; 902 903 fd = PyLong_FromLong(port->in_fd); 904 if (nxt_slow_path(fd == NULL)) { 905 nxt_unit_alert(ctx, "Python failed to create fd"); 906 nxt_python_print_exception(); 907 908 return rc; 909 } 910 911 py_ctx = PyLong_FromVoidPtr(ctx); 912 if (nxt_slow_path(py_ctx == NULL)) { 913 nxt_unit_alert(ctx, "Python failed to create py_ctx"); 914 nxt_python_print_exception(); 915 916 goto clean_fd; 917 } 918 919 py_port = PyLong_FromVoidPtr(port); 920 if (nxt_slow_path(py_port == NULL)) { 921 nxt_unit_alert(ctx, "Python failed to create py_port"); 922 nxt_python_print_exception(); 923 924 goto clean_py_ctx; 925 } 926 927 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, 928 fd, nxt_py_port_read, 929 py_ctx, py_port, NULL); 930 if (nxt_slow_path(res == NULL)) { 931 nxt_unit_alert(ctx, "Python failed to add_reader"); 932 nxt_python_print_exception(); 933 934 } else { 935 Py_DECREF(res); 936 937 rc = NXT_UNIT_OK; 938 } 939 940 Py_DECREF(py_port); 941 942 clean_py_ctx: 943 944 Py_DECREF(py_ctx); 945 946 clean_fd: 947 948 Py_DECREF(fd); 949 950 return rc; 951 } 952 953 954 static void 955 nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) 956 { 957 if (port->in_fd == -1) { 958 return; 959 } 960 961 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); 962 963 if (nxt_py_shared_port == port) { 964 nxt_py_shared_port = NULL; 965 } 966 } 967 968 969 static void 970 nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) 971 { 972 PyObject *res, *p; 973 nxt_py_asgi_ctx_data_t *ctx_data; 974 975 nxt_unit_debug(ctx, "asgi_quit %p", ctx); 976 977 ctx_data = ctx->data; 978 979 if (nxt_py_shared_port != NULL) { 980 p = PyLong_FromLong(nxt_py_shared_port->in_fd); 981 if (nxt_slow_path(p == NULL)) { 982 nxt_unit_alert(NULL, "Python failed to create Long"); 983 nxt_python_print_exception(); 984 985 } else { 986 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, 987 p, NULL); 988 if (nxt_slow_path(res == NULL)) { 989 nxt_unit_alert(NULL, "Python failed to remove_reader"); 990 nxt_python_print_exception(); 991 992 } else { 993 Py_DECREF(res); 994 } 995 996 Py_DECREF(p); 997 } 998 } 999 1000 p = PyLong_FromLong(0); 1001 if (nxt_slow_path(p == NULL)) { 1002 nxt_unit_alert(NULL, "Python failed to create Long"); 1003 nxt_python_print_exception(); 1004 1005 } else { 1006 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, 1007 p, NULL); 1008 if (nxt_slow_path(res == NULL)) { 1009 nxt_unit_alert(ctx, "Python failed to set_result"); 1010 nxt_python_print_exception(); 1011 1012 } else { 1013 Py_DECREF(res); 1014 } 1015 1016 Py_DECREF(p); 1017 } 1018 } 1019 1020 1021 static void 1022 nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) 1023 { 1024 int rc; 1025 nxt_queue_link_t *lnk; 1026 nxt_py_asgi_ctx_data_t *ctx_data; 1027 1028 ctx_data = ctx->data; 1029 1030 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { 1031 lnk = nxt_queue_first(&ctx_data->drain_queue); 1032 1033 rc = nxt_py_asgi_http_drain(lnk); 1034 if (rc == NXT_UNIT_AGAIN) { 1035 return; 1036 } 1037 1038 nxt_queue_remove(lnk); 1039 } 1040 } 1041 1042 1043 static PyObject * 1044 nxt_py_asgi_port_read(PyObject *self, PyObject *args) 1045 { 1046 int rc; 1047 PyObject *arg; 1048 Py_ssize_t n; 1049 nxt_unit_ctx_t *ctx; 1050 nxt_unit_port_t *port; 1051 1052 n = PyTuple_GET_SIZE(args); 1053 1054 if (n != 2) { 1055 nxt_unit_alert(NULL, 1056 "nxt_py_asgi_port_read: invalid number of arguments %d", 1057 (int) n); 1058 1059 return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); 1060 } 1061 1062 arg = PyTuple_GET_ITEM(args, 0); 1063 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 1064 return PyErr_Format(PyExc_TypeError, 1065 "the first argument is not a long"); 1066 } 1067 1068 ctx = PyLong_AsVoidPtr(arg); 1069 1070 arg = PyTuple_GET_ITEM(args, 1); 1071 if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { 1072 return PyErr_Format(PyExc_TypeError, 1073 "the second argument is not a long"); 1074 } 1075 1076 port = PyLong_AsVoidPtr(arg); 1077 1078 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); 1079 1080 rc = nxt_unit_process_port_msg(ctx, port); 1081 1082 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 1083 return PyErr_Format(PyExc_RuntimeError, 1084 "error processing port %d message", port->id.id); 1085 } 1086 1087 Py_RETURN_NONE; 1088 } 1089 1090 1091 PyObject * 1092 nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, 1093 void *data) 1094 { 1095 int i; 1096 PyObject *iter, *header, *h_iter, *name, *val, *res; 1097 1098 iter = PyObject_GetIter(headers); 1099 if (nxt_slow_path(iter == NULL)) { 1100 return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); 1101 } 1102 1103 for (i = 0; /* void */; i++) { 1104 header = PyIter_Next(iter); 1105 if (header == NULL) { 1106 break; 1107 } 1108 1109 h_iter = PyObject_GetIter(header); 1110 if (nxt_slow_path(h_iter == NULL)) { 1111 Py_DECREF(header); 1112 Py_DECREF(iter); 1113 1114 return PyErr_Format(PyExc_TypeError, 1115 "'headers' item #%d is not an iterable", i); 1116 } 1117 1118 name = PyIter_Next(h_iter); 1119 if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { 1120 Py_XDECREF(name); 1121 Py_DECREF(h_iter); 1122 Py_DECREF(header); 1123 Py_DECREF(iter); 1124 1125 return PyErr_Format(PyExc_TypeError, 1126 "'headers' item #%d 'name' is not a byte string", i); 1127 } 1128 1129 val = PyIter_Next(h_iter); 1130 if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { 1131 Py_XDECREF(val); 1132 Py_DECREF(h_iter); 1133 Py_DECREF(header); 1134 Py_DECREF(iter); 1135 1136 return PyErr_Format(PyExc_TypeError, 1137 "'headers' item #%d 'value' is not a byte string", i); 1138 } 1139 1140 res = cb(data, i, name, val); 1141 1142 Py_DECREF(name); 1143 Py_DECREF(val); 1144 Py_DECREF(h_iter); 1145 Py_DECREF(header); 1146 1147 if (nxt_slow_path(res == NULL)) { 1148 Py_DECREF(iter); 1149 1150 return NULL; 1151 } 1152 1153 Py_DECREF(res); 1154 } 1155 1156 Py_DECREF(iter); 1157 1158 Py_RETURN_NONE; 1159 } 1160 1161 1162 PyObject * 1163 nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) 1164 { 1165 nxt_py_asgi_calc_size_ctx_t *ctx; 1166 1167 ctx = data; 1168 1169 ctx->fields_count++; 1170 ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); 1171 1172 Py_RETURN_NONE; 1173 } 1174 1175 1176 PyObject * 1177 nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) 1178 { 1179 int rc; 1180 char *name_str, *val_str; 1181 uint32_t name_len, val_len; 1182 nxt_off_t content_length; 1183 nxt_unit_request_info_t *req; 1184 nxt_py_asgi_add_field_ctx_t *ctx; 1185 1186 name_str = PyBytes_AS_STRING(name); 1187 name_len = PyBytes_GET_SIZE(name); 1188 1189 val_str = PyBytes_AS_STRING(val); 1190 val_len = PyBytes_GET_SIZE(val); 1191 1192 ctx = data; 1193 req = ctx->req; 1194 1195 rc = nxt_unit_response_add_field(req, name_str, name_len, 1196 val_str, val_len); 1197 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1198 return PyErr_Format(PyExc_RuntimeError, 1199 "failed to add header #%d", i); 1200 } 1201 1202 if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { 1203 content_length = nxt_off_t_parse((u_char *) val_str, val_len); 1204 if (nxt_slow_path(content_length < 0)) { 1205 nxt_unit_req_error(req, "failed to parse Content-Length " 1206 "value %.*s", (int) val_len, val_str); 1207 1208 return PyErr_Format(PyExc_ValueError, 1209 "Failed to parse Content-Length: '%.*s'", 1210 (int) val_len, val_str); 1211 } 1212 1213 ctx->content_length = content_length; 1214 } 1215 1216 Py_RETURN_NONE; 1217 } 1218 1219 1220 PyObject * 1221 nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, 1222 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) 1223 { 1224 PyObject *set_result, *res; 1225 1226 if (nxt_slow_path(result == NULL)) { 1227 Py_DECREF(future); 1228 1229 return NULL; 1230 } 1231 1232 set_result = PyObject_GetAttrString(future, "set_result"); 1233 if (nxt_slow_path(set_result == NULL)) { 1234 nxt_unit_req_alert(req, "failed to get 'set_result' for future"); 1235 1236 Py_CLEAR(future); 1237 1238 goto cleanup_result; 1239 } 1240 1241 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { 1242 nxt_unit_req_alert(req, "'future.set_result' is not a callable"); 1243 1244 Py_CLEAR(future); 1245 1246 goto cleanup; 1247 } 1248 1249 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, 1250 result, NULL); 1251 if (nxt_slow_path(res == NULL)) { 1252 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); 1253 nxt_python_print_exception(); 1254 1255 Py_CLEAR(future); 1256 } 1257 1258 Py_XDECREF(res); 1259 1260 cleanup: 1261 1262 Py_DECREF(set_result); 1263 1264 cleanup_result: 1265 1266 Py_DECREF(result); 1267 1268 return future; 1269 } 1270 1271 1272 PyObject * 1273 nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) 1274 { 1275 PyObject *msg; 1276 1277 msg = PyDict_New(); 1278 if (nxt_slow_path(msg == NULL)) { 1279 nxt_unit_req_alert(req, "Python failed to create message dict"); 1280 nxt_python_print_exception(); 1281 1282 return PyErr_Format(PyExc_RuntimeError, 1283 "failed to create message dict"); 1284 } 1285 1286 if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { 1287 nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); 1288 1289 Py_DECREF(msg); 1290 1291 return PyErr_Format(PyExc_RuntimeError, 1292 "failed to set 'msg.type' item"); 1293 } 1294 1295 return msg; 1296 } 1297 1298 1299 PyObject * 1300 nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, 1301 PyObject *spec_version) 1302 { 1303 PyObject *scope, *asgi; 1304 1305 scope = PyDict_New(); 1306 if (nxt_slow_path(scope == NULL)) { 1307 nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); 1308 nxt_python_print_exception(); 1309 1310 return PyErr_Format(PyExc_RuntimeError, 1311 "failed to create 'scope' dict"); 1312 } 1313 1314 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { 1315 nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); 1316 1317 Py_DECREF(scope); 1318 1319 return PyErr_Format(PyExc_RuntimeError, 1320 "failed to set 'scope.type' item"); 1321 } 1322 1323 asgi = PyDict_New(); 1324 if (nxt_slow_path(asgi == NULL)) { 1325 nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); 1326 nxt_python_print_exception(); 1327 1328 Py_DECREF(scope); 1329 1330 return PyErr_Format(PyExc_RuntimeError, 1331 "failed to create 'asgi' dict"); 1332 } 1333 1334 if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { 1335 nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); 1336 1337 Py_DECREF(asgi); 1338 Py_DECREF(scope); 1339 1340 return PyErr_Format(PyExc_RuntimeError, 1341 "failed to set 'scope.asgi' item"); 1342 } 1343 1344 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, 1345 nxt_py_3_0_str) == -1)) 1346 { 1347 nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); 1348 1349 Py_DECREF(asgi); 1350 Py_DECREF(scope); 1351 1352 return PyErr_Format(PyExc_RuntimeError, 1353 "failed to set 'asgi.version' item"); 1354 } 1355 1356 if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, 1357 spec_version) == -1)) 1358 { 1359 nxt_unit_req_alert(req, 1360 "Python failed to set 'asgi.spec_version' item"); 1361 1362 Py_DECREF(asgi); 1363 Py_DECREF(scope); 1364 1365 return PyErr_Format(PyExc_RuntimeError, 1366 "failed to set 'asgi.spec_version' item"); 1367 } 1368 1369 Py_DECREF(asgi); 1370 1371 return scope; 1372 } 1373 1374 1375 void 1376 nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) 1377 { 1378 nxt_py_asgi_ctx_data_t *ctx_data; 1379 1380 ctx_data = req->ctx->data; 1381 1382 nxt_queue_insert_tail(&ctx_data->drain_queue, link); 1383 } 1384 1385 1386 void 1387 nxt_py_asgi_dealloc(PyObject *self) 1388 { 1389 PyObject_Del(self); 1390 } 1391 1392 1393 PyObject * 1394 nxt_py_asgi_await(PyObject *self) 1395 { 1396 Py_INCREF(self); 1397 return self; 1398 } 1399 1400 1401 PyObject * 1402 nxt_py_asgi_iter(PyObject *self) 1403 { 1404 Py_INCREF(self); 1405 return self; 1406 } 1407 1408 1409 PyObject * 1410 nxt_py_asgi_next(PyObject *self) 1411 { 1412 return NULL; 1413 } 1414 1415 1416 static void 1417 nxt_python_asgi_done(void) 1418 { 1419 nxt_py_asgi_str_done(); 1420 1421 Py_XDECREF(nxt_py_port_read); 1422 } 1423 1424 #else /* !(NXT_HAVE_ASGI) */ 1425 1426 1427 int 1428 nxt_python_asgi_check(PyObject *obj) 1429 { 1430 return 0; 1431 } 1432 1433 1434 int 1435 nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) 1436 { 1437 nxt_unit_alert(NULL, "ASGI not implemented"); 1438 return NXT_UNIT_ERROR; 1439 } 1440 1441 1442 #endif /* NXT_HAVE_ASGI */ 1443