1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 7 #include <python/nxt_python.h> 8 9 #if (NXT_HAVE_ASGI) 10 11 #include <nxt_main.h> 12 #include <nxt_unit.h> 13 #include <nxt_unit_request.h> 14 #include <python/nxt_python_asgi.h> 15 #include <python/nxt_python_asgi_str.h> 16 17 18 typedef struct { 19 PyObject_HEAD 20 nxt_unit_request_info_t *req; 21 nxt_queue_link_t link; 22 PyObject *receive_future; 23 PyObject *send_future; 24 uint64_t content_length; 25 uint64_t bytes_sent; 26 int complete; 27 int closed; 28 PyObject *send_body; 29 Py_ssize_t send_body_off; 30 } nxt_py_asgi_http_t; 31 32 33 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); 34 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); 35 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); 36 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, 37 PyObject *dict); 38 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, 39 PyObject *dict); 40 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); 41 42 43 static PyMethodDef nxt_py_asgi_http_methods[] = { 44 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, 45 { "send", nxt_py_asgi_http_send, METH_O, 0 }, 46 { "_done", nxt_py_asgi_http_done, METH_O, 0 }, 47 { NULL, NULL, 0, 0 } 48 }; 49 50 static PyAsyncMethods nxt_py_asgi_async_methods = { 51 .am_await = nxt_py_asgi_await, 52 }; 53 54 static PyTypeObject nxt_py_asgi_http_type = { 55 PyVarObject_HEAD_INIT(NULL, 0) 56 57 .tp_name = "unit._asgi_http", 58 .tp_basicsize = sizeof(nxt_py_asgi_http_t), 59 .tp_dealloc = nxt_py_asgi_dealloc, 60 .tp_as_async = &nxt_py_asgi_async_methods, 61 .tp_flags = Py_TPFLAGS_DEFAULT, 62 .tp_doc = "unit ASGI HTTP request object", 63 .tp_iter = nxt_py_asgi_iter, 64 .tp_iternext = nxt_py_asgi_next, 65 .tp_methods = nxt_py_asgi_http_methods, 66 }; 67 68 static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; 69 70 71 int 72 nxt_py_asgi_http_init(void) 73 { 74 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { 75 nxt_unit_alert(NULL, 76 "Python failed to initialize the 'http' type object"); 77 return NXT_UNIT_ERROR; 78 } 79 80 return NXT_UNIT_OK; 81 } 82 83 84 PyObject * 85 nxt_py_asgi_http_create(nxt_unit_request_info_t *req) 86 { 87 nxt_py_asgi_http_t *http; 88 89 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); 90 91 if (nxt_fast_path(http != NULL)) { 92 http->req = req; 93 http->receive_future = NULL; 94 http->send_future = NULL; 95 http->content_length = -1; 96 http->bytes_sent = 0; 97 http->complete = 0; 98 http->closed = 0; 99 http->send_body = NULL; 100 http->send_body_off = 0; 101 } 102 103 return (PyObject *) http; 104 } 105 106 107 static PyObject * 108 nxt_py_asgi_http_receive(PyObject *self, PyObject *none) 109 { 110 PyObject *msg, *future; 111 nxt_py_asgi_http_t *http; 112 nxt_py_asgi_ctx_data_t *ctx_data; 113 nxt_unit_request_info_t *req; 114 115 http = (nxt_py_asgi_http_t *) self; 116 req = http->req; 117 118 nxt_unit_req_debug(req, "asgi_http_receive"); 119 120 if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) { 121 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); 122 123 } else { 124 msg = nxt_py_asgi_http_read_msg(http); 125 } 126 127 if (nxt_slow_path(msg == NULL)) { 128 return NULL; 129 } 130 131 ctx_data = req->ctx->data; 132 133 future = PyObject_CallObject(ctx_data->loop_create_future, NULL); 134 if (nxt_slow_path(future == NULL)) { 135 nxt_unit_req_alert(req, "Python failed to create Future object"); 136 nxt_python_print_exception(); 137 138 Py_DECREF(msg); 139 140 return PyErr_Format(PyExc_RuntimeError, 141 "failed to create Future object"); 142 } 143 144 if (msg != Py_None) { 145 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); 146 } 147 148 http->receive_future = future; 149 Py_INCREF(http->receive_future); 150 151 Py_DECREF(msg); 152 153 return future; 154 } 155 156 157 static PyObject * 158 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) 159 { 160 char *body_buf; 161 ssize_t read_res; 162 PyObject *msg, *body; 163 Py_ssize_t size; 164 nxt_unit_request_info_t *req; 165 166 req = http->req; 167 168 size = req->content_length; 169 170 if (size > nxt_py_asgi_http_body_buf_size) { 171 size = nxt_py_asgi_http_body_buf_size; 172 } 173 174 if (size > 0) { 175 body = PyBytes_FromStringAndSize(NULL, size); 176 if (nxt_slow_path(body == NULL)) { 177 nxt_unit_req_alert(req, "Python failed to create body byte string"); 178 nxt_python_print_exception(); 179 180 return PyErr_Format(PyExc_RuntimeError, 181 "failed to create Bytes object"); 182 } 183 184 body_buf = PyBytes_AS_STRING(body); 185 186 read_res = nxt_unit_request_read(req, body_buf, size); 187 188 } else { 189 body = NULL; 190 read_res = 0; 191 } 192 193 if (read_res > 0 || read_res == size) { 194 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); 195 if (nxt_slow_path(msg == NULL)) { 196 Py_XDECREF(body); 197 198 return NULL; 199 } 200 201 #define SET_ITEM(dict, key, value) \ 202 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ 203 == -1)) \ 204 { \ 205 nxt_unit_req_alert(req, \ 206 "Python failed to set '" #dict "." #key "' item"); \ 207 PyErr_SetString(PyExc_RuntimeError, \ 208 "Python failed to set '" #dict "." #key "' item"); \ 209 goto fail; \ 210 } 211 212 if (body != NULL) { 213 SET_ITEM(msg, body, body) 214 } 215 216 if (req->content_length > 0) { 217 SET_ITEM(msg, more_body, Py_True) 218 } 219 220 #undef SET_ITEM 221 222 Py_XDECREF(body); 223 224 return msg; 225 } 226 227 Py_XDECREF(body); 228 229 Py_RETURN_NONE; 230 231 fail: 232 233 Py_DECREF(msg); 234 Py_XDECREF(body); 235 236 return NULL; 237 } 238 239 240 static PyObject * 241 nxt_py_asgi_http_send(PyObject *self, PyObject *dict) 242 { 243 PyObject *type; 244 const char *type_str; 245 Py_ssize_t type_len; 246 nxt_py_asgi_http_t *http; 247 248 static const nxt_str_t response_start = nxt_string("http.response.start"); 249 static const nxt_str_t response_body = nxt_string("http.response.body"); 250 251 http = (nxt_py_asgi_http_t *) self; 252 253 type = PyDict_GetItem(dict, nxt_py_type_str); 254 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { 255 nxt_unit_req_error(http->req, "asgi_http_send: " 256 "'type' is not a unicode string"); 257 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); 258 } 259 260 type_str = PyUnicode_AsUTF8AndSize(type, &type_len); 261 262 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", 263 (int) type_len, type_str); 264 265 if (nxt_unit_response_is_init(http->req)) { 266 if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) { 267 return nxt_py_asgi_http_response_body(http, dict); 268 } 269 270 return PyErr_Format(PyExc_RuntimeError, 271 "Expected ASGI message 'http.response.body', " 272 "but got '%U'", type); 273 } 274 275 if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) { 276 return nxt_py_asgi_http_response_start(http, dict); 277 } 278 279 return PyErr_Format(PyExc_RuntimeError, 280 "Expected ASGI message 'http.response.start', " 281 "but got '%U'", type); 282 } 283 284 285 static PyObject * 286 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) 287 { 288 int rc; 289 PyObject *status, *headers, *res; 290 nxt_py_asgi_calc_size_ctx_t calc_size_ctx; 291 nxt_py_asgi_add_field_ctx_t add_field_ctx; 292 293 status = PyDict_GetItem(dict, nxt_py_status_str); 294 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { 295 nxt_unit_req_error(http->req, "asgi_http_response_start: " 296 "'status' is not an integer"); 297 return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); 298 } 299 300 calc_size_ctx.fields_size = 0; 301 calc_size_ctx.fields_count = 0; 302 303 headers = PyDict_GetItem(dict, nxt_py_headers_str); 304 if (headers != NULL) { 305 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, 306 &calc_size_ctx); 307 if (nxt_slow_path(res == NULL)) { 308 return NULL; 309 } 310 311 Py_DECREF(res); 312 } 313 314 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), 315 calc_size_ctx.fields_count, 316 calc_size_ctx.fields_size); 317 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 318 return PyErr_Format(PyExc_RuntimeError, 319 "failed to allocate response object"); 320 } 321 322 add_field_ctx.req = http->req; 323 add_field_ctx.content_length = -1; 324 325 if (headers != NULL) { 326 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, 327 &add_field_ctx); 328 if (nxt_slow_path(res == NULL)) { 329 return NULL; 330 } 331 332 Py_DECREF(res); 333 } 334 335 http->content_length = add_field_ctx.content_length; 336 337 Py_INCREF(http); 338 return (PyObject *) http; 339 } 340 341 342 static PyObject * 343 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) 344 { 345 int rc; 346 char *body_str; 347 ssize_t sent; 348 PyObject *body, *more_body, *future; 349 Py_ssize_t body_len, body_off; 350 nxt_py_asgi_ctx_data_t *ctx_data; 351 352 body = PyDict_GetItem(dict, nxt_py_body_str); 353 if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { 354 return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); 355 } 356 357 more_body = PyDict_GetItem(dict, nxt_py_more_body_str); 358 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { 359 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); 360 } 361 362 if (nxt_slow_path(http->complete)) { 363 return PyErr_Format(PyExc_RuntimeError, 364 "Unexpected ASGI message 'http.response.body' " 365 "sent, after response already completed"); 366 } 367 368 if (nxt_slow_path(http->send_future != NULL)) { 369 return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); 370 } 371 372 if (body != NULL) { 373 body_str = PyBytes_AS_STRING(body); 374 body_len = PyBytes_GET_SIZE(body); 375 376 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", 377 (int) body_len, (more_body == Py_True) ); 378 379 if (nxt_slow_path(http->bytes_sent + body_len 380 > http->content_length)) 381 { 382 return PyErr_Format(PyExc_RuntimeError, 383 "Response content longer than Content-Length"); 384 } 385 386 body_off = 0; 387 388 ctx_data = http->req->ctx->data; 389 390 while (body_len > 0) { 391 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 392 if (nxt_slow_path(sent < 0)) { 393 return PyErr_Format(PyExc_RuntimeError, "failed to send body"); 394 } 395 396 if (nxt_slow_path(sent == 0)) { 397 nxt_unit_req_debug(http->req, "asgi_http_response_body: " 398 "out of shared memory, %d", 399 (int) body_len); 400 401 future = PyObject_CallObject(ctx_data->loop_create_future, 402 NULL); 403 if (nxt_slow_path(future == NULL)) { 404 nxt_unit_req_alert(http->req, 405 "Python failed to create Future object"); 406 nxt_python_print_exception(); 407 408 return PyErr_Format(PyExc_RuntimeError, 409 "failed to create Future object"); 410 } 411 412 http->send_body = body; 413 Py_INCREF(http->send_body); 414 http->send_body_off = body_off; 415 416 nxt_py_asgi_drain_wait(http->req, &http->link); 417 418 http->send_future = future; 419 Py_INCREF(http->send_future); 420 421 return future; 422 } 423 424 body_str += sent; 425 body_len -= sent; 426 body_off += sent; 427 http->bytes_sent += sent; 428 } 429 430 } else { 431 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", 432 (more_body == Py_True) ); 433 434 if (!nxt_unit_response_is_sent(http->req)) { 435 rc = nxt_unit_response_send(http->req); 436 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 437 return PyErr_Format(PyExc_RuntimeError, 438 "failed to send response"); 439 } 440 } 441 } 442 443 if (more_body == NULL || more_body == Py_False) { 444 http->complete = 1; 445 } 446 447 Py_INCREF(http); 448 return (PyObject *) http; 449 } 450 451 452 void 453 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) 454 { 455 PyObject *msg, *future, *res; 456 nxt_py_asgi_http_t *http; 457 458 http = req->data; 459 460 nxt_unit_req_debug(req, "asgi_http_data_handler"); 461 462 if (http->receive_future == NULL) { 463 return; 464 } 465 466 msg = nxt_py_asgi_http_read_msg(http); 467 if (nxt_slow_path(msg == NULL)) { 468 return; 469 } 470 471 if (msg == Py_None) { 472 Py_DECREF(msg); 473 return; 474 } 475 476 future = http->receive_future; 477 http->receive_future = NULL; 478 479 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); 480 if (nxt_slow_path(res == NULL)) { 481 nxt_unit_req_alert(req, "'set_result' call failed"); 482 nxt_python_print_exception(); 483 } 484 485 Py_XDECREF(res); 486 Py_DECREF(future); 487 488 Py_DECREF(msg); 489 } 490 491 492 int 493 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) 494 { 495 char *body_str; 496 ssize_t sent; 497 PyObject *future, *exc, *res; 498 Py_ssize_t body_len; 499 nxt_py_asgi_http_t *http; 500 501 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); 502 503 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; 504 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; 505 506 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); 507 508 while (body_len > 0) { 509 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); 510 if (nxt_slow_path(sent < 0)) { 511 goto fail; 512 } 513 514 if (nxt_slow_path(sent == 0)) { 515 return NXT_UNIT_AGAIN; 516 } 517 518 body_str += sent; 519 body_len -= sent; 520 521 http->send_body_off += sent; 522 http->bytes_sent += sent; 523 } 524 525 Py_CLEAR(http->send_body); 526 527 future = http->send_future; 528 http->send_future = NULL; 529 530 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, 531 NULL); 532 if (nxt_slow_path(res == NULL)) { 533 nxt_unit_req_alert(http->req, "'set_result' call failed"); 534 nxt_python_print_exception(); 535 } 536 537 Py_XDECREF(res); 538 Py_DECREF(future); 539 540 return NXT_UNIT_OK; 541 542 fail: 543 544 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, 545 nxt_py_failed_to_send_body_str, 546 NULL); 547 if (nxt_slow_path(exc == NULL)) { 548 nxt_unit_req_alert(http->req, "RuntimeError create failed"); 549 nxt_python_print_exception(); 550 551 exc = Py_None; 552 Py_INCREF(exc); 553 } 554 555 future = http->send_future; 556 http->send_future = NULL; 557 558 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, 559 NULL); 560 if (nxt_slow_path(res == NULL)) { 561 nxt_unit_req_alert(http->req, "'set_exception' call failed"); 562 nxt_python_print_exception(); 563 } 564 565 Py_XDECREF(res); 566 Py_DECREF(future); 567 Py_DECREF(exc); 568 569 return NXT_UNIT_ERROR; 570 } 571 572 573 void 574 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) 575 { 576 PyObject *msg, *future, *res; 577 nxt_py_asgi_http_t *http; 578 579 http = req->data; 580 581 nxt_unit_req_debug(req, "asgi_http_close_handler"); 582 583 http->closed = 1; 584 585 if (http->receive_future == NULL) { 586 return; 587 } 588 589 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); 590 if (nxt_slow_path(msg == NULL)) { 591 return; 592 } 593 594 if (msg == Py_None) { 595 Py_DECREF(msg); 596 return; 597 } 598 599 future = http->receive_future; 600 http->receive_future = NULL; 601 602 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); 603 if (nxt_slow_path(res == NULL)) { 604 nxt_unit_req_alert(req, "'set_result' call failed"); 605 nxt_python_print_exception(); 606 } 607 608 Py_XDECREF(res); 609 Py_DECREF(future); 610 611 Py_DECREF(msg); 612 } 613 614 615 static PyObject * 616 nxt_py_asgi_http_done(PyObject *self, PyObject *future) 617 { 618 int rc; 619 PyObject *res; 620 nxt_py_asgi_http_t *http; 621 622 http = (nxt_py_asgi_http_t *) self; 623 624 nxt_unit_req_debug(http->req, "asgi_http_done"); 625 626 /* 627 * Get Future.result() and it raises an exception, if coroutine exited 628 * with exception. 629 */ 630 res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); 631 if (nxt_slow_path(res == NULL)) { 632 nxt_unit_req_error(http->req, 633 "Python failed to call 'future.result()'"); 634 nxt_python_print_exception(); 635 636 rc = NXT_UNIT_ERROR; 637 638 } else { 639 Py_DECREF(res); 640 641 rc = NXT_UNIT_OK; 642 } 643 644 nxt_unit_request_done(http->req, rc); 645 646 Py_RETURN_NONE; 647 } 648 649 650 #endif /* NXT_HAVE_ASGI */ 651