1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7#include <python/nxt_python.h> 8 9#if (NXT_HAVE_ASGI) 10 11#include <nxt_main.h> 12#include <nxt_unit.h> 13#include <nxt_unit_request.h> 14#include <python/nxt_python_asgi.h> 15#include <python/nxt_python_asgi_str.h> 16 17 18typedef struct { 19 PyObject_HEAD 20 nxt_unit_request_info_t *req; 21 nxt_queue_link_t link; 22 PyObject *receive_future; 23 PyObject *send_future; 24 uint64_t content_length; 25 uint64_t bytes_sent; 26 PyObject *send_body; 27 Py_ssize_t send_body_off; 28 uint8_t complete; 29 uint8_t closed; 30 uint8_t empty_body_received; 31} nxt_py_asgi_http_t; 32 33 34static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); 35static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); 36static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); 37static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, 38 PyObject *dict); 39static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, 40 PyObject *dict); 41static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http); 42static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, 43 PyObject *future, PyObject *msg); 44static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); 45 46 47static PyMethodDef nxt_py_asgi_http_methods[] = { 48 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, 49 { "send", nxt_py_asgi_http_send, METH_O, 0 }, 50 { "_done", nxt_py_asgi_http_done, METH_O, 0 }, 51 { NULL, NULL, 0, 0 } 52}; 53 54static PyAsyncMethods nxt_py_asgi_async_methods = { 55 .am_await = nxt_py_asgi_await, 56}; 57 58static PyTypeObject nxt_py_asgi_http_type = { 59 PyVarObject_HEAD_INIT(NULL, 0) 60 61 .tp_name = "unit._asgi_http", 62 .tp_basicsize = sizeof(nxt_py_asgi_http_t), 63 .tp_dealloc = nxt_py_asgi_dealloc, 64 .tp_as_async = &nxt_py_asgi_async_methods, 65 .tp_flags = Py_TPFLAGS_DEFAULT, 66 .tp_doc = "unit ASGI HTTP request object", 67 .tp_iter = nxt_py_asgi_iter, 68 .tp_iternext = nxt_py_asgi_next, 69 .tp_methods = nxt_py_asgi_http_methods, 70}; 71 72static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; 73 74 75int 76nxt_py_asgi_http_init(void) 77{ 78 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { 79 nxt_unit_alert(NULL, 80 "Python failed to initialize the 'http' type object"); 81 return NXT_UNIT_ERROR; 82 } 83 84 return NXT_UNIT_OK; 85} 86 87 88PyObject * 89nxt_py_asgi_http_create(nxt_unit_request_info_t *req) 90{ 91 nxt_py_asgi_http_t *http; 92 93 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); 94 95 if (nxt_fast_path(http != NULL)) { 96 http->req = req; 97 http->receive_future = NULL; 98 http->send_future = NULL; 99 http->content_length = -1; 100 http->bytes_sent = 0; 101 http->send_body = NULL; 102 http->send_body_off = 0; 103 http->complete = 0; 104 http->closed = 0; 105 http->empty_body_received = 0; 106 } 107 108 return (PyObject *) http; 109} 110 111 112static PyObject * 113nxt_py_asgi_http_receive(PyObject *self, PyObject *none) 114{ 115 PyObject *msg, *future; 116 nxt_py_asgi_http_t *http; 117 nxt_py_asgi_ctx_data_t *ctx_data; 118 nxt_unit_request_info_t *req; 119 120 http = (nxt_py_asgi_http_t *) self; 121 req = http->req; 122 123 nxt_unit_req_debug(req, "asgi_http_receive"); 124 125 if (nxt_slow_path(http->closed || http->complete )) { 126 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); 127 128 } else { 129 msg = nxt_py_asgi_http_read_msg(http); 130 } 131 132 if (nxt_slow_path(msg == NULL)) { 133 return NULL; 134 } 135 136 ctx_data = req->ctx->data; 137 138 future = PyObject_CallObject(ctx_data->loop_create_future, NULL); 139 if (nxt_slow_path(future == NULL)) { 140 nxt_unit_req_alert(req, "Python failed to create Future object"); 141 nxt_python_print_exception(); 142 143 Py_DECREF(msg); 144 145 return PyErr_Format(PyExc_RuntimeError, 146 "failed to create Future object"); 147 } 148 149 if (msg != Py_None) { 150 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); 151 } 152 153 http->receive_future = future; 154 Py_INCREF(http->receive_future); 155 156 Py_DECREF(msg); 157 158 return future; 159} 160 161 162static PyObject * 163nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) 164{ 165 char *body_buf; 166 ssize_t read_res; 167 PyObject *msg, *body; 168 Py_ssize_t size; 169 nxt_unit_request_info_t *req; 170 171 req = http->req; 172 173 size = req->content_length; 174 175 if (size > nxt_py_asgi_http_body_buf_size) { 176 size = nxt_py_asgi_http_body_buf_size; 177 } 178 179 if (size == 0) { 180 if (http->empty_body_received) { 181 Py_RETURN_NONE; 182 } 183 184 http->empty_body_received = 1; 185 } 186 187 if (size > 0) { 188 body = PyBytes_FromStringAndSize(NULL, size); 189 if (nxt_slow_path(body == NULL)) { 190 nxt_unit_req_alert(req, "Python failed to create body byte string"); 191 nxt_python_print_exception(); 192 193 return PyErr_Format(PyExc_RuntimeError, 194 "failed to create Bytes object"); 195 } 196 197 body_buf = PyBytes_AS_STRING(body); 198 199 read_res = nxt_unit_request_read(req, body_buf, size); 200 201 } else { 202 body = NULL; 203 read_res = 0; 204 } 205 206 if (read_res > 0 || read_res == size) { 207 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); 208 if (nxt_slow_path(msg == NULL)) { 209 Py_XDECREF(body); 210 211 return NULL; 212 } 213 214#define SET_ITEM(dict, key, value) \ 215 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 216 == -1)) \ 217 { \ 218 nxt_unit_req_alert(req, \ 219 "Python failed to set '" #dict "." #key "' item"); \ 220 PyErr_SetString(PyExc_RuntimeError, \ 221 "Python failed to set '" #dict "." #key "' item"); \ 222 goto fail; \ 223 } 224 225 if (body != NULL) { 226 SET_ITEM(msg, body, body) 227 } 228 229 if (req->content_length > 0) { 230 SET_ITEM(msg, more_body, Py_True) 231 } 232 233#undef SET_ITEM 234 235 Py_XDECREF(body); 236 237 return msg; 238 } 239 240 Py_XDECREF(body); 241 242 Py_RETURN_NONE; 243 244fail: 245 246 Py_DECREF(msg); 247 Py_XDECREF(body); 248 249 return NULL; 250} 251 252 253static PyObject * 254nxt_py_asgi_http_send(PyObject *self, PyObject *dict) 255{ 256 PyObject *type; 257 const char *type_str; 258 Py_ssize_t type_len; 259 nxt_py_asgi_http_t *http; 260 261 static const nxt_str_t response_start = nxt_string("http.response.start"); 262 static const nxt_str_t response_body = nxt_string("http.response.body"); 263 264 http = (nxt_py_asgi_http_t *) self; 265 266 type = PyDict_GetItem(dict, nxt_py_type_str); 267 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { 268 nxt_unit_req_error(http->req, "asgi_http_send: " 269 "'type' is not a unicode string"); 270 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); 271 } 272 273 type_str = PyUnicode_AsUTF8AndSize(type, &type_len); 274 275 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", 276 (int) type_len, type_str); 277 278 if (nxt_unit_response_is_init(http->req)) { 279 if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) { 280 return nxt_py_asgi_http_response_body(http, dict); 281 } 282 283 return PyErr_Format(PyExc_RuntimeError, 284 "Expected ASGI message 'http.response.body', " 285 "but got '%U'", type); 286 } 287 288 if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) { 289 return nxt_py_asgi_http_response_start(http, dict); 290 } 291 292 return PyErr_Format(PyExc_RuntimeError, 293 "Expected ASGI message 'http.response.start', " 294 "but got '%U'", type); 295} 296 297 298static PyObject * 299nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) 300{ 301 int rc; 302 PyObject *status, *headers, *res; 303 nxt_py_asgi_calc_size_ctx_t calc_size_ctx; 304 nxt_py_asgi_add_field_ctx_t add_field_ctx; 305 306 status = PyDict_GetItem(dict, nxt_py_status_str); 307 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { 308 nxt_unit_req_error(http->req, "asgi_http_response_start: " 309 "'status' is not an integer"); 310 return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); 311 } 312 313 calc_size_ctx.fields_size = 0; 314 calc_size_ctx.fields_count = 0; 315 316 headers = PyDict_GetItem(dict, nxt_py_headers_str); 317 if (headers != NULL) { 318 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, 319 &calc_size_ctx); 320 if (nxt_slow_path(res == NULL)) { 321 return NULL; 322 } 323 324 Py_DECREF(res); 325 } 326 327 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), 328 calc_size_ctx.fields_count, 329 calc_size_ctx.fields_size); 330 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 331 return PyErr_Format(PyExc_RuntimeError, 332 "failed to allocate response object"); 333 } 334 335 add_field_ctx.req = http->req; 336 add_field_ctx.content_length = -1; 337 338 if (headers != NULL) { 339 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, 340 &add_field_ctx); 341 if (nxt_slow_path(res == NULL)) { 342 return NULL; 343 } 344 345 Py_DECREF(res); 346 } 347 348 http->content_length = add_field_ctx.content_length; 349 350 Py_INCREF(http); 351 return (PyObject *) http; 352} 353 354 355static PyObject * 356nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) 357{ 358 int rc; 359 char *body_str; 360 ssize_t sent; 361 PyObject *body, *more_body, *future; 362 Py_ssize_t body_len, body_off; 363 nxt_py_asgi_ctx_data_t *ctx_data; 364 365 body = PyDict_GetItem(dict, nxt_py_body_str); 366 if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { 367 return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); 368 } 369 370 more_body = PyDict_GetItem(dict, nxt_py_more_body_str); 371 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { 372 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); 373 } 374 375 if (nxt_slow_path(http->complete)) { 376 return PyErr_Format(PyExc_RuntimeError, 377 "Unexpected ASGI message 'http.response.body' " 378 "sent, after response already completed"); 379 } 380 381 if (nxt_slow_path(http->send_future != NULL)) { 382 return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); 383 } 384 385 if (body != NULL) { 386 body_str = PyBytes_AS_STRING(body); 387 body_len = PyBytes_GET_SIZE(body); 388 389 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", 390 (int) body_len, (more_body == Py_True) ); 391 392 if (nxt_slow_path(http->bytes_sent + body_len 393 > http->content_length)) 394 { 395 return PyErr_Format(PyExc_RuntimeError, 396 "Response content longer than Content-Length"); 397 } 398 399 body_off = 0; 400 401 ctx_data = http->req->ctx->data; 402 403 while (body_len > 0) { 404 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 405 if (nxt_slow_path(sent < 0)) { 406 return PyErr_Format(PyExc_RuntimeError, "failed to send body"); 407 } 408 409 if (nxt_slow_path(sent == 0)) { 410 nxt_unit_req_debug(http->req, "asgi_http_response_body: " 411 "out of shared memory, %d", 412 (int) body_len); 413 414 future = PyObject_CallObject(ctx_data->loop_create_future, 415 NULL); 416 if (nxt_slow_path(future == NULL)) { 417 nxt_unit_req_alert(http->req, 418 "Python failed to create Future object"); 419 nxt_python_print_exception(); 420 421 return PyErr_Format(PyExc_RuntimeError, 422 "failed to create Future object"); 423 } 424 425 http->send_body = body; 426 Py_INCREF(http->send_body); 427 http->send_body_off = body_off; 428 429 nxt_py_asgi_drain_wait(http->req, &http->link); 430 431 http->send_future = future; 432 Py_INCREF(http->send_future); 433 434 return future; 435 } 436 437 body_str += sent; 438 body_len -= sent; 439 body_off += sent; 440 http->bytes_sent += sent; 441 } 442 443 } else { 444 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", 445 (more_body == Py_True) ); 446 447 if (!nxt_unit_response_is_sent(http->req)) { 448 rc = nxt_unit_response_send(http->req); 449 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 450 return PyErr_Format(PyExc_RuntimeError, 451 "failed to send response"); 452 } 453 } 454 } 455 456 if (more_body == NULL || more_body == Py_False) { 457 http->complete = 1; 458 459 nxt_py_asgi_http_emit_disconnect(http); 460 } 461 462 Py_INCREF(http); 463 return (PyObject *) http; 464} 465 466 467static void 468nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http) 469{ 470 PyObject *msg, *future; 471 472 if (http->receive_future == NULL) { 473 return; 474 } 475 476 msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str); 477 if (nxt_slow_path(msg == NULL)) { 478 return; 479 } 480 481 if (msg == Py_None) { 482 Py_DECREF(msg); 483 return; 484 } 485 486 future = http->receive_future; 487 http->receive_future = NULL; 488 489 nxt_py_asgi_http_set_result(http, future, msg); 490 491 Py_DECREF(msg); 492} 493 494 495static void 496nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future, 497 PyObject *msg) 498{ 499 PyObject *res; 500 501 res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL); 502 if (nxt_slow_path(res == NULL)) { 503 nxt_unit_req_alert(http->req, "'done' call failed"); 504 nxt_python_print_exception(); 505 } 506 507 if (nxt_fast_path(res == Py_False)) { 508 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, 509 NULL); 510 if (nxt_slow_path(res == NULL)) { 511 nxt_unit_req_alert(http->req, "'set_result' call failed"); 512 nxt_python_print_exception(); 513 } 514 515 } else { 516 res = NULL; 517 } 518 519 Py_XDECREF(res); 520 Py_DECREF(future); 521} 522 523 524void 525nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) 526{ 527 PyObject *msg, *future; 528 nxt_py_asgi_http_t *http; 529 530 http = req->data; 531 532 nxt_unit_req_debug(req, "asgi_http_data_handler"); 533 534 if (http->receive_future == NULL) { 535 return; 536 } 537 538 msg = nxt_py_asgi_http_read_msg(http); 539 if (nxt_slow_path(msg == NULL)) { 540 return; 541 } 542 543 if (msg == Py_None) { 544 Py_DECREF(msg); 545 return; 546 } 547 548 future = http->receive_future; 549 http->receive_future = NULL; 550 551 nxt_py_asgi_http_set_result(http, future, msg); 552 553 Py_DECREF(msg); 554} 555 556 557int 558nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) 559{ 560 char *body_str; 561 ssize_t sent; 562 PyObject *future, *exc, *res; 563 Py_ssize_t body_len; 564 nxt_py_asgi_http_t *http; 565 566 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); 567 568 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; 569 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; 570 571 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); 572 573 while (body_len > 0) { 574 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 575 if (nxt_slow_path(sent < 0)) { 576 goto fail; 577 } 578 579 if (nxt_slow_path(sent == 0)) { 580 return NXT_UNIT_AGAIN; 581 } 582 583 body_str += sent; 584 body_len -= sent; 585 586 http->send_body_off += sent; 587 http->bytes_sent += sent; 588 } 589 590 Py_CLEAR(http->send_body); 591 592 future = http->send_future; 593 http->send_future = NULL; 594 595 nxt_py_asgi_http_set_result(http, future, Py_None); 596 597 return NXT_UNIT_OK; 598 599fail: 600 601 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, 602 nxt_py_failed_to_send_body_str, 603 NULL); 604 if (nxt_slow_path(exc == NULL)) { 605 nxt_unit_req_alert(http->req, "RuntimeError create failed"); 606 nxt_python_print_exception(); 607 608 exc = Py_None; 609 Py_INCREF(exc); 610 } 611 612 future = http->send_future; 613 http->send_future = NULL; 614 615 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, 616 NULL); 617 if (nxt_slow_path(res == NULL)) { 618 nxt_unit_req_alert(http->req, "'set_exception' call failed"); 619 nxt_python_print_exception(); 620 } 621 622 Py_XDECREF(res); 623 Py_DECREF(future); 624 Py_DECREF(exc); 625 626 return NXT_UNIT_ERROR; 627} 628 629 630void 631nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) 632{ 633 nxt_py_asgi_http_t *http; 634 635 http = req->data; 636 637 nxt_unit_req_debug(req, "asgi_http_close_handler"); 638
| 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7#include <python/nxt_python.h> 8 9#if (NXT_HAVE_ASGI) 10 11#include <nxt_main.h> 12#include <nxt_unit.h> 13#include <nxt_unit_request.h> 14#include <python/nxt_python_asgi.h> 15#include <python/nxt_python_asgi_str.h> 16 17 18typedef struct { 19 PyObject_HEAD 20 nxt_unit_request_info_t *req; 21 nxt_queue_link_t link; 22 PyObject *receive_future; 23 PyObject *send_future; 24 uint64_t content_length; 25 uint64_t bytes_sent; 26 PyObject *send_body; 27 Py_ssize_t send_body_off; 28 uint8_t complete; 29 uint8_t closed; 30 uint8_t empty_body_received; 31} nxt_py_asgi_http_t; 32 33 34static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); 35static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); 36static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); 37static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, 38 PyObject *dict); 39static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, 40 PyObject *dict); 41static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http); 42static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, 43 PyObject *future, PyObject *msg); 44static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); 45 46 47static PyMethodDef nxt_py_asgi_http_methods[] = { 48 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, 49 { "send", nxt_py_asgi_http_send, METH_O, 0 }, 50 { "_done", nxt_py_asgi_http_done, METH_O, 0 }, 51 { NULL, NULL, 0, 0 } 52}; 53 54static PyAsyncMethods nxt_py_asgi_async_methods = { 55 .am_await = nxt_py_asgi_await, 56}; 57 58static PyTypeObject nxt_py_asgi_http_type = { 59 PyVarObject_HEAD_INIT(NULL, 0) 60 61 .tp_name = "unit._asgi_http", 62 .tp_basicsize = sizeof(nxt_py_asgi_http_t), 63 .tp_dealloc = nxt_py_asgi_dealloc, 64 .tp_as_async = &nxt_py_asgi_async_methods, 65 .tp_flags = Py_TPFLAGS_DEFAULT, 66 .tp_doc = "unit ASGI HTTP request object", 67 .tp_iter = nxt_py_asgi_iter, 68 .tp_iternext = nxt_py_asgi_next, 69 .tp_methods = nxt_py_asgi_http_methods, 70}; 71 72static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; 73 74 75int 76nxt_py_asgi_http_init(void) 77{ 78 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { 79 nxt_unit_alert(NULL, 80 "Python failed to initialize the 'http' type object"); 81 return NXT_UNIT_ERROR; 82 } 83 84 return NXT_UNIT_OK; 85} 86 87 88PyObject * 89nxt_py_asgi_http_create(nxt_unit_request_info_t *req) 90{ 91 nxt_py_asgi_http_t *http; 92 93 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); 94 95 if (nxt_fast_path(http != NULL)) { 96 http->req = req; 97 http->receive_future = NULL; 98 http->send_future = NULL; 99 http->content_length = -1; 100 http->bytes_sent = 0; 101 http->send_body = NULL; 102 http->send_body_off = 0; 103 http->complete = 0; 104 http->closed = 0; 105 http->empty_body_received = 0; 106 } 107 108 return (PyObject *) http; 109} 110 111 112static PyObject * 113nxt_py_asgi_http_receive(PyObject *self, PyObject *none) 114{ 115 PyObject *msg, *future; 116 nxt_py_asgi_http_t *http; 117 nxt_py_asgi_ctx_data_t *ctx_data; 118 nxt_unit_request_info_t *req; 119 120 http = (nxt_py_asgi_http_t *) self; 121 req = http->req; 122 123 nxt_unit_req_debug(req, "asgi_http_receive"); 124 125 if (nxt_slow_path(http->closed || http->complete )) { 126 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); 127 128 } else { 129 msg = nxt_py_asgi_http_read_msg(http); 130 } 131 132 if (nxt_slow_path(msg == NULL)) { 133 return NULL; 134 } 135 136 ctx_data = req->ctx->data; 137 138 future = PyObject_CallObject(ctx_data->loop_create_future, NULL); 139 if (nxt_slow_path(future == NULL)) { 140 nxt_unit_req_alert(req, "Python failed to create Future object"); 141 nxt_python_print_exception(); 142 143 Py_DECREF(msg); 144 145 return PyErr_Format(PyExc_RuntimeError, 146 "failed to create Future object"); 147 } 148 149 if (msg != Py_None) { 150 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); 151 } 152 153 http->receive_future = future; 154 Py_INCREF(http->receive_future); 155 156 Py_DECREF(msg); 157 158 return future; 159} 160 161 162static PyObject * 163nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) 164{ 165 char *body_buf; 166 ssize_t read_res; 167 PyObject *msg, *body; 168 Py_ssize_t size; 169 nxt_unit_request_info_t *req; 170 171 req = http->req; 172 173 size = req->content_length; 174 175 if (size > nxt_py_asgi_http_body_buf_size) { 176 size = nxt_py_asgi_http_body_buf_size; 177 } 178 179 if (size == 0) { 180 if (http->empty_body_received) { 181 Py_RETURN_NONE; 182 } 183 184 http->empty_body_received = 1; 185 } 186 187 if (size > 0) { 188 body = PyBytes_FromStringAndSize(NULL, size); 189 if (nxt_slow_path(body == NULL)) { 190 nxt_unit_req_alert(req, "Python failed to create body byte string"); 191 nxt_python_print_exception(); 192 193 return PyErr_Format(PyExc_RuntimeError, 194 "failed to create Bytes object"); 195 } 196 197 body_buf = PyBytes_AS_STRING(body); 198 199 read_res = nxt_unit_request_read(req, body_buf, size); 200 201 } else { 202 body = NULL; 203 read_res = 0; 204 } 205 206 if (read_res > 0 || read_res == size) { 207 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); 208 if (nxt_slow_path(msg == NULL)) { 209 Py_XDECREF(body); 210 211 return NULL; 212 } 213 214#define SET_ITEM(dict, key, value) \ 215 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 216 == -1)) \ 217 { \ 218 nxt_unit_req_alert(req, \ 219 "Python failed to set '" #dict "." #key "' item"); \ 220 PyErr_SetString(PyExc_RuntimeError, \ 221 "Python failed to set '" #dict "." #key "' item"); \ 222 goto fail; \ 223 } 224 225 if (body != NULL) { 226 SET_ITEM(msg, body, body) 227 } 228 229 if (req->content_length > 0) { 230 SET_ITEM(msg, more_body, Py_True) 231 } 232 233#undef SET_ITEM 234 235 Py_XDECREF(body); 236 237 return msg; 238 } 239 240 Py_XDECREF(body); 241 242 Py_RETURN_NONE; 243 244fail: 245 246 Py_DECREF(msg); 247 Py_XDECREF(body); 248 249 return NULL; 250} 251 252 253static PyObject * 254nxt_py_asgi_http_send(PyObject *self, PyObject *dict) 255{ 256 PyObject *type; 257 const char *type_str; 258 Py_ssize_t type_len; 259 nxt_py_asgi_http_t *http; 260 261 static const nxt_str_t response_start = nxt_string("http.response.start"); 262 static const nxt_str_t response_body = nxt_string("http.response.body"); 263 264 http = (nxt_py_asgi_http_t *) self; 265 266 type = PyDict_GetItem(dict, nxt_py_type_str); 267 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { 268 nxt_unit_req_error(http->req, "asgi_http_send: " 269 "'type' is not a unicode string"); 270 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); 271 } 272 273 type_str = PyUnicode_AsUTF8AndSize(type, &type_len); 274 275 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", 276 (int) type_len, type_str); 277 278 if (nxt_unit_response_is_init(http->req)) { 279 if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) { 280 return nxt_py_asgi_http_response_body(http, dict); 281 } 282 283 return PyErr_Format(PyExc_RuntimeError, 284 "Expected ASGI message 'http.response.body', " 285 "but got '%U'", type); 286 } 287 288 if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) { 289 return nxt_py_asgi_http_response_start(http, dict); 290 } 291 292 return PyErr_Format(PyExc_RuntimeError, 293 "Expected ASGI message 'http.response.start', " 294 "but got '%U'", type); 295} 296 297 298static PyObject * 299nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) 300{ 301 int rc; 302 PyObject *status, *headers, *res; 303 nxt_py_asgi_calc_size_ctx_t calc_size_ctx; 304 nxt_py_asgi_add_field_ctx_t add_field_ctx; 305 306 status = PyDict_GetItem(dict, nxt_py_status_str); 307 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { 308 nxt_unit_req_error(http->req, "asgi_http_response_start: " 309 "'status' is not an integer"); 310 return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); 311 } 312 313 calc_size_ctx.fields_size = 0; 314 calc_size_ctx.fields_count = 0; 315 316 headers = PyDict_GetItem(dict, nxt_py_headers_str); 317 if (headers != NULL) { 318 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, 319 &calc_size_ctx); 320 if (nxt_slow_path(res == NULL)) { 321 return NULL; 322 } 323 324 Py_DECREF(res); 325 } 326 327 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), 328 calc_size_ctx.fields_count, 329 calc_size_ctx.fields_size); 330 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 331 return PyErr_Format(PyExc_RuntimeError, 332 "failed to allocate response object"); 333 } 334 335 add_field_ctx.req = http->req; 336 add_field_ctx.content_length = -1; 337 338 if (headers != NULL) { 339 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, 340 &add_field_ctx); 341 if (nxt_slow_path(res == NULL)) { 342 return NULL; 343 } 344 345 Py_DECREF(res); 346 } 347 348 http->content_length = add_field_ctx.content_length; 349 350 Py_INCREF(http); 351 return (PyObject *) http; 352} 353 354 355static PyObject * 356nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) 357{ 358 int rc; 359 char *body_str; 360 ssize_t sent; 361 PyObject *body, *more_body, *future; 362 Py_ssize_t body_len, body_off; 363 nxt_py_asgi_ctx_data_t *ctx_data; 364 365 body = PyDict_GetItem(dict, nxt_py_body_str); 366 if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { 367 return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); 368 } 369 370 more_body = PyDict_GetItem(dict, nxt_py_more_body_str); 371 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { 372 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); 373 } 374 375 if (nxt_slow_path(http->complete)) { 376 return PyErr_Format(PyExc_RuntimeError, 377 "Unexpected ASGI message 'http.response.body' " 378 "sent, after response already completed"); 379 } 380 381 if (nxt_slow_path(http->send_future != NULL)) { 382 return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); 383 } 384 385 if (body != NULL) { 386 body_str = PyBytes_AS_STRING(body); 387 body_len = PyBytes_GET_SIZE(body); 388 389 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", 390 (int) body_len, (more_body == Py_True) ); 391 392 if (nxt_slow_path(http->bytes_sent + body_len 393 > http->content_length)) 394 { 395 return PyErr_Format(PyExc_RuntimeError, 396 "Response content longer than Content-Length"); 397 } 398 399 body_off = 0; 400 401 ctx_data = http->req->ctx->data; 402 403 while (body_len > 0) { 404 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 405 if (nxt_slow_path(sent < 0)) { 406 return PyErr_Format(PyExc_RuntimeError, "failed to send body"); 407 } 408 409 if (nxt_slow_path(sent == 0)) { 410 nxt_unit_req_debug(http->req, "asgi_http_response_body: " 411 "out of shared memory, %d", 412 (int) body_len); 413 414 future = PyObject_CallObject(ctx_data->loop_create_future, 415 NULL); 416 if (nxt_slow_path(future == NULL)) { 417 nxt_unit_req_alert(http->req, 418 "Python failed to create Future object"); 419 nxt_python_print_exception(); 420 421 return PyErr_Format(PyExc_RuntimeError, 422 "failed to create Future object"); 423 } 424 425 http->send_body = body; 426 Py_INCREF(http->send_body); 427 http->send_body_off = body_off; 428 429 nxt_py_asgi_drain_wait(http->req, &http->link); 430 431 http->send_future = future; 432 Py_INCREF(http->send_future); 433 434 return future; 435 } 436 437 body_str += sent; 438 body_len -= sent; 439 body_off += sent; 440 http->bytes_sent += sent; 441 } 442 443 } else { 444 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", 445 (more_body == Py_True) ); 446 447 if (!nxt_unit_response_is_sent(http->req)) { 448 rc = nxt_unit_response_send(http->req); 449 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 450 return PyErr_Format(PyExc_RuntimeError, 451 "failed to send response"); 452 } 453 } 454 } 455 456 if (more_body == NULL || more_body == Py_False) { 457 http->complete = 1; 458 459 nxt_py_asgi_http_emit_disconnect(http); 460 } 461 462 Py_INCREF(http); 463 return (PyObject *) http; 464} 465 466 467static void 468nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http) 469{ 470 PyObject *msg, *future; 471 472 if (http->receive_future == NULL) { 473 return; 474 } 475 476 msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str); 477 if (nxt_slow_path(msg == NULL)) { 478 return; 479 } 480 481 if (msg == Py_None) { 482 Py_DECREF(msg); 483 return; 484 } 485 486 future = http->receive_future; 487 http->receive_future = NULL; 488 489 nxt_py_asgi_http_set_result(http, future, msg); 490 491 Py_DECREF(msg); 492} 493 494 495static void 496nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future, 497 PyObject *msg) 498{ 499 PyObject *res; 500 501 res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL); 502 if (nxt_slow_path(res == NULL)) { 503 nxt_unit_req_alert(http->req, "'done' call failed"); 504 nxt_python_print_exception(); 505 } 506 507 if (nxt_fast_path(res == Py_False)) { 508 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, 509 NULL); 510 if (nxt_slow_path(res == NULL)) { 511 nxt_unit_req_alert(http->req, "'set_result' call failed"); 512 nxt_python_print_exception(); 513 } 514 515 } else { 516 res = NULL; 517 } 518 519 Py_XDECREF(res); 520 Py_DECREF(future); 521} 522 523 524void 525nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) 526{ 527 PyObject *msg, *future; 528 nxt_py_asgi_http_t *http; 529 530 http = req->data; 531 532 nxt_unit_req_debug(req, "asgi_http_data_handler"); 533 534 if (http->receive_future == NULL) { 535 return; 536 } 537 538 msg = nxt_py_asgi_http_read_msg(http); 539 if (nxt_slow_path(msg == NULL)) { 540 return; 541 } 542 543 if (msg == Py_None) { 544 Py_DECREF(msg); 545 return; 546 } 547 548 future = http->receive_future; 549 http->receive_future = NULL; 550 551 nxt_py_asgi_http_set_result(http, future, msg); 552 553 Py_DECREF(msg); 554} 555 556 557int 558nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) 559{ 560 char *body_str; 561 ssize_t sent; 562 PyObject *future, *exc, *res; 563 Py_ssize_t body_len; 564 nxt_py_asgi_http_t *http; 565 566 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); 567 568 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; 569 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; 570 571 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); 572 573 while (body_len > 0) { 574 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 575 if (nxt_slow_path(sent < 0)) { 576 goto fail; 577 } 578 579 if (nxt_slow_path(sent == 0)) { 580 return NXT_UNIT_AGAIN; 581 } 582 583 body_str += sent; 584 body_len -= sent; 585 586 http->send_body_off += sent; 587 http->bytes_sent += sent; 588 } 589 590 Py_CLEAR(http->send_body); 591 592 future = http->send_future; 593 http->send_future = NULL; 594 595 nxt_py_asgi_http_set_result(http, future, Py_None); 596 597 return NXT_UNIT_OK; 598 599fail: 600 601 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, 602 nxt_py_failed_to_send_body_str, 603 NULL); 604 if (nxt_slow_path(exc == NULL)) { 605 nxt_unit_req_alert(http->req, "RuntimeError create failed"); 606 nxt_python_print_exception(); 607 608 exc = Py_None; 609 Py_INCREF(exc); 610 } 611 612 future = http->send_future; 613 http->send_future = NULL; 614 615 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, 616 NULL); 617 if (nxt_slow_path(res == NULL)) { 618 nxt_unit_req_alert(http->req, "'set_exception' call failed"); 619 nxt_python_print_exception(); 620 } 621 622 Py_XDECREF(res); 623 Py_DECREF(future); 624 Py_DECREF(exc); 625 626 return NXT_UNIT_ERROR; 627} 628 629 630void 631nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) 632{ 633 nxt_py_asgi_http_t *http; 634 635 http = req->data; 636 637 nxt_unit_req_debug(req, "asgi_http_close_handler"); 638
|