1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <python/nxt_python_asgi.h> 15 #include <python/nxt_python_asgi_str.h> 16 17 18 typedef struct { 19 PyObject_HEAD 20 nxt_unit_request_info_t *req; 21 nxt_queue_link_t link; 22 PyObject *receive_future; 23 PyObject *send_future; 24 uint64_t content_length; 25 uint64_t bytes_sent; 26 int complete; 27 int closed; 28 PyObject *send_body; 29 Py_ssize_t send_body_off; 30 } nxt_py_asgi_http_t; 31 32 33 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); 34 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); 35 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); 36 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, 37 PyObject *dict); 38 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, 39 PyObject *dict); 40 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); 41 42 43 static PyMethodDef nxt_py_asgi_http_methods[] = { 44 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, 45 { "send", nxt_py_asgi_http_send, METH_O, 0 }, 46 { "_done", nxt_py_asgi_http_done, METH_O, 0 }, 47 { NULL, NULL, 0, 0 } 48 }; 49 50 static PyAsyncMethods nxt_py_asgi_async_methods = { 51 .am_await = nxt_py_asgi_await, 52 }; 53 54 static PyTypeObject nxt_py_asgi_http_type = { 55 PyVarObject_HEAD_INIT(NULL, 0) 56 57 .tp_name = "unit._asgi_http", 58 .tp_basicsize = sizeof(nxt_py_asgi_http_t), 59 .tp_dealloc = nxt_py_asgi_dealloc, 60 .tp_as_async = &nxt_py_asgi_async_methods, 61 .tp_flags = Py_TPFLAGS_DEFAULT, 62 .tp_doc = "unit ASGI HTTP request object", 63 .tp_iter = nxt_py_asgi_iter, 64 .tp_iternext = nxt_py_asgi_next, 65 .tp_methods = nxt_py_asgi_http_methods, 66 }; 67 68 static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; 69 70 71 int 72 nxt_py_asgi_http_init(void) 73 { 74 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { 75 nxt_unit_alert(NULL, 76 "Python failed to initialize the 'http' type object"); 77 return NXT_UNIT_ERROR; 78 } 79 80 return NXT_UNIT_OK; 81 } 82 83 84 PyObject * 85 nxt_py_asgi_http_create(nxt_unit_request_info_t *req) 86 { 87 nxt_py_asgi_http_t *http; 88 89 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); 90 91 if (nxt_fast_path(http != NULL)) { 92 http->req = req; 93 http->receive_future = NULL; 94 http->send_future = NULL; 95 http->content_length = -1; 96 http->bytes_sent = 0; 97 http->complete = 0; 98 http->closed = 0; 99 http->send_body = NULL; 100 http->send_body_off = 0; 101 } 102 103 return (PyObject *) http; 104 } 105 106 107 static PyObject * 108 nxt_py_asgi_http_receive(PyObject *self, PyObject *none) 109 { 110 PyObject *msg, *future; 111 nxt_py_asgi_http_t *http; 112 nxt_py_asgi_ctx_data_t *ctx_data; 113 nxt_unit_request_info_t *req; 114 115 http = (nxt_py_asgi_http_t *) self; 116 req = http->req; 117 118 nxt_unit_req_debug(req, "asgi_http_receive"); 119 120 if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) { 121 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); 122 123 } else { 124 msg = nxt_py_asgi_http_read_msg(http); 125 } 126 127 if (nxt_slow_path(msg == NULL)) { 128 return NULL; 129 } 130 131 ctx_data = req->ctx->data; 132 133 future = PyObject_CallObject(ctx_data->loop_create_future, NULL); 134 if (nxt_slow_path(future == NULL)) { 135 nxt_unit_req_alert(req, "Python failed to create Future object"); 136 nxt_python_print_exception(); 137 138 Py_DECREF(msg); 139 140 return PyErr_Format(PyExc_RuntimeError, 141 "failed to create Future object"); 142 } 143 144 if (msg != Py_None) { 145 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); 146 } 147 148 http->receive_future = future; 149 Py_INCREF(http->receive_future); 150 151 Py_DECREF(msg); 152 153 return future; 154 } 155 156 157 static PyObject * 158 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) 159 { 160 char *body_buf; 161 ssize_t read_res; 162 PyObject *msg, *body; 163 Py_ssize_t size; 164 nxt_unit_request_info_t *req; 165 166 req = http->req; 167 168 size = req->content_length; 169 170 if (size > nxt_py_asgi_http_body_buf_size) { 171 size = nxt_py_asgi_http_body_buf_size; 172 } 173 174 if (size > 0) { 175 body = PyBytes_FromStringAndSize(NULL, size); 176 if (nxt_slow_path(body == NULL)) { 177 nxt_unit_req_alert(req, "Python failed to create body byte string"); 178 nxt_python_print_exception(); 179 180 return PyErr_Format(PyExc_RuntimeError, 181 "failed to create Bytes object"); 182 } 183 184 body_buf = PyBytes_AS_STRING(body); 185 186 read_res = nxt_unit_request_read(req, body_buf, size); 187 188 } else { 189 body = NULL; 190 read_res = 0; 191 } 192 193 if (read_res > 0 || read_res == size) { 194 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); 195 if (nxt_slow_path(msg == NULL)) { 196 Py_XDECREF(body); 197 198 return NULL; 199 } 200 201 #define SET_ITEM(dict, key, value) \ 202 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 203 == -1)) \ 204 { \ 205 nxt_unit_req_alert(req, \ 206 "Python failed to set '" #dict "." #key "' item"); \ 207 PyErr_SetString(PyExc_RuntimeError, \ 208 "Python failed to set '" #dict "." #key "' item"); \ 209 goto fail; \ 210 } 211 212 if (body != NULL) { 213 SET_ITEM(msg, body, body) 214 } 215 216 if (req->content_length > 0) { 217 SET_ITEM(msg, more_body, Py_True) 218 } 219 220 #undef SET_ITEM 221 222 Py_XDECREF(body); 223 224 return msg; 225 } 226 227 Py_XDECREF(body); 228 229 Py_RETURN_NONE; 230 231 fail: 232 233 Py_DECREF(msg); 234 Py_XDECREF(body); 235 236 return NULL; 237 } 238 239 240 static PyObject * 241 nxt_py_asgi_http_send(PyObject *self, PyObject *dict) 242 { 243 PyObject *type; 244 const char *type_str; 245 Py_ssize_t type_len; 246 nxt_py_asgi_http_t *http; 247 248 static const nxt_str_t response_start = nxt_string("http.response.start"); 249 static const nxt_str_t response_body = nxt_string("http.response.body"); 250 251 http = (nxt_py_asgi_http_t *) self; 252 253 type = PyDict_GetItem(dict, nxt_py_type_str); 254 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { 255 nxt_unit_req_error(http->req, "asgi_http_send: " 256 "'type' is not a unicode string"); 257 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); 258 } 259 260 type_str = PyUnicode_AsUTF8AndSize(type, &type_len); 261 262 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", 263 (int) type_len, type_str); 264 265 if (type_len == (Py_ssize_t) response_start.length 266 && memcmp(type_str, response_start.start, type_len) == 0) 267 { 268 return nxt_py_asgi_http_response_start(http, dict); 269 } 270 271 if (type_len == (Py_ssize_t) response_body.length 272 && memcmp(type_str, response_body.start, type_len) == 0) 273 { 274 return nxt_py_asgi_http_response_body(http, dict); 275 } 276 277 nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", 278 (int) type_len, type_str); 279 280 return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); 281 } 282 283 284 static PyObject * 285 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) 286 { 287 int rc; 288 PyObject *status, *headers, *res; 289 nxt_py_asgi_calc_size_ctx_t calc_size_ctx; 290 nxt_py_asgi_add_field_ctx_t add_field_ctx; 291 292 status = PyDict_GetItem(dict, nxt_py_status_str); 293 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { 294 nxt_unit_req_error(http->req, "asgi_http_response_start: " 295 "'status' is not an integer"); 296 return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); 297 } 298 299 calc_size_ctx.fields_size = 0; 300 calc_size_ctx.fields_count = 0; 301 302 headers = PyDict_GetItem(dict, nxt_py_headers_str); 303 if (headers != NULL) { 304 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, 305 &calc_size_ctx); 306 if (nxt_slow_path(res == NULL)) { 307 return NULL; 308 } 309 310 Py_DECREF(res); 311 } 312 313 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), 314 calc_size_ctx.fields_count, 315 calc_size_ctx.fields_size); 316 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 317 return PyErr_Format(PyExc_RuntimeError, 318 "failed to allocate response object"); 319 } 320 321 add_field_ctx.req = http->req; 322 add_field_ctx.content_length = -1; 323 324 if (headers != NULL) { 325 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, 326 &add_field_ctx); 327 if (nxt_slow_path(res == NULL)) { 328 return NULL; 329 } 330 331 Py_DECREF(res); 332 } 333 334 http->content_length = add_field_ctx.content_length; 335 336 Py_INCREF(http); 337 return (PyObject *) http; 338 } 339 340 341 static PyObject * 342 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) 343 { 344 int rc; 345 char *body_str; 346 ssize_t sent; 347 PyObject *body, *more_body, *future; 348 Py_ssize_t body_len, body_off; 349 nxt_py_asgi_ctx_data_t *ctx_data; 350 351 body = PyDict_GetItem(dict, nxt_py_body_str); 352 if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { 353 return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); 354 } 355 356 more_body = PyDict_GetItem(dict, nxt_py_more_body_str); 357 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { 358 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); 359 } 360 361 if (nxt_slow_path(http->complete)) { 362 return PyErr_Format(PyExc_RuntimeError, 363 "Unexpected ASGI message 'http.response.body' " 364 "sent, after response already completed"); 365 } 366 367 if (nxt_slow_path(http->send_future != NULL)) { 368 return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); 369 } 370 371 if (body != NULL) { 372 body_str = PyBytes_AS_STRING(body); 373 body_len = PyBytes_GET_SIZE(body); 374 375 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", 376 (int) body_len, (more_body == Py_True) ); 377 378 if (nxt_slow_path(http->bytes_sent + body_len 379 > http->content_length)) 380 { 381 return PyErr_Format(PyExc_RuntimeError, 382 "Response content longer than Content-Length"); 383 } 384 385 body_off = 0; 386 387 ctx_data = http->req->ctx->data; 388 389 while (body_len > 0) { 390 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 391 if (nxt_slow_path(sent < 0)) { 392 return PyErr_Format(PyExc_RuntimeError, "failed to send body"); 393 } 394 395 if (nxt_slow_path(sent == 0)) { 396 nxt_unit_req_debug(http->req, "asgi_http_response_body: " 397 "out of shared memory, %d", 398 (int) body_len); 399 400 future = PyObject_CallObject(ctx_data->loop_create_future, 401 NULL); 402 if (nxt_slow_path(future == NULL)) { 403 nxt_unit_req_alert(http->req, 404 "Python failed to create Future object"); 405 nxt_python_print_exception(); 406 407 return PyErr_Format(PyExc_RuntimeError, 408 "failed to create Future object"); 409 } 410 411 http->send_body = body; 412 Py_INCREF(http->send_body); 413 http->send_body_off = body_off; 414 415 nxt_py_asgi_drain_wait(http->req, &http->link); 416 417 http->send_future = future; 418 Py_INCREF(http->send_future); 419 420 return future; 421 } 422 423 body_str += sent; 424 body_len -= sent; 425 body_off += sent; 426 http->bytes_sent += sent; 427 } 428 429 } else { 430 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", 431 (more_body == Py_True) ); 432 433 if (!nxt_unit_response_is_sent(http->req)) { 434 rc = nxt_unit_response_send(http->req); 435 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 436 return PyErr_Format(PyExc_RuntimeError, 437 "failed to send response"); 438 } 439 } 440 } 441 442 if (more_body == NULL || more_body == Py_False) { 443 http->complete = 1; 444 } 445 446 Py_INCREF(http); 447 return (PyObject *) http; 448 } 449 450 451 void 452 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) 453 { 454 PyObject *msg, *future, *res; 455 nxt_py_asgi_http_t *http; 456 457 http = req->data; 458 459 nxt_unit_req_debug(req, "asgi_http_data_handler"); 460 461 if (http->receive_future == NULL) { 462 return; 463 } 464 465 msg = nxt_py_asgi_http_read_msg(http); 466 if (nxt_slow_path(msg == NULL)) { 467 return; 468 } 469 470 if (msg == Py_None) { 471 Py_DECREF(msg); 472 return; 473 } 474 475 future = http->receive_future; 476 http->receive_future = NULL; 477 478 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); 479 if (nxt_slow_path(res == NULL)) { 480 nxt_unit_req_alert(req, "'set_result' call failed"); 481 nxt_python_print_exception(); 482 } 483 484 Py_XDECREF(res); 485 Py_DECREF(future); 486 487 Py_DECREF(msg); 488 } 489 490 491 int 492 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) 493 { 494 char *body_str; 495 ssize_t sent; 496 PyObject *future, *exc, *res; 497 Py_ssize_t body_len; 498 nxt_py_asgi_http_t *http; 499 500 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); 501 502 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; 503 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; 504 505 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); 506 507 while (body_len > 0) { 508 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 509 if (nxt_slow_path(sent < 0)) { 510 goto fail; 511 } 512 513 if (nxt_slow_path(sent == 0)) { 514 return NXT_UNIT_AGAIN; 515 } 516 517 body_str += sent; 518 body_len -= sent; 519 520 http->send_body_off += sent; 521 http->bytes_sent += sent; 522 } 523 524 Py_CLEAR(http->send_body); 525 526 future = http->send_future; 527 http->send_future = NULL; 528 529 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, 530 NULL); 531 if (nxt_slow_path(res == NULL)) { 532 nxt_unit_req_alert(http->req, "'set_result' call failed"); 533 nxt_python_print_exception(); 534 } 535 536 Py_XDECREF(res); 537 Py_DECREF(future); 538 539 return NXT_UNIT_OK; 540 541 fail: 542 543 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, 544 nxt_py_failed_to_send_body_str, 545 NULL); 546 if (nxt_slow_path(exc == NULL)) { 547 nxt_unit_req_alert(http->req, "RuntimeError create failed"); 548 nxt_python_print_exception(); 549 550 exc = Py_None; 551 Py_INCREF(exc); 552 } 553 554 future = http->send_future; 555 http->send_future = NULL; 556 557 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, 558 NULL); 559 if (nxt_slow_path(res == NULL)) { 560 nxt_unit_req_alert(http->req, "'set_exception' call failed"); 561 nxt_python_print_exception(); 562 } 563 564 Py_XDECREF(res); 565 Py_DECREF(future); 566 Py_DECREF(exc); 567 568 return NXT_UNIT_ERROR; 569 } 570 571 572 void 573 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) 574 { 575 PyObject *msg, *future, *res; 576 nxt_py_asgi_http_t *http; 577 578 http = req->data; 579 580 nxt_unit_req_debug(req, "asgi_http_close_handler"); 581 582 http->closed = 1; 583 584 if (http->receive_future == NULL) { 585 return; 586 } 587 588 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); 589 if (nxt_slow_path(msg == NULL)) { 590 return; 591 } 592 593 if (msg == Py_None) { 594 Py_DECREF(msg); 595 return; 596 } 597 598 future = http->receive_future; 599 http->receive_future = NULL; 600 601 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); 602 if (nxt_slow_path(res == NULL)) { 603 nxt_unit_req_alert(req, "'set_result' call failed"); 604 nxt_python_print_exception(); 605 } 606 607 Py_XDECREF(res); 608 Py_DECREF(future); 609 610 Py_DECREF(msg); 611 } 612 613 614 static PyObject * 615 nxt_py_asgi_http_done(PyObject *self, PyObject *future) 616 { 617 int rc; 618 PyObject *res; 619 nxt_py_asgi_http_t *http; 620 621 http = (nxt_py_asgi_http_t *) self; 622 623 nxt_unit_req_debug(http->req, "asgi_http_done"); 624 625 /* 626 * Get Future.result() and it raises an exception, if coroutine exited 627 * with exception. 628 */ 629 res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); 630 if (nxt_slow_path(res == NULL)) { 631 nxt_unit_req_error(http->req, 632 "Python failed to call 'future.result()'"); 633 nxt_python_print_exception(); 634 635 rc = NXT_UNIT_ERROR; 636 637 } else { 638 Py_DECREF(res); 639 640 rc = NXT_UNIT_OK; 641 } 642 643 nxt_unit_request_done(http->req, rc); 644 645 Py_RETURN_NONE; 646 } 647 648 649 #endif /* NXT_HAVE_ASGI */ 650