11624Smax.romanov@nginx.com
21624Smax.romanov@nginx.com /*
31624Smax.romanov@nginx.com * Copyright (C) NGINX, Inc.
41624Smax.romanov@nginx.com */
51624Smax.romanov@nginx.com
61624Smax.romanov@nginx.com
71624Smax.romanov@nginx.com #include <python/nxt_python.h>
81624Smax.romanov@nginx.com
91624Smax.romanov@nginx.com #if (NXT_HAVE_ASGI)
101624Smax.romanov@nginx.com
111624Smax.romanov@nginx.com #include <nxt_main.h>
121624Smax.romanov@nginx.com #include <nxt_unit.h>
131624Smax.romanov@nginx.com #include <nxt_unit_request.h>
141624Smax.romanov@nginx.com #include <nxt_unit_websocket.h>
151624Smax.romanov@nginx.com #include <nxt_websocket_header.h>
161624Smax.romanov@nginx.com #include <python/nxt_python_asgi.h>
171624Smax.romanov@nginx.com #include <python/nxt_python_asgi_str.h>
181624Smax.romanov@nginx.com
191624Smax.romanov@nginx.com
201624Smax.romanov@nginx.com enum {
211624Smax.romanov@nginx.com NXT_WS_INIT,
221624Smax.romanov@nginx.com NXT_WS_CONNECT,
231624Smax.romanov@nginx.com NXT_WS_ACCEPTED,
241624Smax.romanov@nginx.com NXT_WS_DISCONNECTED,
251624Smax.romanov@nginx.com NXT_WS_CLOSED,
261624Smax.romanov@nginx.com };
271624Smax.romanov@nginx.com
281624Smax.romanov@nginx.com
291624Smax.romanov@nginx.com typedef struct {
301624Smax.romanov@nginx.com nxt_queue_link_t link;
311624Smax.romanov@nginx.com nxt_unit_websocket_frame_t *frame;
321624Smax.romanov@nginx.com } nxt_py_asgi_penging_frame_t;
331624Smax.romanov@nginx.com
341624Smax.romanov@nginx.com
351624Smax.romanov@nginx.com typedef struct {
361624Smax.romanov@nginx.com PyObject_HEAD
371624Smax.romanov@nginx.com nxt_unit_request_info_t *req;
381624Smax.romanov@nginx.com PyObject *receive_future;
391624Smax.romanov@nginx.com PyObject *receive_exc_str;
401624Smax.romanov@nginx.com int state;
411624Smax.romanov@nginx.com nxt_queue_t pending_frames;
421624Smax.romanov@nginx.com uint64_t pending_payload_len;
431624Smax.romanov@nginx.com uint64_t pending_frame_len;
441624Smax.romanov@nginx.com int pending_fins;
451624Smax.romanov@nginx.com } nxt_py_asgi_websocket_t;
461624Smax.romanov@nginx.com
471624Smax.romanov@nginx.com
481624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
491624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
501624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
511624Smax.romanov@nginx.com PyObject *dict);
521624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
531624Smax.romanov@nginx.com PyObject *dict);
541624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
551624Smax.romanov@nginx.com PyObject *dict);
561624Smax.romanov@nginx.com static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
571624Smax.romanov@nginx.com PyObject *msg);
581624Smax.romanov@nginx.com static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
591624Smax.romanov@nginx.com PyObject *exc);
601624Smax.romanov@nginx.com static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
611624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
621624Smax.romanov@nginx.com nxt_unit_websocket_frame_t *frame);
631624Smax.romanov@nginx.com static uint64_t nxt_py_asgi_websocket_pending_len(
641624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws);
651624Smax.romanov@nginx.com static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
661624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws);
671624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_disconnect_msg(
681624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws);
691624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
701624Smax.romanov@nginx.com
711624Smax.romanov@nginx.com
721624Smax.romanov@nginx.com static PyMethodDef nxt_py_asgi_websocket_methods[] = {
731624Smax.romanov@nginx.com { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
741624Smax.romanov@nginx.com { "send", nxt_py_asgi_websocket_send, METH_O, 0 },
751624Smax.romanov@nginx.com { "_done", nxt_py_asgi_websocket_done, METH_O, 0 },
761624Smax.romanov@nginx.com { NULL, NULL, 0, 0 }
771624Smax.romanov@nginx.com };
781624Smax.romanov@nginx.com
791624Smax.romanov@nginx.com static PyAsyncMethods nxt_py_asgi_async_methods = {
801624Smax.romanov@nginx.com .am_await = nxt_py_asgi_await,
811624Smax.romanov@nginx.com };
821624Smax.romanov@nginx.com
831624Smax.romanov@nginx.com static PyTypeObject nxt_py_asgi_websocket_type = {
841624Smax.romanov@nginx.com PyVarObject_HEAD_INIT(NULL, 0)
851624Smax.romanov@nginx.com
861624Smax.romanov@nginx.com .tp_name = "unit._asgi_websocket",
871624Smax.romanov@nginx.com .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
881624Smax.romanov@nginx.com .tp_dealloc = nxt_py_asgi_dealloc,
891624Smax.romanov@nginx.com .tp_as_async = &nxt_py_asgi_async_methods,
901624Smax.romanov@nginx.com .tp_flags = Py_TPFLAGS_DEFAULT,
911624Smax.romanov@nginx.com .tp_doc = "unit ASGI WebSocket connection object",
921624Smax.romanov@nginx.com .tp_iter = nxt_py_asgi_iter,
931624Smax.romanov@nginx.com .tp_iternext = nxt_py_asgi_next,
941624Smax.romanov@nginx.com .tp_methods = nxt_py_asgi_websocket_methods,
951624Smax.romanov@nginx.com };
961624Smax.romanov@nginx.com
971624Smax.romanov@nginx.com static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
981624Smax.romanov@nginx.com static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
991624Smax.romanov@nginx.com
1001624Smax.romanov@nginx.com
1011681Smax.romanov@nginx.com int
nxt_py_asgi_websocket_init(void)1021681Smax.romanov@nginx.com nxt_py_asgi_websocket_init(void)
1031624Smax.romanov@nginx.com {
1041624Smax.romanov@nginx.com if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
1051681Smax.romanov@nginx.com nxt_unit_alert(NULL,
1061624Smax.romanov@nginx.com "Python failed to initialize the \"asgi_websocket\" type object");
1071681Smax.romanov@nginx.com return NXT_UNIT_ERROR;
1081624Smax.romanov@nginx.com }
1091624Smax.romanov@nginx.com
1101681Smax.romanov@nginx.com return NXT_UNIT_OK;
1111624Smax.romanov@nginx.com }
1121624Smax.romanov@nginx.com
1131624Smax.romanov@nginx.com
1141624Smax.romanov@nginx.com PyObject *
nxt_py_asgi_websocket_create(nxt_unit_request_info_t * req)1151624Smax.romanov@nginx.com nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
1161624Smax.romanov@nginx.com {
1171624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws;
1181624Smax.romanov@nginx.com
1191624Smax.romanov@nginx.com ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
1201624Smax.romanov@nginx.com
1211624Smax.romanov@nginx.com if (nxt_fast_path(ws != NULL)) {
1221624Smax.romanov@nginx.com ws->req = req;
1231624Smax.romanov@nginx.com ws->receive_future = NULL;
1241624Smax.romanov@nginx.com ws->receive_exc_str = NULL;
1251624Smax.romanov@nginx.com ws->state = NXT_WS_INIT;
1261624Smax.romanov@nginx.com nxt_queue_init(&ws->pending_frames);
1271624Smax.romanov@nginx.com ws->pending_payload_len = 0;
1281624Smax.romanov@nginx.com ws->pending_frame_len = 0;
1291624Smax.romanov@nginx.com ws->pending_fins = 0;
1301624Smax.romanov@nginx.com }
1311624Smax.romanov@nginx.com
1321624Smax.romanov@nginx.com return (PyObject *) ws;
1331624Smax.romanov@nginx.com }
1341624Smax.romanov@nginx.com
1351624Smax.romanov@nginx.com
1361624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_receive(PyObject * self,PyObject * none)1371624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
1381624Smax.romanov@nginx.com {
1391624Smax.romanov@nginx.com PyObject *future, *msg;
1401681Smax.romanov@nginx.com nxt_py_asgi_ctx_data_t *ctx_data;
1411624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws;
1421624Smax.romanov@nginx.com
1431624Smax.romanov@nginx.com ws = (nxt_py_asgi_websocket_t *) self;
1441624Smax.romanov@nginx.com
1451624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
1461624Smax.romanov@nginx.com
1471624Smax.romanov@nginx.com /* If exception happened out of receive() call, raise it now. */
1481624Smax.romanov@nginx.com if (nxt_slow_path(ws->receive_exc_str != NULL)) {
1491624Smax.romanov@nginx.com PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
1501624Smax.romanov@nginx.com
1511624Smax.romanov@nginx.com ws->receive_exc_str = NULL;
1521624Smax.romanov@nginx.com
1531624Smax.romanov@nginx.com return NULL;
1541624Smax.romanov@nginx.com }
1551624Smax.romanov@nginx.com
1561624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
1571624Smax.romanov@nginx.com nxt_unit_req_error(ws->req,
1581624Smax.romanov@nginx.com "receive() called for closed WebSocket");
1591624Smax.romanov@nginx.com
1601624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
1611624Smax.romanov@nginx.com "WebSocket already closed");
1621624Smax.romanov@nginx.com }
1631624Smax.romanov@nginx.com
1641681Smax.romanov@nginx.com ctx_data = ws->req->ctx->data;
1651681Smax.romanov@nginx.com
1661681Smax.romanov@nginx.com future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
1671624Smax.romanov@nginx.com if (nxt_slow_path(future == NULL)) {
1681624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "Python failed to create Future object");
1691624Smax.romanov@nginx.com nxt_python_print_exception();
1701624Smax.romanov@nginx.com
1711624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
1721624Smax.romanov@nginx.com "failed to create Future object");
1731624Smax.romanov@nginx.com }
1741624Smax.romanov@nginx.com
1751624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
1761624Smax.romanov@nginx.com ws->state = NXT_WS_CONNECT;
1771624Smax.romanov@nginx.com
1781624Smax.romanov@nginx.com msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
1791624Smax.romanov@nginx.com
1801681Smax.romanov@nginx.com return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
1811624Smax.romanov@nginx.com }
1821624Smax.romanov@nginx.com
1831624Smax.romanov@nginx.com if (ws->pending_fins > 0) {
1841624Smax.romanov@nginx.com msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
1851624Smax.romanov@nginx.com
1861681Smax.romanov@nginx.com return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
1871624Smax.romanov@nginx.com }
1881624Smax.romanov@nginx.com
1891624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
1901624Smax.romanov@nginx.com msg = nxt_py_asgi_websocket_disconnect_msg(ws);
1911624Smax.romanov@nginx.com
1921681Smax.romanov@nginx.com return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
1931624Smax.romanov@nginx.com }
1941624Smax.romanov@nginx.com
1951624Smax.romanov@nginx.com ws->receive_future = future;
1961624Smax.romanov@nginx.com Py_INCREF(ws->receive_future);
1971624Smax.romanov@nginx.com
1981624Smax.romanov@nginx.com return future;
1991624Smax.romanov@nginx.com }
2001624Smax.romanov@nginx.com
2011624Smax.romanov@nginx.com
2021624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_send(PyObject * self,PyObject * dict)2031624Smax.romanov@nginx.com nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
2041624Smax.romanov@nginx.com {
2051624Smax.romanov@nginx.com PyObject *type;
2061624Smax.romanov@nginx.com const char *type_str;
2071624Smax.romanov@nginx.com Py_ssize_t type_len;
2081624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws;
2091624Smax.romanov@nginx.com
2101624Smax.romanov@nginx.com static const nxt_str_t websocket_accept = nxt_string("websocket.accept");
2111624Smax.romanov@nginx.com static const nxt_str_t websocket_close = nxt_string("websocket.close");
2121624Smax.romanov@nginx.com static const nxt_str_t websocket_send = nxt_string("websocket.send");
2131624Smax.romanov@nginx.com
2141624Smax.romanov@nginx.com ws = (nxt_py_asgi_websocket_t *) self;
2151624Smax.romanov@nginx.com
2161624Smax.romanov@nginx.com type = PyDict_GetItem(dict, nxt_py_type_str);
2171624Smax.romanov@nginx.com if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
2181624Smax.romanov@nginx.com nxt_unit_req_error(ws->req, "asgi_websocket_send: "
2191624Smax.romanov@nginx.com "'type' is not a unicode string");
2201624Smax.romanov@nginx.com return PyErr_Format(PyExc_TypeError,
2211624Smax.romanov@nginx.com "'type' is not a unicode string");
2221624Smax.romanov@nginx.com }
2231624Smax.romanov@nginx.com
2241624Smax.romanov@nginx.com type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
2251624Smax.romanov@nginx.com
2261624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
2271624Smax.romanov@nginx.com (int) type_len, type_str);
2281624Smax.romanov@nginx.com
2291624Smax.romanov@nginx.com if (type_len == (Py_ssize_t) websocket_accept.length
2301624Smax.romanov@nginx.com && memcmp(type_str, websocket_accept.start, type_len) == 0)
2311624Smax.romanov@nginx.com {
2321624Smax.romanov@nginx.com return nxt_py_asgi_websocket_accept(ws, dict);
2331624Smax.romanov@nginx.com }
2341624Smax.romanov@nginx.com
2351624Smax.romanov@nginx.com if (type_len == (Py_ssize_t) websocket_close.length
2361624Smax.romanov@nginx.com && memcmp(type_str, websocket_close.start, type_len) == 0)
2371624Smax.romanov@nginx.com {
2381624Smax.romanov@nginx.com return nxt_py_asgi_websocket_close(ws, dict);
2391624Smax.romanov@nginx.com }
2401624Smax.romanov@nginx.com
2411624Smax.romanov@nginx.com if (type_len == (Py_ssize_t) websocket_send.length
2421624Smax.romanov@nginx.com && memcmp(type_str, websocket_send.start, type_len) == 0)
2431624Smax.romanov@nginx.com {
2441624Smax.romanov@nginx.com return nxt_py_asgi_websocket_send_frame(ws, dict);
2451624Smax.romanov@nginx.com }
2461624Smax.romanov@nginx.com
2471624Smax.romanov@nginx.com nxt_unit_req_error(ws->req, "asgi_websocket_send: "
2481624Smax.romanov@nginx.com "unexpected 'type': '%.*s'", (int) type_len, type_str);
2491624Smax.romanov@nginx.com return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
2501624Smax.romanov@nginx.com }
2511624Smax.romanov@nginx.com
2521624Smax.romanov@nginx.com
2531624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t * ws,PyObject * dict)2541624Smax.romanov@nginx.com nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
2551624Smax.romanov@nginx.com {
2561624Smax.romanov@nginx.com int rc;
2571624Smax.romanov@nginx.com char *subprotocol_str;
2581624Smax.romanov@nginx.com PyObject *res, *headers, *subprotocol;
2591624Smax.romanov@nginx.com Py_ssize_t subprotocol_len;
2601624Smax.romanov@nginx.com nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
2611624Smax.romanov@nginx.com nxt_py_asgi_add_field_ctx_t add_field_ctx;
2621624Smax.romanov@nginx.com
2631624Smax.romanov@nginx.com static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
2641624Smax.romanov@nginx.com
2651624Smax.romanov@nginx.com switch(ws->state) {
2661624Smax.romanov@nginx.com case NXT_WS_INIT:
2671624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
2681624Smax.romanov@nginx.com "WebSocket connect not received");
2691624Smax.romanov@nginx.com case NXT_WS_CONNECT:
2701624Smax.romanov@nginx.com break;
2711624Smax.romanov@nginx.com
2721624Smax.romanov@nginx.com case NXT_WS_ACCEPTED:
2731624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
2741624Smax.romanov@nginx.com
2751624Smax.romanov@nginx.com case NXT_WS_DISCONNECTED:
2761624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
2771624Smax.romanov@nginx.com
2781624Smax.romanov@nginx.com case NXT_WS_CLOSED:
2791624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
2801624Smax.romanov@nginx.com }
2811624Smax.romanov@nginx.com
2821624Smax.romanov@nginx.com if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
2831624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
2841624Smax.romanov@nginx.com }
2851624Smax.romanov@nginx.com
2861624Smax.romanov@nginx.com if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
2871624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "response already sent");
2881624Smax.romanov@nginx.com }
2891624Smax.romanov@nginx.com
2901624Smax.romanov@nginx.com calc_size_ctx.fields_size = 0;
2911624Smax.romanov@nginx.com calc_size_ctx.fields_count = 0;
2921624Smax.romanov@nginx.com
2931624Smax.romanov@nginx.com headers = PyDict_GetItem(dict, nxt_py_headers_str);
2941624Smax.romanov@nginx.com if (headers != NULL) {
2951624Smax.romanov@nginx.com res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
2961624Smax.romanov@nginx.com &calc_size_ctx);
2971624Smax.romanov@nginx.com if (nxt_slow_path(res == NULL)) {
2981624Smax.romanov@nginx.com return NULL;
2991624Smax.romanov@nginx.com }
3001624Smax.romanov@nginx.com }
3011624Smax.romanov@nginx.com
3021624Smax.romanov@nginx.com subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
3031624Smax.romanov@nginx.com if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
3041624Smax.romanov@nginx.com subprotocol_str = PyUnicode_DATA(subprotocol);
3051624Smax.romanov@nginx.com subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
3061624Smax.romanov@nginx.com
3071624Smax.romanov@nginx.com calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
3081624Smax.romanov@nginx.com calc_size_ctx.fields_count++;
3091624Smax.romanov@nginx.com
3101624Smax.romanov@nginx.com } else {
3111624Smax.romanov@nginx.com subprotocol_str = NULL;
3121624Smax.romanov@nginx.com subprotocol_len = 0;
3131624Smax.romanov@nginx.com }
3141624Smax.romanov@nginx.com
3151624Smax.romanov@nginx.com rc = nxt_unit_response_init(ws->req, 101,
3161624Smax.romanov@nginx.com calc_size_ctx.fields_count,
3171624Smax.romanov@nginx.com calc_size_ctx.fields_size);
3181624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3191624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
3201624Smax.romanov@nginx.com "failed to allocate response object");
3211624Smax.romanov@nginx.com }
3221624Smax.romanov@nginx.com
3231624Smax.romanov@nginx.com add_field_ctx.req = ws->req;
3241624Smax.romanov@nginx.com add_field_ctx.content_length = -1;
3251624Smax.romanov@nginx.com
3261624Smax.romanov@nginx.com if (headers != NULL) {
3271624Smax.romanov@nginx.com res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
3281624Smax.romanov@nginx.com &add_field_ctx);
3291624Smax.romanov@nginx.com if (nxt_slow_path(res == NULL)) {
3301624Smax.romanov@nginx.com return NULL;
3311624Smax.romanov@nginx.com }
3321624Smax.romanov@nginx.com }
3331624Smax.romanov@nginx.com
3341624Smax.romanov@nginx.com if (subprotocol_len > 0) {
3351624Smax.romanov@nginx.com rc = nxt_unit_response_add_field(ws->req,
3361624Smax.romanov@nginx.com (const char *) ws_protocol.start,
3371624Smax.romanov@nginx.com ws_protocol.length,
3381624Smax.romanov@nginx.com subprotocol_str, subprotocol_len);
3391624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3401624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
3411624Smax.romanov@nginx.com "failed to add header");
3421624Smax.romanov@nginx.com }
3431624Smax.romanov@nginx.com }
3441624Smax.romanov@nginx.com
3451624Smax.romanov@nginx.com rc = nxt_unit_response_send(ws->req);
3461624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3471624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "failed to send response");
3481624Smax.romanov@nginx.com }
3491624Smax.romanov@nginx.com
3501624Smax.romanov@nginx.com ws->state = NXT_WS_ACCEPTED;
3511624Smax.romanov@nginx.com
3521624Smax.romanov@nginx.com Py_INCREF(ws);
3531624Smax.romanov@nginx.com
3541624Smax.romanov@nginx.com return (PyObject *) ws;
3551624Smax.romanov@nginx.com }
3561624Smax.romanov@nginx.com
3571624Smax.romanov@nginx.com
3581624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t * ws,PyObject * dict)3591624Smax.romanov@nginx.com nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
3601624Smax.romanov@nginx.com {
3611624Smax.romanov@nginx.com int rc;
3621624Smax.romanov@nginx.com uint16_t status_code;
3631624Smax.romanov@nginx.com PyObject *code;
3641624Smax.romanov@nginx.com
3651624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
3661624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
3671624Smax.romanov@nginx.com "WebSocket connect not received");
3681624Smax.romanov@nginx.com }
3691624Smax.romanov@nginx.com
3701624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
3711624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
3721624Smax.romanov@nginx.com }
3731624Smax.romanov@nginx.com
3741624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
3751624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
3761624Smax.romanov@nginx.com }
3771624Smax.romanov@nginx.com
3781624Smax.romanov@nginx.com if (nxt_unit_response_is_websocket(ws->req)) {
3791624Smax.romanov@nginx.com code = PyDict_GetItem(dict, nxt_py_code_str);
3801624Smax.romanov@nginx.com if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
3811624Smax.romanov@nginx.com return PyErr_Format(PyExc_TypeError, "'code' is not integer");
3821624Smax.romanov@nginx.com }
3831624Smax.romanov@nginx.com
3841624Smax.romanov@nginx.com status_code = (code != NULL) ? htons(PyLong_AsLong(code))
3851624Smax.romanov@nginx.com : htons(NXT_WEBSOCKET_CR_NORMAL);
3861624Smax.romanov@nginx.com
3871624Smax.romanov@nginx.com rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
3881624Smax.romanov@nginx.com 1, &status_code, 2);
3891624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3901624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
3911624Smax.romanov@nginx.com "failed to send close frame");
3921624Smax.romanov@nginx.com }
3931624Smax.romanov@nginx.com
3941624Smax.romanov@nginx.com } else {
3951624Smax.romanov@nginx.com rc = nxt_unit_response_init(ws->req, 403, 0, 0);
3961624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3971624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
3981624Smax.romanov@nginx.com "failed to allocate response object");
3991624Smax.romanov@nginx.com }
4001624Smax.romanov@nginx.com
4011624Smax.romanov@nginx.com rc = nxt_unit_response_send(ws->req);
4021624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4031624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
4041624Smax.romanov@nginx.com "failed to send response");
4051624Smax.romanov@nginx.com }
4061624Smax.romanov@nginx.com }
4071624Smax.romanov@nginx.com
4081624Smax.romanov@nginx.com ws->state = NXT_WS_CLOSED;
4091624Smax.romanov@nginx.com
4101624Smax.romanov@nginx.com Py_INCREF(ws);
4111624Smax.romanov@nginx.com
4121624Smax.romanov@nginx.com return (PyObject *) ws;
4131624Smax.romanov@nginx.com }
4141624Smax.romanov@nginx.com
4151624Smax.romanov@nginx.com
4161624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t * ws,PyObject * dict)4171624Smax.romanov@nginx.com nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
4181624Smax.romanov@nginx.com {
4191624Smax.romanov@nginx.com int rc;
4201624Smax.romanov@nginx.com uint8_t opcode;
4211624Smax.romanov@nginx.com PyObject *bytes, *text;
4221624Smax.romanov@nginx.com const void *buf;
4231624Smax.romanov@nginx.com Py_ssize_t buf_size;
4241624Smax.romanov@nginx.com
4251624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
4261624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
4271624Smax.romanov@nginx.com "WebSocket connect not received");
4281624Smax.romanov@nginx.com }
4291624Smax.romanov@nginx.com
4301624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
4311624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
4321624Smax.romanov@nginx.com "WebSocket not accepted yet");
4331624Smax.romanov@nginx.com }
4341624Smax.romanov@nginx.com
4351624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
4361624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
4371624Smax.romanov@nginx.com }
4381624Smax.romanov@nginx.com
4391624Smax.romanov@nginx.com if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
4401624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
4411624Smax.romanov@nginx.com }
4421624Smax.romanov@nginx.com
4431624Smax.romanov@nginx.com bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
4441624Smax.romanov@nginx.com if (bytes == Py_None) {
4451624Smax.romanov@nginx.com bytes = NULL;
4461624Smax.romanov@nginx.com }
4471624Smax.romanov@nginx.com
4481624Smax.romanov@nginx.com if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
4491624Smax.romanov@nginx.com return PyErr_Format(PyExc_TypeError,
4501624Smax.romanov@nginx.com "'bytes' is not a byte string");
4511624Smax.romanov@nginx.com }
4521624Smax.romanov@nginx.com
4531624Smax.romanov@nginx.com text = PyDict_GetItem(dict, nxt_py_text_str);
4541624Smax.romanov@nginx.com if (text == Py_None) {
4551624Smax.romanov@nginx.com text = NULL;
4561624Smax.romanov@nginx.com }
4571624Smax.romanov@nginx.com
4581624Smax.romanov@nginx.com if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
4591624Smax.romanov@nginx.com return PyErr_Format(PyExc_TypeError,
4601624Smax.romanov@nginx.com "'text' is not a unicode string");
4611624Smax.romanov@nginx.com }
4621624Smax.romanov@nginx.com
4631624Smax.romanov@nginx.com if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
4641624Smax.romanov@nginx.com return PyErr_Format(PyExc_ValueError,
4651624Smax.romanov@nginx.com "Exactly one of 'bytes' or 'text' must be non-None");
4661624Smax.romanov@nginx.com }
4671624Smax.romanov@nginx.com
4681624Smax.romanov@nginx.com if (bytes != NULL) {
4691624Smax.romanov@nginx.com buf = PyBytes_AS_STRING(bytes);
4701624Smax.romanov@nginx.com buf_size = PyBytes_GET_SIZE(bytes);
4711624Smax.romanov@nginx.com opcode = NXT_WEBSOCKET_OP_BINARY;
4721624Smax.romanov@nginx.com
4731624Smax.romanov@nginx.com } else {
4741624Smax.romanov@nginx.com buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
4751624Smax.romanov@nginx.com opcode = NXT_WEBSOCKET_OP_TEXT;
4761624Smax.romanov@nginx.com }
4771624Smax.romanov@nginx.com
4781624Smax.romanov@nginx.com rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
4791624Smax.romanov@nginx.com if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4801624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
4811624Smax.romanov@nginx.com }
4821624Smax.romanov@nginx.com
4831624Smax.romanov@nginx.com Py_INCREF(ws);
4841624Smax.romanov@nginx.com return (PyObject *) ws;
4851624Smax.romanov@nginx.com }
4861624Smax.romanov@nginx.com
4871624Smax.romanov@nginx.com
4881624Smax.romanov@nginx.com void
nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t * frame)4891624Smax.romanov@nginx.com nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
4901624Smax.romanov@nginx.com {
4911624Smax.romanov@nginx.com uint8_t opcode;
4921624Smax.romanov@nginx.com uint16_t status_code;
4931624Smax.romanov@nginx.com uint64_t rest;
4941624Smax.romanov@nginx.com PyObject *msg, *exc;
4951624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws;
4961624Smax.romanov@nginx.com
4971624Smax.romanov@nginx.com ws = frame->req->data;
4981624Smax.romanov@nginx.com
4991624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
5001624Smax.romanov@nginx.com
5011624Smax.romanov@nginx.com opcode = frame->header->opcode;
5021624Smax.romanov@nginx.com if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
5031624Smax.romanov@nginx.com && opcode != NXT_WEBSOCKET_OP_TEXT
5041624Smax.romanov@nginx.com && opcode != NXT_WEBSOCKET_OP_BINARY
5051624Smax.romanov@nginx.com && opcode != NXT_WEBSOCKET_OP_CLOSE))
5061624Smax.romanov@nginx.com {
5071624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
5081624Smax.romanov@nginx.com
5091624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req,
5101624Smax.romanov@nginx.com "asgi_websocket_handler: ignore frame with opcode %d",
5111624Smax.romanov@nginx.com opcode);
5121624Smax.romanov@nginx.com
5131624Smax.romanov@nginx.com return;
5141624Smax.romanov@nginx.com }
5151624Smax.romanov@nginx.com
5161624Smax.romanov@nginx.com if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
5171624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
5181624Smax.romanov@nginx.com
5191624Smax.romanov@nginx.com goto bad_state;
5201624Smax.romanov@nginx.com }
5211624Smax.romanov@nginx.com
5221624Smax.romanov@nginx.com rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
5231624Smax.romanov@nginx.com
5241624Smax.romanov@nginx.com if (nxt_slow_path(frame->payload_len > rest)) {
5251624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
5261624Smax.romanov@nginx.com
5271624Smax.romanov@nginx.com goto too_big;
5281624Smax.romanov@nginx.com }
5291624Smax.romanov@nginx.com
5301624Smax.romanov@nginx.com rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
5311624Smax.romanov@nginx.com
5321624Smax.romanov@nginx.com if (nxt_slow_path(frame->payload_len > rest)) {
5331624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
5341624Smax.romanov@nginx.com
5351624Smax.romanov@nginx.com goto too_big;
5361624Smax.romanov@nginx.com }
5371624Smax.romanov@nginx.com
5381624Smax.romanov@nginx.com if (ws->receive_future == NULL || frame->header->fin == 0) {
5391624Smax.romanov@nginx.com nxt_py_asgi_websocket_suspend_frame(frame);
5401624Smax.romanov@nginx.com
5411624Smax.romanov@nginx.com return;
5421624Smax.romanov@nginx.com }
5431624Smax.romanov@nginx.com
5441624Smax.romanov@nginx.com if (!nxt_queue_is_empty(&ws->pending_frames)) {
5451624Smax.romanov@nginx.com if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
5461624Smax.romanov@nginx.com || opcode == NXT_WEBSOCKET_OP_BINARY))
5471624Smax.romanov@nginx.com {
5481624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
5491624Smax.romanov@nginx.com "Invalid state: pending frames with active receiver. "
5501624Smax.romanov@nginx.com "CONT frame expected. (%d)", opcode);
5511624Smax.romanov@nginx.com
5521624Smax.romanov@nginx.com PyErr_SetString(PyExc_AssertionError,
5531624Smax.romanov@nginx.com "Invalid state: pending frames with active receiver. "
5541624Smax.romanov@nginx.com "CONT frame expected.");
5551624Smax.romanov@nginx.com
5561624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
5571624Smax.romanov@nginx.com
5581624Smax.romanov@nginx.com return;
5591624Smax.romanov@nginx.com }
5601624Smax.romanov@nginx.com }
5611624Smax.romanov@nginx.com
5621624Smax.romanov@nginx.com msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
5631624Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
5641624Smax.romanov@nginx.com exc = PyErr_Occurred();
5651624Smax.romanov@nginx.com Py_INCREF(exc);
5661624Smax.romanov@nginx.com
5671624Smax.romanov@nginx.com goto raise;
5681624Smax.romanov@nginx.com }
5691624Smax.romanov@nginx.com
5701624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_done(ws, msg);
5711624Smax.romanov@nginx.com
5721624Smax.romanov@nginx.com return;
5731624Smax.romanov@nginx.com
5741624Smax.romanov@nginx.com bad_state:
5751624Smax.romanov@nginx.com
5761624Smax.romanov@nginx.com if (ws->receive_future == NULL) {
5771624Smax.romanov@nginx.com ws->receive_exc_str = nxt_py_bad_state_str;
5781624Smax.romanov@nginx.com
5791624Smax.romanov@nginx.com return;
5801624Smax.romanov@nginx.com }
5811624Smax.romanov@nginx.com
5821624Smax.romanov@nginx.com exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
5831624Smax.romanov@nginx.com nxt_py_bad_state_str,
5841624Smax.romanov@nginx.com NULL);
5851624Smax.romanov@nginx.com if (nxt_slow_path(exc == NULL)) {
5861624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "RuntimeError create failed");
5871624Smax.romanov@nginx.com nxt_python_print_exception();
5881624Smax.romanov@nginx.com
5891624Smax.romanov@nginx.com exc = Py_None;
5901624Smax.romanov@nginx.com Py_INCREF(exc);
5911624Smax.romanov@nginx.com }
5921624Smax.romanov@nginx.com
5931624Smax.romanov@nginx.com goto raise;
5941624Smax.romanov@nginx.com
5951624Smax.romanov@nginx.com too_big:
5961624Smax.romanov@nginx.com
5971624Smax.romanov@nginx.com status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
5981624Smax.romanov@nginx.com
5991624Smax.romanov@nginx.com (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
6001624Smax.romanov@nginx.com 1, &status_code, 2);
6011624Smax.romanov@nginx.com
6021624Smax.romanov@nginx.com ws->state = NXT_WS_CLOSED;
6031624Smax.romanov@nginx.com
6041624Smax.romanov@nginx.com if (ws->receive_future == NULL) {
6051624Smax.romanov@nginx.com ws->receive_exc_str = nxt_py_message_too_big_str;
6061624Smax.romanov@nginx.com
6071624Smax.romanov@nginx.com return;
6081624Smax.romanov@nginx.com }
6091624Smax.romanov@nginx.com
6101624Smax.romanov@nginx.com exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
6111624Smax.romanov@nginx.com nxt_py_message_too_big_str,
6121624Smax.romanov@nginx.com NULL);
6131624Smax.romanov@nginx.com if (nxt_slow_path(exc == NULL)) {
6141624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "RuntimeError create failed");
6151624Smax.romanov@nginx.com nxt_python_print_exception();
6161624Smax.romanov@nginx.com
6171624Smax.romanov@nginx.com exc = Py_None;
6181624Smax.romanov@nginx.com Py_INCREF(exc);
6191624Smax.romanov@nginx.com }
6201624Smax.romanov@nginx.com
6211624Smax.romanov@nginx.com raise:
6221624Smax.romanov@nginx.com
6231624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_fail(ws, exc);
6241624Smax.romanov@nginx.com }
6251624Smax.romanov@nginx.com
6261624Smax.romanov@nginx.com
6271624Smax.romanov@nginx.com static void
nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t * ws,PyObject * msg)6281624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
6291624Smax.romanov@nginx.com {
6301624Smax.romanov@nginx.com PyObject *future, *res;
6311624Smax.romanov@nginx.com
6321624Smax.romanov@nginx.com future = ws->receive_future;
6331624Smax.romanov@nginx.com ws->receive_future = NULL;
6341624Smax.romanov@nginx.com
6351624Smax.romanov@nginx.com res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
6361624Smax.romanov@nginx.com if (nxt_slow_path(res == NULL)) {
6371624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "'set_result' call failed");
6381624Smax.romanov@nginx.com nxt_python_print_exception();
6391624Smax.romanov@nginx.com }
6401624Smax.romanov@nginx.com
6411624Smax.romanov@nginx.com Py_XDECREF(res);
6421624Smax.romanov@nginx.com Py_DECREF(future);
6431624Smax.romanov@nginx.com
6441624Smax.romanov@nginx.com Py_DECREF(msg);
6451624Smax.romanov@nginx.com }
6461624Smax.romanov@nginx.com
6471624Smax.romanov@nginx.com
6481624Smax.romanov@nginx.com static void
nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t * ws,PyObject * exc)6491624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
6501624Smax.romanov@nginx.com {
6511624Smax.romanov@nginx.com PyObject *future, *res;
6521624Smax.romanov@nginx.com
6531624Smax.romanov@nginx.com future = ws->receive_future;
6541624Smax.romanov@nginx.com ws->receive_future = NULL;
6551624Smax.romanov@nginx.com
6561624Smax.romanov@nginx.com res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
6571624Smax.romanov@nginx.com NULL);
6581624Smax.romanov@nginx.com if (nxt_slow_path(res == NULL)) {
6591624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "'set_exception' call failed");
6601624Smax.romanov@nginx.com nxt_python_print_exception();
6611624Smax.romanov@nginx.com }
6621624Smax.romanov@nginx.com
6631624Smax.romanov@nginx.com Py_XDECREF(res);
6641624Smax.romanov@nginx.com Py_DECREF(future);
6651624Smax.romanov@nginx.com
6661624Smax.romanov@nginx.com Py_DECREF(exc);
6671624Smax.romanov@nginx.com }
6681624Smax.romanov@nginx.com
6691624Smax.romanov@nginx.com
6701624Smax.romanov@nginx.com static void
nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t * frame)6711624Smax.romanov@nginx.com nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
6721624Smax.romanov@nginx.com {
6731624Smax.romanov@nginx.com int rc;
6741624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws;
6751624Smax.romanov@nginx.com nxt_py_asgi_penging_frame_t *p;
6761624Smax.romanov@nginx.com
6771624Smax.romanov@nginx.com nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
6781624Smax.romanov@nginx.com "%d, %"PRIu64", %d",
6791624Smax.romanov@nginx.com frame->header->opcode, frame->payload_len,
6801624Smax.romanov@nginx.com frame->header->fin);
6811624Smax.romanov@nginx.com
6821624Smax.romanov@nginx.com ws = frame->req->data;
6831624Smax.romanov@nginx.com
6841624Smax.romanov@nginx.com rc = nxt_unit_websocket_retain(frame);
6851624Smax.romanov@nginx.com if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
6861624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
6871624Smax.romanov@nginx.com
6881624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
6891624Smax.romanov@nginx.com
6901624Smax.romanov@nginx.com PyErr_SetString(PyExc_RuntimeError,
6911624Smax.romanov@nginx.com "Failed to retain frame for suspension.");
6921624Smax.romanov@nginx.com
6931624Smax.romanov@nginx.com return;
6941624Smax.romanov@nginx.com }
6951624Smax.romanov@nginx.com
6961624Smax.romanov@nginx.com p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
6971624Smax.romanov@nginx.com if (nxt_slow_path(p == NULL)) {
6981624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
6991624Smax.romanov@nginx.com "Failed to allocate buffer to suspend frame.");
7001624Smax.romanov@nginx.com
7011624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
7021624Smax.romanov@nginx.com
7031624Smax.romanov@nginx.com PyErr_SetString(PyExc_RuntimeError,
7041624Smax.romanov@nginx.com "Failed to allocate buffer to suspend frame.");
7051624Smax.romanov@nginx.com
7061624Smax.romanov@nginx.com return;
7071624Smax.romanov@nginx.com }
7081624Smax.romanov@nginx.com
7091624Smax.romanov@nginx.com p->frame = frame;
7101624Smax.romanov@nginx.com nxt_queue_insert_tail(&ws->pending_frames, &p->link);
7111624Smax.romanov@nginx.com
7121624Smax.romanov@nginx.com ws->pending_payload_len += frame->payload_len;
7131624Smax.romanov@nginx.com ws->pending_fins += frame->header->fin;
7141624Smax.romanov@nginx.com
7151624Smax.romanov@nginx.com if (frame->header->fin) {
7161624Smax.romanov@nginx.com ws->pending_frame_len = 0;
7171624Smax.romanov@nginx.com
7181624Smax.romanov@nginx.com } else {
7191624Smax.romanov@nginx.com if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
7201624Smax.romanov@nginx.com ws->pending_frame_len += frame->payload_len;
7211624Smax.romanov@nginx.com
7221624Smax.romanov@nginx.com } else {
7231624Smax.romanov@nginx.com ws->pending_frame_len = frame->payload_len;
7241624Smax.romanov@nginx.com }
7251624Smax.romanov@nginx.com }
7261624Smax.romanov@nginx.com }
7271624Smax.romanov@nginx.com
7281624Smax.romanov@nginx.com
7291624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t * ws,nxt_unit_websocket_frame_t * frame)7301624Smax.romanov@nginx.com nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
7311624Smax.romanov@nginx.com nxt_unit_websocket_frame_t *frame)
7321624Smax.romanov@nginx.com {
7331624Smax.romanov@nginx.com int fin;
7341624Smax.romanov@nginx.com char *buf;
7351624Smax.romanov@nginx.com uint8_t code_buf[2], opcode;
7361624Smax.romanov@nginx.com uint16_t code;
7371624Smax.romanov@nginx.com PyObject *msg, *data, *type, *data_key;
7381624Smax.romanov@nginx.com uint64_t payload_len;
7391624Smax.romanov@nginx.com nxt_unit_websocket_frame_t *fin_frame;
7401624Smax.romanov@nginx.com
7411624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
7421624Smax.romanov@nginx.com
7431624Smax.romanov@nginx.com fin_frame = NULL;
7441624Smax.romanov@nginx.com
7451624Smax.romanov@nginx.com if (nxt_queue_is_empty(&ws->pending_frames)
7461624Smax.romanov@nginx.com || (frame != NULL
7471624Smax.romanov@nginx.com && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
7481624Smax.romanov@nginx.com {
7491624Smax.romanov@nginx.com payload_len = frame->payload_len;
7501624Smax.romanov@nginx.com
7511624Smax.romanov@nginx.com } else {
7521624Smax.romanov@nginx.com if (frame != NULL) {
7531624Smax.romanov@nginx.com payload_len = ws->pending_payload_len + frame->payload_len;
7541624Smax.romanov@nginx.com fin_frame = frame;
7551624Smax.romanov@nginx.com
7561624Smax.romanov@nginx.com } else {
7571624Smax.romanov@nginx.com payload_len = nxt_py_asgi_websocket_pending_len(ws);
7581624Smax.romanov@nginx.com }
7591624Smax.romanov@nginx.com
7601624Smax.romanov@nginx.com frame = nxt_py_asgi_websocket_pop_frame(ws);
7611624Smax.romanov@nginx.com }
7621624Smax.romanov@nginx.com
7631624Smax.romanov@nginx.com opcode = frame->header->opcode;
7641624Smax.romanov@nginx.com
7651624Smax.romanov@nginx.com if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
7661624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
7671624Smax.romanov@nginx.com "Invalid state: attempt to process CONT frame.");
7681624Smax.romanov@nginx.com
7691624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
7701624Smax.romanov@nginx.com
7711624Smax.romanov@nginx.com return PyErr_Format(PyExc_AssertionError,
7721624Smax.romanov@nginx.com "Invalid state: attempt to process CONT frame.");
7731624Smax.romanov@nginx.com }
7741624Smax.romanov@nginx.com
7751624Smax.romanov@nginx.com type = nxt_py_websocket_receive_str;
7761624Smax.romanov@nginx.com
7771624Smax.romanov@nginx.com switch (opcode) {
7781624Smax.romanov@nginx.com case NXT_WEBSOCKET_OP_TEXT:
7791624Smax.romanov@nginx.com buf = nxt_unit_malloc(frame->req->ctx, payload_len);
7801624Smax.romanov@nginx.com if (nxt_slow_path(buf == NULL)) {
7811624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
7821624Smax.romanov@nginx.com "Failed to allocate buffer for payload (%d).",
7831624Smax.romanov@nginx.com (int) payload_len);
7841624Smax.romanov@nginx.com
7851624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
7861624Smax.romanov@nginx.com
7871624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
7881624Smax.romanov@nginx.com "Failed to allocate buffer for payload (%d).",
7891624Smax.romanov@nginx.com (int) payload_len);
7901624Smax.romanov@nginx.com }
7911624Smax.romanov@nginx.com
7921624Smax.romanov@nginx.com data = NULL;
7931624Smax.romanov@nginx.com data_key = nxt_py_text_str;
7941624Smax.romanov@nginx.com
7951624Smax.romanov@nginx.com break;
7961624Smax.romanov@nginx.com
7971624Smax.romanov@nginx.com case NXT_WEBSOCKET_OP_BINARY:
7981624Smax.romanov@nginx.com data = PyBytes_FromStringAndSize(NULL, payload_len);
7991624Smax.romanov@nginx.com if (nxt_slow_path(data == NULL)) {
8001624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
8011624Smax.romanov@nginx.com "Failed to create Bytes for payload (%d).",
8021624Smax.romanov@nginx.com (int) payload_len);
8031624Smax.romanov@nginx.com nxt_python_print_exception();
8041624Smax.romanov@nginx.com
8051624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
8061624Smax.romanov@nginx.com
8071624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
8081624Smax.romanov@nginx.com "Failed to create Bytes for payload.");
8091624Smax.romanov@nginx.com }
8101624Smax.romanov@nginx.com
8111624Smax.romanov@nginx.com buf = (char *) PyBytes_AS_STRING(data);
8121624Smax.romanov@nginx.com data_key = nxt_py_bytes_str;
8131624Smax.romanov@nginx.com
8141624Smax.romanov@nginx.com break;
8151624Smax.romanov@nginx.com
8161624Smax.romanov@nginx.com case NXT_WEBSOCKET_OP_CLOSE:
8171624Smax.romanov@nginx.com if (frame->payload_len >= 2) {
8181624Smax.romanov@nginx.com nxt_unit_websocket_read(frame, code_buf, 2);
8191624Smax.romanov@nginx.com code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
8201624Smax.romanov@nginx.com
8211624Smax.romanov@nginx.com } else {
8221624Smax.romanov@nginx.com code = NXT_WEBSOCKET_CR_NORMAL;
8231624Smax.romanov@nginx.com }
8241624Smax.romanov@nginx.com
8251624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
8261624Smax.romanov@nginx.com
8271624Smax.romanov@nginx.com data = PyLong_FromLong(code);
8281624Smax.romanov@nginx.com if (nxt_slow_path(data == NULL)) {
8291624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
8301624Smax.romanov@nginx.com "Failed to create Long from code %d.",
8311624Smax.romanov@nginx.com (int) code);
8321624Smax.romanov@nginx.com nxt_python_print_exception();
8331624Smax.romanov@nginx.com
8341624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
8351624Smax.romanov@nginx.com "Failed to create Long from code %d.",
8361624Smax.romanov@nginx.com (int) code);
8371624Smax.romanov@nginx.com }
8381624Smax.romanov@nginx.com
8391624Smax.romanov@nginx.com buf = NULL;
8401624Smax.romanov@nginx.com type = nxt_py_websocket_disconnect_str;
8411624Smax.romanov@nginx.com data_key = nxt_py_code_str;
8421624Smax.romanov@nginx.com
8431624Smax.romanov@nginx.com break;
8441624Smax.romanov@nginx.com
8451624Smax.romanov@nginx.com default:
8461624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
8471624Smax.romanov@nginx.com
8481624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
8491624Smax.romanov@nginx.com
8501624Smax.romanov@nginx.com return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
8511624Smax.romanov@nginx.com opcode);
8521624Smax.romanov@nginx.com }
8531624Smax.romanov@nginx.com
8541624Smax.romanov@nginx.com if (buf != NULL) {
8551624Smax.romanov@nginx.com fin = frame->header->fin;
8561624Smax.romanov@nginx.com buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
8571624Smax.romanov@nginx.com
8581624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
8591624Smax.romanov@nginx.com
8601624Smax.romanov@nginx.com if (!fin) {
8611624Smax.romanov@nginx.com while (!nxt_queue_is_empty(&ws->pending_frames)) {
8621624Smax.romanov@nginx.com frame = nxt_py_asgi_websocket_pop_frame(ws);
8631624Smax.romanov@nginx.com fin = frame->header->fin;
8641624Smax.romanov@nginx.com
8651624Smax.romanov@nginx.com buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
8661624Smax.romanov@nginx.com
8671624Smax.romanov@nginx.com nxt_unit_websocket_done(frame);
8681624Smax.romanov@nginx.com
8691624Smax.romanov@nginx.com if (fin) {
8701624Smax.romanov@nginx.com break;
8711624Smax.romanov@nginx.com }
8721624Smax.romanov@nginx.com }
8731624Smax.romanov@nginx.com
8741624Smax.romanov@nginx.com if (fin_frame != NULL) {
8751624Smax.romanov@nginx.com buf += nxt_unit_websocket_read(fin_frame, buf,
8761624Smax.romanov@nginx.com fin_frame->payload_len);
8771624Smax.romanov@nginx.com nxt_unit_websocket_done(fin_frame);
8781624Smax.romanov@nginx.com }
8791624Smax.romanov@nginx.com }
8801624Smax.romanov@nginx.com
8811624Smax.romanov@nginx.com if (opcode == NXT_WEBSOCKET_OP_TEXT) {
8821624Smax.romanov@nginx.com buf -= payload_len;
8831624Smax.romanov@nginx.com
8841624Smax.romanov@nginx.com data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
8851624Smax.romanov@nginx.com
8861624Smax.romanov@nginx.com nxt_unit_free(ws->req->ctx, buf);
8871624Smax.romanov@nginx.com
8881624Smax.romanov@nginx.com if (nxt_slow_path(data == NULL)) {
8891624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req,
8901624Smax.romanov@nginx.com "Failed to create Unicode for payload (%d).",
8911624Smax.romanov@nginx.com (int) payload_len);
8921624Smax.romanov@nginx.com nxt_python_print_exception();
8931624Smax.romanov@nginx.com
8941624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
8951624Smax.romanov@nginx.com "Failed to create Unicode.");
8961624Smax.romanov@nginx.com }
8971624Smax.romanov@nginx.com }
8981624Smax.romanov@nginx.com }
8991624Smax.romanov@nginx.com
9001624Smax.romanov@nginx.com msg = nxt_py_asgi_new_msg(ws->req, type);
9011624Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
9021624Smax.romanov@nginx.com Py_DECREF(data);
9031624Smax.romanov@nginx.com return NULL;
9041624Smax.romanov@nginx.com }
9051624Smax.romanov@nginx.com
9061624Smax.romanov@nginx.com if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
9071624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
9081624Smax.romanov@nginx.com
9091624Smax.romanov@nginx.com Py_DECREF(msg);
9101624Smax.romanov@nginx.com Py_DECREF(data);
9111624Smax.romanov@nginx.com
9121624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError,
9131624Smax.romanov@nginx.com "Python failed to set 'msg.data' item");
9141624Smax.romanov@nginx.com }
9151624Smax.romanov@nginx.com
9161624Smax.romanov@nginx.com Py_DECREF(data);
9171624Smax.romanov@nginx.com
9181624Smax.romanov@nginx.com return msg;
9191624Smax.romanov@nginx.com }
9201624Smax.romanov@nginx.com
9211624Smax.romanov@nginx.com
9221624Smax.romanov@nginx.com static uint64_t
nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t * ws)9231624Smax.romanov@nginx.com nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
9241624Smax.romanov@nginx.com {
9251624Smax.romanov@nginx.com uint64_t res;
9261624Smax.romanov@nginx.com nxt_py_asgi_penging_frame_t *p;
9271624Smax.romanov@nginx.com
9281624Smax.romanov@nginx.com res = 0;
9291624Smax.romanov@nginx.com
9301624Smax.romanov@nginx.com nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
9311624Smax.romanov@nginx.com res += p->frame->payload_len;
9321624Smax.romanov@nginx.com
9331624Smax.romanov@nginx.com if (p->frame->header->fin) {
9341624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
9351624Smax.romanov@nginx.com (int) res);
9361624Smax.romanov@nginx.com return res;
9371624Smax.romanov@nginx.com }
9381624Smax.romanov@nginx.com } nxt_queue_loop;
9391624Smax.romanov@nginx.com
9401624Smax.romanov@nginx.com nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
9411624Smax.romanov@nginx.com (int) res);
9421624Smax.romanov@nginx.com return res;
9431624Smax.romanov@nginx.com }
9441624Smax.romanov@nginx.com
9451624Smax.romanov@nginx.com
9461624Smax.romanov@nginx.com static nxt_unit_websocket_frame_t *
nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t * ws)9471624Smax.romanov@nginx.com nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
9481624Smax.romanov@nginx.com {
9491624Smax.romanov@nginx.com nxt_queue_link_t *lnk;
9501624Smax.romanov@nginx.com nxt_unit_websocket_frame_t *frame;
9511624Smax.romanov@nginx.com nxt_py_asgi_penging_frame_t *p;
9521624Smax.romanov@nginx.com
9531624Smax.romanov@nginx.com lnk = nxt_queue_first(&ws->pending_frames);
9541624Smax.romanov@nginx.com nxt_queue_remove(lnk);
9551624Smax.romanov@nginx.com
9561624Smax.romanov@nginx.com p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
9571624Smax.romanov@nginx.com
9581624Smax.romanov@nginx.com frame = p->frame;
9591624Smax.romanov@nginx.com ws->pending_payload_len -= frame->payload_len;
9601624Smax.romanov@nginx.com ws->pending_fins -= frame->header->fin;
9611624Smax.romanov@nginx.com
9621624Smax.romanov@nginx.com nxt_unit_free(frame->req->ctx, p);
9631624Smax.romanov@nginx.com
9641624Smax.romanov@nginx.com nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
9651624Smax.romanov@nginx.com "%d, %"PRIu64", %d",
9661624Smax.romanov@nginx.com frame->header->opcode, frame->payload_len,
9671624Smax.romanov@nginx.com frame->header->fin);
9681624Smax.romanov@nginx.com
9691624Smax.romanov@nginx.com return frame;
9701624Smax.romanov@nginx.com }
9711624Smax.romanov@nginx.com
9721624Smax.romanov@nginx.com
9731624Smax.romanov@nginx.com void
nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t * req)9741624Smax.romanov@nginx.com nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
9751624Smax.romanov@nginx.com {
9761624Smax.romanov@nginx.com PyObject *msg, *exc;
9771624Smax.romanov@nginx.com nxt_py_asgi_websocket_t *ws;
9781624Smax.romanov@nginx.com
9791624Smax.romanov@nginx.com ws = req->data;
9801624Smax.romanov@nginx.com
9811624Smax.romanov@nginx.com nxt_unit_req_debug(req, "asgi_websocket_close_handler");
9821624Smax.romanov@nginx.com
983*1980Smax.romanov@nginx.com if (nxt_slow_path(ws == NULL)) {
984*1980Smax.romanov@nginx.com return;
985*1980Smax.romanov@nginx.com }
986*1980Smax.romanov@nginx.com
9871624Smax.romanov@nginx.com if (ws->receive_future == NULL) {
9881624Smax.romanov@nginx.com ws->state = NXT_WS_DISCONNECTED;
9891624Smax.romanov@nginx.com
9901624Smax.romanov@nginx.com return;
9911624Smax.romanov@nginx.com }
9921624Smax.romanov@nginx.com
9931624Smax.romanov@nginx.com msg = nxt_py_asgi_websocket_disconnect_msg(ws);
9941624Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
9951624Smax.romanov@nginx.com exc = PyErr_Occurred();
9961624Smax.romanov@nginx.com Py_INCREF(exc);
9971624Smax.romanov@nginx.com
9981624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_fail(ws, exc);
9991624Smax.romanov@nginx.com
10001624Smax.romanov@nginx.com } else {
10011624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_done(ws, msg);
10021624Smax.romanov@nginx.com }
10031624Smax.romanov@nginx.com }
10041624Smax.romanov@nginx.com
10051624Smax.romanov@nginx.com
10061624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t * ws)10071624Smax.romanov@nginx.com nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
10081624Smax.romanov@nginx.com {
10091624Smax.romanov@nginx.com PyObject *msg, *code;
10101624Smax.romanov@nginx.com
10111624Smax.romanov@nginx.com msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
10121624Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
10131624Smax.romanov@nginx.com return NULL;
10141624Smax.romanov@nginx.com }
10151624Smax.romanov@nginx.com
10161624Smax.romanov@nginx.com code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
10171624Smax.romanov@nginx.com if (nxt_slow_path(code == NULL)) {
10181624Smax.romanov@nginx.com nxt_unit_req_alert(ws->req, "Python failed to create long");
10191624Smax.romanov@nginx.com nxt_python_print_exception();
10201624Smax.romanov@nginx.com
10211624Smax.romanov@nginx.com Py_DECREF(msg);
10221624Smax.romanov@nginx.com
10231624Smax.romanov@nginx.com return PyErr_Format(PyExc_RuntimeError, "failed to create long");
10241624Smax.romanov@nginx.com }
10251624