xref: /unit/src/python/nxt_python_asgi_websocket.c (revision 1980:43553aa72111)
11624Smax.romanov@nginx.com 
21624Smax.romanov@nginx.com /*
31624Smax.romanov@nginx.com  * Copyright (C) NGINX, Inc.
41624Smax.romanov@nginx.com  */
51624Smax.romanov@nginx.com 
61624Smax.romanov@nginx.com 
71624Smax.romanov@nginx.com #include <python/nxt_python.h>
81624Smax.romanov@nginx.com 
91624Smax.romanov@nginx.com #if (NXT_HAVE_ASGI)
101624Smax.romanov@nginx.com 
111624Smax.romanov@nginx.com #include <nxt_main.h>
121624Smax.romanov@nginx.com #include <nxt_unit.h>
131624Smax.romanov@nginx.com #include <nxt_unit_request.h>
141624Smax.romanov@nginx.com #include <nxt_unit_websocket.h>
151624Smax.romanov@nginx.com #include <nxt_websocket_header.h>
161624Smax.romanov@nginx.com #include <python/nxt_python_asgi.h>
171624Smax.romanov@nginx.com #include <python/nxt_python_asgi_str.h>
181624Smax.romanov@nginx.com 
191624Smax.romanov@nginx.com 
201624Smax.romanov@nginx.com enum {
211624Smax.romanov@nginx.com     NXT_WS_INIT,
221624Smax.romanov@nginx.com     NXT_WS_CONNECT,
231624Smax.romanov@nginx.com     NXT_WS_ACCEPTED,
241624Smax.romanov@nginx.com     NXT_WS_DISCONNECTED,
251624Smax.romanov@nginx.com     NXT_WS_CLOSED,
261624Smax.romanov@nginx.com };
271624Smax.romanov@nginx.com 
281624Smax.romanov@nginx.com 
291624Smax.romanov@nginx.com typedef struct {
301624Smax.romanov@nginx.com     nxt_queue_link_t            link;
311624Smax.romanov@nginx.com     nxt_unit_websocket_frame_t  *frame;
321624Smax.romanov@nginx.com } nxt_py_asgi_penging_frame_t;
331624Smax.romanov@nginx.com 
341624Smax.romanov@nginx.com 
351624Smax.romanov@nginx.com typedef struct {
361624Smax.romanov@nginx.com     PyObject_HEAD
371624Smax.romanov@nginx.com     nxt_unit_request_info_t  *req;
381624Smax.romanov@nginx.com     PyObject                 *receive_future;
391624Smax.romanov@nginx.com     PyObject                 *receive_exc_str;
401624Smax.romanov@nginx.com     int                      state;
411624Smax.romanov@nginx.com     nxt_queue_t              pending_frames;
421624Smax.romanov@nginx.com     uint64_t                 pending_payload_len;
431624Smax.romanov@nginx.com     uint64_t                 pending_frame_len;
441624Smax.romanov@nginx.com     int                      pending_fins;
451624Smax.romanov@nginx.com } nxt_py_asgi_websocket_t;
461624Smax.romanov@nginx.com 
471624Smax.romanov@nginx.com 
481624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
491624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
501624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
511624Smax.romanov@nginx.com     PyObject *dict);
521624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
531624Smax.romanov@nginx.com     PyObject *dict);
541624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
551624Smax.romanov@nginx.com     PyObject *dict);
561624Smax.romanov@nginx.com static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
571624Smax.romanov@nginx.com     PyObject *msg);
581624Smax.romanov@nginx.com static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
591624Smax.romanov@nginx.com     PyObject *exc);
601624Smax.romanov@nginx.com static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
611624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
621624Smax.romanov@nginx.com     nxt_unit_websocket_frame_t *frame);
631624Smax.romanov@nginx.com static uint64_t nxt_py_asgi_websocket_pending_len(
641624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t *ws);
651624Smax.romanov@nginx.com static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
661624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t *ws);
671624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_disconnect_msg(
681624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t *ws);
691624Smax.romanov@nginx.com static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
701624Smax.romanov@nginx.com 
711624Smax.romanov@nginx.com 
721624Smax.romanov@nginx.com static PyMethodDef nxt_py_asgi_websocket_methods[] = {
731624Smax.romanov@nginx.com     { "receive",   nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
741624Smax.romanov@nginx.com     { "send",      nxt_py_asgi_websocket_send,    METH_O,      0 },
751624Smax.romanov@nginx.com     { "_done",     nxt_py_asgi_websocket_done,    METH_O,      0 },
761624Smax.romanov@nginx.com     { NULL, NULL, 0, 0 }
771624Smax.romanov@nginx.com };
781624Smax.romanov@nginx.com 
791624Smax.romanov@nginx.com static PyAsyncMethods nxt_py_asgi_async_methods = {
801624Smax.romanov@nginx.com     .am_await = nxt_py_asgi_await,
811624Smax.romanov@nginx.com };
821624Smax.romanov@nginx.com 
831624Smax.romanov@nginx.com static PyTypeObject nxt_py_asgi_websocket_type = {
841624Smax.romanov@nginx.com     PyVarObject_HEAD_INIT(NULL, 0)
851624Smax.romanov@nginx.com 
861624Smax.romanov@nginx.com     .tp_name      = "unit._asgi_websocket",
871624Smax.romanov@nginx.com     .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
881624Smax.romanov@nginx.com     .tp_dealloc   = nxt_py_asgi_dealloc,
891624Smax.romanov@nginx.com     .tp_as_async  = &nxt_py_asgi_async_methods,
901624Smax.romanov@nginx.com     .tp_flags     = Py_TPFLAGS_DEFAULT,
911624Smax.romanov@nginx.com     .tp_doc       = "unit ASGI WebSocket connection object",
921624Smax.romanov@nginx.com     .tp_iter      = nxt_py_asgi_iter,
931624Smax.romanov@nginx.com     .tp_iternext  = nxt_py_asgi_next,
941624Smax.romanov@nginx.com     .tp_methods   = nxt_py_asgi_websocket_methods,
951624Smax.romanov@nginx.com };
961624Smax.romanov@nginx.com 
971624Smax.romanov@nginx.com static uint64_t  nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
981624Smax.romanov@nginx.com static uint64_t  nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
991624Smax.romanov@nginx.com 
1001624Smax.romanov@nginx.com 
1011681Smax.romanov@nginx.com int
nxt_py_asgi_websocket_init(void)1021681Smax.romanov@nginx.com nxt_py_asgi_websocket_init(void)
1031624Smax.romanov@nginx.com {
1041624Smax.romanov@nginx.com     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
1051681Smax.romanov@nginx.com         nxt_unit_alert(NULL,
1061624Smax.romanov@nginx.com               "Python failed to initialize the \"asgi_websocket\" type object");
1071681Smax.romanov@nginx.com         return NXT_UNIT_ERROR;
1081624Smax.romanov@nginx.com     }
1091624Smax.romanov@nginx.com 
1101681Smax.romanov@nginx.com     return NXT_UNIT_OK;
1111624Smax.romanov@nginx.com }
1121624Smax.romanov@nginx.com 
1131624Smax.romanov@nginx.com 
1141624Smax.romanov@nginx.com PyObject *
nxt_py_asgi_websocket_create(nxt_unit_request_info_t * req)1151624Smax.romanov@nginx.com nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
1161624Smax.romanov@nginx.com {
1171624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t  *ws;
1181624Smax.romanov@nginx.com 
1191624Smax.romanov@nginx.com     ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
1201624Smax.romanov@nginx.com 
1211624Smax.romanov@nginx.com     if (nxt_fast_path(ws != NULL)) {
1221624Smax.romanov@nginx.com         ws->req = req;
1231624Smax.romanov@nginx.com         ws->receive_future = NULL;
1241624Smax.romanov@nginx.com         ws->receive_exc_str = NULL;
1251624Smax.romanov@nginx.com         ws->state = NXT_WS_INIT;
1261624Smax.romanov@nginx.com         nxt_queue_init(&ws->pending_frames);
1271624Smax.romanov@nginx.com         ws->pending_payload_len = 0;
1281624Smax.romanov@nginx.com         ws->pending_frame_len = 0;
1291624Smax.romanov@nginx.com         ws->pending_fins = 0;
1301624Smax.romanov@nginx.com     }
1311624Smax.romanov@nginx.com 
1321624Smax.romanov@nginx.com     return (PyObject *) ws;
1331624Smax.romanov@nginx.com }
1341624Smax.romanov@nginx.com 
1351624Smax.romanov@nginx.com 
1361624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_receive(PyObject * self,PyObject * none)1371624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
1381624Smax.romanov@nginx.com {
1391624Smax.romanov@nginx.com     PyObject                 *future, *msg;
1401681Smax.romanov@nginx.com     nxt_py_asgi_ctx_data_t   *ctx_data;
1411624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t  *ws;
1421624Smax.romanov@nginx.com 
1431624Smax.romanov@nginx.com     ws = (nxt_py_asgi_websocket_t *) self;
1441624Smax.romanov@nginx.com 
1451624Smax.romanov@nginx.com     nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
1461624Smax.romanov@nginx.com 
1471624Smax.romanov@nginx.com     /* If exception happened out of receive() call, raise it now. */
1481624Smax.romanov@nginx.com     if (nxt_slow_path(ws->receive_exc_str != NULL)) {
1491624Smax.romanov@nginx.com         PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
1501624Smax.romanov@nginx.com 
1511624Smax.romanov@nginx.com         ws->receive_exc_str = NULL;
1521624Smax.romanov@nginx.com 
1531624Smax.romanov@nginx.com         return NULL;
1541624Smax.romanov@nginx.com     }
1551624Smax.romanov@nginx.com 
1561624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
1571624Smax.romanov@nginx.com         nxt_unit_req_error(ws->req,
1581624Smax.romanov@nginx.com                            "receive() called for closed WebSocket");
1591624Smax.romanov@nginx.com 
1601624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
1611624Smax.romanov@nginx.com                             "WebSocket already closed");
1621624Smax.romanov@nginx.com     }
1631624Smax.romanov@nginx.com 
1641681Smax.romanov@nginx.com     ctx_data = ws->req->ctx->data;
1651681Smax.romanov@nginx.com 
1661681Smax.romanov@nginx.com     future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
1671624Smax.romanov@nginx.com     if (nxt_slow_path(future == NULL)) {
1681624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "Python failed to create Future object");
1691624Smax.romanov@nginx.com         nxt_python_print_exception();
1701624Smax.romanov@nginx.com 
1711624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
1721624Smax.romanov@nginx.com                             "failed to create Future object");
1731624Smax.romanov@nginx.com     }
1741624Smax.romanov@nginx.com 
1751624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
1761624Smax.romanov@nginx.com         ws->state = NXT_WS_CONNECT;
1771624Smax.romanov@nginx.com 
1781624Smax.romanov@nginx.com         msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
1791624Smax.romanov@nginx.com 
1801681Smax.romanov@nginx.com         return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
1811624Smax.romanov@nginx.com     }
1821624Smax.romanov@nginx.com 
1831624Smax.romanov@nginx.com     if (ws->pending_fins > 0) {
1841624Smax.romanov@nginx.com         msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
1851624Smax.romanov@nginx.com 
1861681Smax.romanov@nginx.com         return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
1871624Smax.romanov@nginx.com     }
1881624Smax.romanov@nginx.com 
1891624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
1901624Smax.romanov@nginx.com         msg = nxt_py_asgi_websocket_disconnect_msg(ws);
1911624Smax.romanov@nginx.com 
1921681Smax.romanov@nginx.com         return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg);
1931624Smax.romanov@nginx.com     }
1941624Smax.romanov@nginx.com 
1951624Smax.romanov@nginx.com     ws->receive_future = future;
1961624Smax.romanov@nginx.com     Py_INCREF(ws->receive_future);
1971624Smax.romanov@nginx.com 
1981624Smax.romanov@nginx.com     return future;
1991624Smax.romanov@nginx.com }
2001624Smax.romanov@nginx.com 
2011624Smax.romanov@nginx.com 
2021624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_send(PyObject * self,PyObject * dict)2031624Smax.romanov@nginx.com nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
2041624Smax.romanov@nginx.com {
2051624Smax.romanov@nginx.com     PyObject                 *type;
2061624Smax.romanov@nginx.com     const char               *type_str;
2071624Smax.romanov@nginx.com     Py_ssize_t               type_len;
2081624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t  *ws;
2091624Smax.romanov@nginx.com 
2101624Smax.romanov@nginx.com     static const nxt_str_t  websocket_accept = nxt_string("websocket.accept");
2111624Smax.romanov@nginx.com     static const nxt_str_t  websocket_close = nxt_string("websocket.close");
2121624Smax.romanov@nginx.com     static const nxt_str_t  websocket_send = nxt_string("websocket.send");
2131624Smax.romanov@nginx.com 
2141624Smax.romanov@nginx.com     ws = (nxt_py_asgi_websocket_t *) self;
2151624Smax.romanov@nginx.com 
2161624Smax.romanov@nginx.com     type = PyDict_GetItem(dict, nxt_py_type_str);
2171624Smax.romanov@nginx.com     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
2181624Smax.romanov@nginx.com         nxt_unit_req_error(ws->req, "asgi_websocket_send: "
2191624Smax.romanov@nginx.com                            "'type' is not a unicode string");
2201624Smax.romanov@nginx.com         return PyErr_Format(PyExc_TypeError,
2211624Smax.romanov@nginx.com                             "'type' is not a unicode string");
2221624Smax.romanov@nginx.com     }
2231624Smax.romanov@nginx.com 
2241624Smax.romanov@nginx.com     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
2251624Smax.romanov@nginx.com 
2261624Smax.romanov@nginx.com     nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
2271624Smax.romanov@nginx.com                        (int) type_len, type_str);
2281624Smax.romanov@nginx.com 
2291624Smax.romanov@nginx.com     if (type_len == (Py_ssize_t) websocket_accept.length
2301624Smax.romanov@nginx.com         && memcmp(type_str, websocket_accept.start, type_len) == 0)
2311624Smax.romanov@nginx.com     {
2321624Smax.romanov@nginx.com         return nxt_py_asgi_websocket_accept(ws, dict);
2331624Smax.romanov@nginx.com     }
2341624Smax.romanov@nginx.com 
2351624Smax.romanov@nginx.com     if (type_len == (Py_ssize_t) websocket_close.length
2361624Smax.romanov@nginx.com         && memcmp(type_str, websocket_close.start, type_len) == 0)
2371624Smax.romanov@nginx.com     {
2381624Smax.romanov@nginx.com         return nxt_py_asgi_websocket_close(ws, dict);
2391624Smax.romanov@nginx.com     }
2401624Smax.romanov@nginx.com 
2411624Smax.romanov@nginx.com     if (type_len == (Py_ssize_t) websocket_send.length
2421624Smax.romanov@nginx.com         && memcmp(type_str, websocket_send.start, type_len) == 0)
2431624Smax.romanov@nginx.com     {
2441624Smax.romanov@nginx.com         return nxt_py_asgi_websocket_send_frame(ws, dict);
2451624Smax.romanov@nginx.com     }
2461624Smax.romanov@nginx.com 
2471624Smax.romanov@nginx.com     nxt_unit_req_error(ws->req, "asgi_websocket_send: "
2481624Smax.romanov@nginx.com                        "unexpected 'type': '%.*s'", (int) type_len, type_str);
2491624Smax.romanov@nginx.com     return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
2501624Smax.romanov@nginx.com }
2511624Smax.romanov@nginx.com 
2521624Smax.romanov@nginx.com 
2531624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t * ws,PyObject * dict)2541624Smax.romanov@nginx.com nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
2551624Smax.romanov@nginx.com {
2561624Smax.romanov@nginx.com     int                          rc;
2571624Smax.romanov@nginx.com     char                         *subprotocol_str;
2581624Smax.romanov@nginx.com     PyObject                     *res, *headers, *subprotocol;
2591624Smax.romanov@nginx.com     Py_ssize_t                   subprotocol_len;
2601624Smax.romanov@nginx.com     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
2611624Smax.romanov@nginx.com     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
2621624Smax.romanov@nginx.com 
2631624Smax.romanov@nginx.com     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
2641624Smax.romanov@nginx.com 
2651624Smax.romanov@nginx.com     switch(ws->state) {
2661624Smax.romanov@nginx.com     case NXT_WS_INIT:
2671624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
2681624Smax.romanov@nginx.com                             "WebSocket connect not received");
2691624Smax.romanov@nginx.com     case NXT_WS_CONNECT:
2701624Smax.romanov@nginx.com         break;
2711624Smax.romanov@nginx.com 
2721624Smax.romanov@nginx.com     case NXT_WS_ACCEPTED:
2731624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
2741624Smax.romanov@nginx.com 
2751624Smax.romanov@nginx.com     case NXT_WS_DISCONNECTED:
2761624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
2771624Smax.romanov@nginx.com 
2781624Smax.romanov@nginx.com     case NXT_WS_CLOSED:
2791624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
2801624Smax.romanov@nginx.com     }
2811624Smax.romanov@nginx.com 
2821624Smax.romanov@nginx.com     if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
2831624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
2841624Smax.romanov@nginx.com     }
2851624Smax.romanov@nginx.com 
2861624Smax.romanov@nginx.com     if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
2871624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "response already sent");
2881624Smax.romanov@nginx.com     }
2891624Smax.romanov@nginx.com 
2901624Smax.romanov@nginx.com     calc_size_ctx.fields_size = 0;
2911624Smax.romanov@nginx.com     calc_size_ctx.fields_count = 0;
2921624Smax.romanov@nginx.com 
2931624Smax.romanov@nginx.com     headers = PyDict_GetItem(dict, nxt_py_headers_str);
2941624Smax.romanov@nginx.com     if (headers != NULL) {
2951624Smax.romanov@nginx.com         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
2961624Smax.romanov@nginx.com                                        &calc_size_ctx);
2971624Smax.romanov@nginx.com         if (nxt_slow_path(res == NULL)) {
2981624Smax.romanov@nginx.com             return NULL;
2991624Smax.romanov@nginx.com         }
3001624Smax.romanov@nginx.com     }
3011624Smax.romanov@nginx.com 
3021624Smax.romanov@nginx.com     subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
3031624Smax.romanov@nginx.com     if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
3041624Smax.romanov@nginx.com         subprotocol_str = PyUnicode_DATA(subprotocol);
3051624Smax.romanov@nginx.com         subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
3061624Smax.romanov@nginx.com 
3071624Smax.romanov@nginx.com         calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
3081624Smax.romanov@nginx.com         calc_size_ctx.fields_count++;
3091624Smax.romanov@nginx.com 
3101624Smax.romanov@nginx.com     } else {
3111624Smax.romanov@nginx.com         subprotocol_str = NULL;
3121624Smax.romanov@nginx.com         subprotocol_len = 0;
3131624Smax.romanov@nginx.com     }
3141624Smax.romanov@nginx.com 
3151624Smax.romanov@nginx.com     rc = nxt_unit_response_init(ws->req, 101,
3161624Smax.romanov@nginx.com                                 calc_size_ctx.fields_count,
3171624Smax.romanov@nginx.com                                 calc_size_ctx.fields_size);
3181624Smax.romanov@nginx.com     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3191624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
3201624Smax.romanov@nginx.com                             "failed to allocate response object");
3211624Smax.romanov@nginx.com     }
3221624Smax.romanov@nginx.com 
3231624Smax.romanov@nginx.com     add_field_ctx.req = ws->req;
3241624Smax.romanov@nginx.com     add_field_ctx.content_length = -1;
3251624Smax.romanov@nginx.com 
3261624Smax.romanov@nginx.com     if (headers != NULL) {
3271624Smax.romanov@nginx.com         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
3281624Smax.romanov@nginx.com                                        &add_field_ctx);
3291624Smax.romanov@nginx.com         if (nxt_slow_path(res == NULL)) {
3301624Smax.romanov@nginx.com             return NULL;
3311624Smax.romanov@nginx.com         }
3321624Smax.romanov@nginx.com     }
3331624Smax.romanov@nginx.com 
3341624Smax.romanov@nginx.com     if (subprotocol_len > 0) {
3351624Smax.romanov@nginx.com         rc = nxt_unit_response_add_field(ws->req,
3361624Smax.romanov@nginx.com                                          (const char *) ws_protocol.start,
3371624Smax.romanov@nginx.com                                          ws_protocol.length,
3381624Smax.romanov@nginx.com                                          subprotocol_str, subprotocol_len);
3391624Smax.romanov@nginx.com         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3401624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
3411624Smax.romanov@nginx.com                                 "failed to add header");
3421624Smax.romanov@nginx.com         }
3431624Smax.romanov@nginx.com     }
3441624Smax.romanov@nginx.com 
3451624Smax.romanov@nginx.com     rc = nxt_unit_response_send(ws->req);
3461624Smax.romanov@nginx.com     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3471624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "failed to send response");
3481624Smax.romanov@nginx.com     }
3491624Smax.romanov@nginx.com 
3501624Smax.romanov@nginx.com     ws->state = NXT_WS_ACCEPTED;
3511624Smax.romanov@nginx.com 
3521624Smax.romanov@nginx.com     Py_INCREF(ws);
3531624Smax.romanov@nginx.com 
3541624Smax.romanov@nginx.com     return (PyObject *) ws;
3551624Smax.romanov@nginx.com }
3561624Smax.romanov@nginx.com 
3571624Smax.romanov@nginx.com 
3581624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t * ws,PyObject * dict)3591624Smax.romanov@nginx.com nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
3601624Smax.romanov@nginx.com {
3611624Smax.romanov@nginx.com     int       rc;
3621624Smax.romanov@nginx.com     uint16_t  status_code;
3631624Smax.romanov@nginx.com     PyObject  *code;
3641624Smax.romanov@nginx.com 
3651624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
3661624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
3671624Smax.romanov@nginx.com                             "WebSocket connect not received");
3681624Smax.romanov@nginx.com     }
3691624Smax.romanov@nginx.com 
3701624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
3711624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
3721624Smax.romanov@nginx.com     }
3731624Smax.romanov@nginx.com 
3741624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
3751624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
3761624Smax.romanov@nginx.com     }
3771624Smax.romanov@nginx.com 
3781624Smax.romanov@nginx.com     if (nxt_unit_response_is_websocket(ws->req)) {
3791624Smax.romanov@nginx.com         code = PyDict_GetItem(dict, nxt_py_code_str);
3801624Smax.romanov@nginx.com         if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
3811624Smax.romanov@nginx.com             return PyErr_Format(PyExc_TypeError, "'code' is not integer");
3821624Smax.romanov@nginx.com         }
3831624Smax.romanov@nginx.com 
3841624Smax.romanov@nginx.com         status_code = (code != NULL) ? htons(PyLong_AsLong(code))
3851624Smax.romanov@nginx.com                                      : htons(NXT_WEBSOCKET_CR_NORMAL);
3861624Smax.romanov@nginx.com 
3871624Smax.romanov@nginx.com         rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
3881624Smax.romanov@nginx.com                                      1, &status_code, 2);
3891624Smax.romanov@nginx.com         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3901624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
3911624Smax.romanov@nginx.com                                 "failed to send close frame");
3921624Smax.romanov@nginx.com         }
3931624Smax.romanov@nginx.com 
3941624Smax.romanov@nginx.com     } else {
3951624Smax.romanov@nginx.com         rc = nxt_unit_response_init(ws->req, 403, 0, 0);
3961624Smax.romanov@nginx.com         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3971624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
3981624Smax.romanov@nginx.com                                 "failed to allocate response object");
3991624Smax.romanov@nginx.com         }
4001624Smax.romanov@nginx.com 
4011624Smax.romanov@nginx.com         rc = nxt_unit_response_send(ws->req);
4021624Smax.romanov@nginx.com         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4031624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
4041624Smax.romanov@nginx.com                                 "failed to send response");
4051624Smax.romanov@nginx.com         }
4061624Smax.romanov@nginx.com     }
4071624Smax.romanov@nginx.com 
4081624Smax.romanov@nginx.com     ws->state = NXT_WS_CLOSED;
4091624Smax.romanov@nginx.com 
4101624Smax.romanov@nginx.com     Py_INCREF(ws);
4111624Smax.romanov@nginx.com 
4121624Smax.romanov@nginx.com     return (PyObject *) ws;
4131624Smax.romanov@nginx.com }
4141624Smax.romanov@nginx.com 
4151624Smax.romanov@nginx.com 
4161624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t * ws,PyObject * dict)4171624Smax.romanov@nginx.com nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
4181624Smax.romanov@nginx.com {
4191624Smax.romanov@nginx.com     int         rc;
4201624Smax.romanov@nginx.com     uint8_t     opcode;
4211624Smax.romanov@nginx.com     PyObject    *bytes, *text;
4221624Smax.romanov@nginx.com     const void  *buf;
4231624Smax.romanov@nginx.com     Py_ssize_t  buf_size;
4241624Smax.romanov@nginx.com 
4251624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
4261624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
4271624Smax.romanov@nginx.com                             "WebSocket connect not received");
4281624Smax.romanov@nginx.com     }
4291624Smax.romanov@nginx.com 
4301624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
4311624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
4321624Smax.romanov@nginx.com                             "WebSocket not accepted yet");
4331624Smax.romanov@nginx.com     }
4341624Smax.romanov@nginx.com 
4351624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
4361624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
4371624Smax.romanov@nginx.com     }
4381624Smax.romanov@nginx.com 
4391624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
4401624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
4411624Smax.romanov@nginx.com     }
4421624Smax.romanov@nginx.com 
4431624Smax.romanov@nginx.com     bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
4441624Smax.romanov@nginx.com     if (bytes == Py_None) {
4451624Smax.romanov@nginx.com         bytes = NULL;
4461624Smax.romanov@nginx.com     }
4471624Smax.romanov@nginx.com 
4481624Smax.romanov@nginx.com     if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
4491624Smax.romanov@nginx.com         return PyErr_Format(PyExc_TypeError,
4501624Smax.romanov@nginx.com                             "'bytes' is not a byte string");
4511624Smax.romanov@nginx.com     }
4521624Smax.romanov@nginx.com 
4531624Smax.romanov@nginx.com     text = PyDict_GetItem(dict, nxt_py_text_str);
4541624Smax.romanov@nginx.com     if (text == Py_None) {
4551624Smax.romanov@nginx.com         text = NULL;
4561624Smax.romanov@nginx.com     }
4571624Smax.romanov@nginx.com 
4581624Smax.romanov@nginx.com     if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
4591624Smax.romanov@nginx.com         return PyErr_Format(PyExc_TypeError,
4601624Smax.romanov@nginx.com                             "'text' is not a unicode string");
4611624Smax.romanov@nginx.com     }
4621624Smax.romanov@nginx.com 
4631624Smax.romanov@nginx.com     if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
4641624Smax.romanov@nginx.com         return PyErr_Format(PyExc_ValueError,
4651624Smax.romanov@nginx.com                        "Exactly one of 'bytes' or 'text' must be non-None");
4661624Smax.romanov@nginx.com     }
4671624Smax.romanov@nginx.com 
4681624Smax.romanov@nginx.com     if (bytes != NULL) {
4691624Smax.romanov@nginx.com         buf = PyBytes_AS_STRING(bytes);
4701624Smax.romanov@nginx.com         buf_size = PyBytes_GET_SIZE(bytes);
4711624Smax.romanov@nginx.com         opcode = NXT_WEBSOCKET_OP_BINARY;
4721624Smax.romanov@nginx.com 
4731624Smax.romanov@nginx.com     } else {
4741624Smax.romanov@nginx.com         buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
4751624Smax.romanov@nginx.com         opcode = NXT_WEBSOCKET_OP_TEXT;
4761624Smax.romanov@nginx.com     }
4771624Smax.romanov@nginx.com 
4781624Smax.romanov@nginx.com     rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
4791624Smax.romanov@nginx.com     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4801624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
4811624Smax.romanov@nginx.com     }
4821624Smax.romanov@nginx.com 
4831624Smax.romanov@nginx.com     Py_INCREF(ws);
4841624Smax.romanov@nginx.com     return (PyObject *) ws;
4851624Smax.romanov@nginx.com }
4861624Smax.romanov@nginx.com 
4871624Smax.romanov@nginx.com 
4881624Smax.romanov@nginx.com void
nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t * frame)4891624Smax.romanov@nginx.com nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
4901624Smax.romanov@nginx.com {
4911624Smax.romanov@nginx.com     uint8_t                  opcode;
4921624Smax.romanov@nginx.com     uint16_t                 status_code;
4931624Smax.romanov@nginx.com     uint64_t                 rest;
4941624Smax.romanov@nginx.com     PyObject                 *msg, *exc;
4951624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t  *ws;
4961624Smax.romanov@nginx.com 
4971624Smax.romanov@nginx.com     ws = frame->req->data;
4981624Smax.romanov@nginx.com 
4991624Smax.romanov@nginx.com     nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
5001624Smax.romanov@nginx.com 
5011624Smax.romanov@nginx.com     opcode = frame->header->opcode;
5021624Smax.romanov@nginx.com     if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
5031624Smax.romanov@nginx.com                       && opcode != NXT_WEBSOCKET_OP_TEXT
5041624Smax.romanov@nginx.com                       && opcode != NXT_WEBSOCKET_OP_BINARY
5051624Smax.romanov@nginx.com                       && opcode != NXT_WEBSOCKET_OP_CLOSE))
5061624Smax.romanov@nginx.com     {
5071624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
5081624Smax.romanov@nginx.com 
5091624Smax.romanov@nginx.com         nxt_unit_req_debug(ws->req,
5101624Smax.romanov@nginx.com                           "asgi_websocket_handler: ignore frame with opcode %d",
5111624Smax.romanov@nginx.com                            opcode);
5121624Smax.romanov@nginx.com 
5131624Smax.romanov@nginx.com         return;
5141624Smax.romanov@nginx.com     }
5151624Smax.romanov@nginx.com 
5161624Smax.romanov@nginx.com     if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
5171624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
5181624Smax.romanov@nginx.com 
5191624Smax.romanov@nginx.com         goto bad_state;
5201624Smax.romanov@nginx.com     }
5211624Smax.romanov@nginx.com 
5221624Smax.romanov@nginx.com     rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
5231624Smax.romanov@nginx.com 
5241624Smax.romanov@nginx.com     if (nxt_slow_path(frame->payload_len > rest)) {
5251624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
5261624Smax.romanov@nginx.com 
5271624Smax.romanov@nginx.com         goto too_big;
5281624Smax.romanov@nginx.com     }
5291624Smax.romanov@nginx.com 
5301624Smax.romanov@nginx.com     rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
5311624Smax.romanov@nginx.com 
5321624Smax.romanov@nginx.com     if (nxt_slow_path(frame->payload_len > rest)) {
5331624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
5341624Smax.romanov@nginx.com 
5351624Smax.romanov@nginx.com         goto too_big;
5361624Smax.romanov@nginx.com     }
5371624Smax.romanov@nginx.com 
5381624Smax.romanov@nginx.com     if (ws->receive_future == NULL || frame->header->fin == 0) {
5391624Smax.romanov@nginx.com         nxt_py_asgi_websocket_suspend_frame(frame);
5401624Smax.romanov@nginx.com 
5411624Smax.romanov@nginx.com         return;
5421624Smax.romanov@nginx.com     }
5431624Smax.romanov@nginx.com 
5441624Smax.romanov@nginx.com     if (!nxt_queue_is_empty(&ws->pending_frames)) {
5451624Smax.romanov@nginx.com         if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
5461624Smax.romanov@nginx.com                           || opcode == NXT_WEBSOCKET_OP_BINARY))
5471624Smax.romanov@nginx.com         {
5481624Smax.romanov@nginx.com             nxt_unit_req_alert(ws->req,
5491624Smax.romanov@nginx.com                          "Invalid state: pending frames with active receiver. "
5501624Smax.romanov@nginx.com                          "CONT frame expected. (%d)", opcode);
5511624Smax.romanov@nginx.com 
5521624Smax.romanov@nginx.com             PyErr_SetString(PyExc_AssertionError,
5531624Smax.romanov@nginx.com                          "Invalid state: pending frames with active receiver. "
5541624Smax.romanov@nginx.com                          "CONT frame expected.");
5551624Smax.romanov@nginx.com 
5561624Smax.romanov@nginx.com             nxt_unit_websocket_done(frame);
5571624Smax.romanov@nginx.com 
5581624Smax.romanov@nginx.com             return;
5591624Smax.romanov@nginx.com         }
5601624Smax.romanov@nginx.com     }
5611624Smax.romanov@nginx.com 
5621624Smax.romanov@nginx.com     msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
5631624Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
5641624Smax.romanov@nginx.com         exc = PyErr_Occurred();
5651624Smax.romanov@nginx.com         Py_INCREF(exc);
5661624Smax.romanov@nginx.com 
5671624Smax.romanov@nginx.com         goto raise;
5681624Smax.romanov@nginx.com     }
5691624Smax.romanov@nginx.com 
5701624Smax.romanov@nginx.com     nxt_py_asgi_websocket_receive_done(ws, msg);
5711624Smax.romanov@nginx.com 
5721624Smax.romanov@nginx.com     return;
5731624Smax.romanov@nginx.com 
5741624Smax.romanov@nginx.com bad_state:
5751624Smax.romanov@nginx.com 
5761624Smax.romanov@nginx.com     if (ws->receive_future == NULL) {
5771624Smax.romanov@nginx.com         ws->receive_exc_str = nxt_py_bad_state_str;
5781624Smax.romanov@nginx.com 
5791624Smax.romanov@nginx.com         return;
5801624Smax.romanov@nginx.com     }
5811624Smax.romanov@nginx.com 
5821624Smax.romanov@nginx.com     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
5831624Smax.romanov@nginx.com                                        nxt_py_bad_state_str,
5841624Smax.romanov@nginx.com                                        NULL);
5851624Smax.romanov@nginx.com     if (nxt_slow_path(exc == NULL)) {
5861624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "RuntimeError create failed");
5871624Smax.romanov@nginx.com         nxt_python_print_exception();
5881624Smax.romanov@nginx.com 
5891624Smax.romanov@nginx.com         exc = Py_None;
5901624Smax.romanov@nginx.com         Py_INCREF(exc);
5911624Smax.romanov@nginx.com     }
5921624Smax.romanov@nginx.com 
5931624Smax.romanov@nginx.com     goto raise;
5941624Smax.romanov@nginx.com 
5951624Smax.romanov@nginx.com too_big:
5961624Smax.romanov@nginx.com 
5971624Smax.romanov@nginx.com     status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
5981624Smax.romanov@nginx.com 
5991624Smax.romanov@nginx.com     (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
6001624Smax.romanov@nginx.com                                    1, &status_code, 2);
6011624Smax.romanov@nginx.com 
6021624Smax.romanov@nginx.com     ws->state = NXT_WS_CLOSED;
6031624Smax.romanov@nginx.com 
6041624Smax.romanov@nginx.com     if (ws->receive_future == NULL) {
6051624Smax.romanov@nginx.com         ws->receive_exc_str = nxt_py_message_too_big_str;
6061624Smax.romanov@nginx.com 
6071624Smax.romanov@nginx.com         return;
6081624Smax.romanov@nginx.com     }
6091624Smax.romanov@nginx.com 
6101624Smax.romanov@nginx.com     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
6111624Smax.romanov@nginx.com                                        nxt_py_message_too_big_str,
6121624Smax.romanov@nginx.com                                        NULL);
6131624Smax.romanov@nginx.com     if (nxt_slow_path(exc == NULL)) {
6141624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "RuntimeError create failed");
6151624Smax.romanov@nginx.com         nxt_python_print_exception();
6161624Smax.romanov@nginx.com 
6171624Smax.romanov@nginx.com         exc = Py_None;
6181624Smax.romanov@nginx.com         Py_INCREF(exc);
6191624Smax.romanov@nginx.com     }
6201624Smax.romanov@nginx.com 
6211624Smax.romanov@nginx.com raise:
6221624Smax.romanov@nginx.com 
6231624Smax.romanov@nginx.com     nxt_py_asgi_websocket_receive_fail(ws, exc);
6241624Smax.romanov@nginx.com }
6251624Smax.romanov@nginx.com 
6261624Smax.romanov@nginx.com 
6271624Smax.romanov@nginx.com static void
nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t * ws,PyObject * msg)6281624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
6291624Smax.romanov@nginx.com {
6301624Smax.romanov@nginx.com     PyObject  *future, *res;
6311624Smax.romanov@nginx.com 
6321624Smax.romanov@nginx.com     future = ws->receive_future;
6331624Smax.romanov@nginx.com     ws->receive_future = NULL;
6341624Smax.romanov@nginx.com 
6351624Smax.romanov@nginx.com     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
6361624Smax.romanov@nginx.com     if (nxt_slow_path(res == NULL)) {
6371624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "'set_result' call failed");
6381624Smax.romanov@nginx.com         nxt_python_print_exception();
6391624Smax.romanov@nginx.com     }
6401624Smax.romanov@nginx.com 
6411624Smax.romanov@nginx.com     Py_XDECREF(res);
6421624Smax.romanov@nginx.com     Py_DECREF(future);
6431624Smax.romanov@nginx.com 
6441624Smax.romanov@nginx.com     Py_DECREF(msg);
6451624Smax.romanov@nginx.com }
6461624Smax.romanov@nginx.com 
6471624Smax.romanov@nginx.com 
6481624Smax.romanov@nginx.com static void
nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t * ws,PyObject * exc)6491624Smax.romanov@nginx.com nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
6501624Smax.romanov@nginx.com {
6511624Smax.romanov@nginx.com     PyObject  *future, *res;
6521624Smax.romanov@nginx.com 
6531624Smax.romanov@nginx.com     future = ws->receive_future;
6541624Smax.romanov@nginx.com     ws->receive_future = NULL;
6551624Smax.romanov@nginx.com 
6561624Smax.romanov@nginx.com     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
6571624Smax.romanov@nginx.com                                      NULL);
6581624Smax.romanov@nginx.com     if (nxt_slow_path(res == NULL)) {
6591624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "'set_exception' call failed");
6601624Smax.romanov@nginx.com         nxt_python_print_exception();
6611624Smax.romanov@nginx.com     }
6621624Smax.romanov@nginx.com 
6631624Smax.romanov@nginx.com     Py_XDECREF(res);
6641624Smax.romanov@nginx.com     Py_DECREF(future);
6651624Smax.romanov@nginx.com 
6661624Smax.romanov@nginx.com     Py_DECREF(exc);
6671624Smax.romanov@nginx.com }
6681624Smax.romanov@nginx.com 
6691624Smax.romanov@nginx.com 
6701624Smax.romanov@nginx.com static void
nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t * frame)6711624Smax.romanov@nginx.com nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
6721624Smax.romanov@nginx.com {
6731624Smax.romanov@nginx.com     int                          rc;
6741624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t      *ws;
6751624Smax.romanov@nginx.com     nxt_py_asgi_penging_frame_t  *p;
6761624Smax.romanov@nginx.com 
6771624Smax.romanov@nginx.com     nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
6781624Smax.romanov@nginx.com                        "%d, %"PRIu64", %d",
6791624Smax.romanov@nginx.com                        frame->header->opcode, frame->payload_len,
6801624Smax.romanov@nginx.com                        frame->header->fin);
6811624Smax.romanov@nginx.com 
6821624Smax.romanov@nginx.com     ws = frame->req->data;
6831624Smax.romanov@nginx.com 
6841624Smax.romanov@nginx.com     rc = nxt_unit_websocket_retain(frame);
6851624Smax.romanov@nginx.com     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
6861624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
6871624Smax.romanov@nginx.com 
6881624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
6891624Smax.romanov@nginx.com 
6901624Smax.romanov@nginx.com         PyErr_SetString(PyExc_RuntimeError,
6911624Smax.romanov@nginx.com                         "Failed to retain frame for suspension.");
6921624Smax.romanov@nginx.com 
6931624Smax.romanov@nginx.com         return;
6941624Smax.romanov@nginx.com     }
6951624Smax.romanov@nginx.com 
6961624Smax.romanov@nginx.com     p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
6971624Smax.romanov@nginx.com     if (nxt_slow_path(p == NULL)) {
6981624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req,
6991624Smax.romanov@nginx.com                            "Failed to allocate buffer to suspend frame.");
7001624Smax.romanov@nginx.com 
7011624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
7021624Smax.romanov@nginx.com 
7031624Smax.romanov@nginx.com         PyErr_SetString(PyExc_RuntimeError,
7041624Smax.romanov@nginx.com                         "Failed to allocate buffer to suspend frame.");
7051624Smax.romanov@nginx.com 
7061624Smax.romanov@nginx.com         return;
7071624Smax.romanov@nginx.com     }
7081624Smax.romanov@nginx.com 
7091624Smax.romanov@nginx.com     p->frame = frame;
7101624Smax.romanov@nginx.com     nxt_queue_insert_tail(&ws->pending_frames, &p->link);
7111624Smax.romanov@nginx.com 
7121624Smax.romanov@nginx.com     ws->pending_payload_len += frame->payload_len;
7131624Smax.romanov@nginx.com     ws->pending_fins += frame->header->fin;
7141624Smax.romanov@nginx.com 
7151624Smax.romanov@nginx.com     if (frame->header->fin) {
7161624Smax.romanov@nginx.com         ws->pending_frame_len = 0;
7171624Smax.romanov@nginx.com 
7181624Smax.romanov@nginx.com     } else {
7191624Smax.romanov@nginx.com         if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
7201624Smax.romanov@nginx.com             ws->pending_frame_len += frame->payload_len;
7211624Smax.romanov@nginx.com 
7221624Smax.romanov@nginx.com         } else {
7231624Smax.romanov@nginx.com             ws->pending_frame_len = frame->payload_len;
7241624Smax.romanov@nginx.com         }
7251624Smax.romanov@nginx.com     }
7261624Smax.romanov@nginx.com }
7271624Smax.romanov@nginx.com 
7281624Smax.romanov@nginx.com 
7291624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t * ws,nxt_unit_websocket_frame_t * frame)7301624Smax.romanov@nginx.com nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
7311624Smax.romanov@nginx.com     nxt_unit_websocket_frame_t *frame)
7321624Smax.romanov@nginx.com {
7331624Smax.romanov@nginx.com     int                         fin;
7341624Smax.romanov@nginx.com     char                        *buf;
7351624Smax.romanov@nginx.com     uint8_t                     code_buf[2], opcode;
7361624Smax.romanov@nginx.com     uint16_t                    code;
7371624Smax.romanov@nginx.com     PyObject                    *msg, *data, *type, *data_key;
7381624Smax.romanov@nginx.com     uint64_t                    payload_len;
7391624Smax.romanov@nginx.com     nxt_unit_websocket_frame_t  *fin_frame;
7401624Smax.romanov@nginx.com 
7411624Smax.romanov@nginx.com     nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
7421624Smax.romanov@nginx.com 
7431624Smax.romanov@nginx.com     fin_frame = NULL;
7441624Smax.romanov@nginx.com 
7451624Smax.romanov@nginx.com     if (nxt_queue_is_empty(&ws->pending_frames)
7461624Smax.romanov@nginx.com         || (frame != NULL
7471624Smax.romanov@nginx.com             && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
7481624Smax.romanov@nginx.com     {
7491624Smax.romanov@nginx.com         payload_len = frame->payload_len;
7501624Smax.romanov@nginx.com 
7511624Smax.romanov@nginx.com     } else {
7521624Smax.romanov@nginx.com         if (frame != NULL) {
7531624Smax.romanov@nginx.com             payload_len = ws->pending_payload_len + frame->payload_len;
7541624Smax.romanov@nginx.com             fin_frame = frame;
7551624Smax.romanov@nginx.com 
7561624Smax.romanov@nginx.com         } else {
7571624Smax.romanov@nginx.com             payload_len = nxt_py_asgi_websocket_pending_len(ws);
7581624Smax.romanov@nginx.com         }
7591624Smax.romanov@nginx.com 
7601624Smax.romanov@nginx.com         frame = nxt_py_asgi_websocket_pop_frame(ws);
7611624Smax.romanov@nginx.com     }
7621624Smax.romanov@nginx.com 
7631624Smax.romanov@nginx.com     opcode = frame->header->opcode;
7641624Smax.romanov@nginx.com 
7651624Smax.romanov@nginx.com     if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
7661624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req,
7671624Smax.romanov@nginx.com                            "Invalid state: attempt to process CONT frame.");
7681624Smax.romanov@nginx.com 
7691624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
7701624Smax.romanov@nginx.com 
7711624Smax.romanov@nginx.com         return PyErr_Format(PyExc_AssertionError,
7721624Smax.romanov@nginx.com                             "Invalid state: attempt to process CONT frame.");
7731624Smax.romanov@nginx.com     }
7741624Smax.romanov@nginx.com 
7751624Smax.romanov@nginx.com     type = nxt_py_websocket_receive_str;
7761624Smax.romanov@nginx.com 
7771624Smax.romanov@nginx.com     switch (opcode) {
7781624Smax.romanov@nginx.com     case NXT_WEBSOCKET_OP_TEXT:
7791624Smax.romanov@nginx.com         buf = nxt_unit_malloc(frame->req->ctx, payload_len);
7801624Smax.romanov@nginx.com         if (nxt_slow_path(buf == NULL)) {
7811624Smax.romanov@nginx.com             nxt_unit_req_alert(ws->req,
7821624Smax.romanov@nginx.com                                "Failed to allocate buffer for payload (%d).",
7831624Smax.romanov@nginx.com                                (int) payload_len);
7841624Smax.romanov@nginx.com 
7851624Smax.romanov@nginx.com             nxt_unit_websocket_done(frame);
7861624Smax.romanov@nginx.com 
7871624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
7881624Smax.romanov@nginx.com                                 "Failed to allocate buffer for payload (%d).",
7891624Smax.romanov@nginx.com                                 (int) payload_len);
7901624Smax.romanov@nginx.com         }
7911624Smax.romanov@nginx.com 
7921624Smax.romanov@nginx.com         data = NULL;
7931624Smax.romanov@nginx.com         data_key = nxt_py_text_str;
7941624Smax.romanov@nginx.com 
7951624Smax.romanov@nginx.com         break;
7961624Smax.romanov@nginx.com 
7971624Smax.romanov@nginx.com     case NXT_WEBSOCKET_OP_BINARY:
7981624Smax.romanov@nginx.com         data = PyBytes_FromStringAndSize(NULL, payload_len);
7991624Smax.romanov@nginx.com         if (nxt_slow_path(data == NULL)) {
8001624Smax.romanov@nginx.com             nxt_unit_req_alert(ws->req,
8011624Smax.romanov@nginx.com                                "Failed to create Bytes for payload (%d).",
8021624Smax.romanov@nginx.com                                (int) payload_len);
8031624Smax.romanov@nginx.com             nxt_python_print_exception();
8041624Smax.romanov@nginx.com 
8051624Smax.romanov@nginx.com             nxt_unit_websocket_done(frame);
8061624Smax.romanov@nginx.com 
8071624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
8081624Smax.romanov@nginx.com                                 "Failed to create Bytes for payload.");
8091624Smax.romanov@nginx.com         }
8101624Smax.romanov@nginx.com 
8111624Smax.romanov@nginx.com         buf = (char *) PyBytes_AS_STRING(data);
8121624Smax.romanov@nginx.com         data_key = nxt_py_bytes_str;
8131624Smax.romanov@nginx.com 
8141624Smax.romanov@nginx.com         break;
8151624Smax.romanov@nginx.com 
8161624Smax.romanov@nginx.com     case NXT_WEBSOCKET_OP_CLOSE:
8171624Smax.romanov@nginx.com         if (frame->payload_len >= 2) {
8181624Smax.romanov@nginx.com             nxt_unit_websocket_read(frame, code_buf, 2);
8191624Smax.romanov@nginx.com             code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
8201624Smax.romanov@nginx.com 
8211624Smax.romanov@nginx.com         } else {
8221624Smax.romanov@nginx.com             code = NXT_WEBSOCKET_CR_NORMAL;
8231624Smax.romanov@nginx.com         }
8241624Smax.romanov@nginx.com 
8251624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
8261624Smax.romanov@nginx.com 
8271624Smax.romanov@nginx.com         data = PyLong_FromLong(code);
8281624Smax.romanov@nginx.com         if (nxt_slow_path(data == NULL)) {
8291624Smax.romanov@nginx.com             nxt_unit_req_alert(ws->req,
8301624Smax.romanov@nginx.com                                "Failed to create Long from code %d.",
8311624Smax.romanov@nginx.com                                (int) code);
8321624Smax.romanov@nginx.com             nxt_python_print_exception();
8331624Smax.romanov@nginx.com 
8341624Smax.romanov@nginx.com             return PyErr_Format(PyExc_RuntimeError,
8351624Smax.romanov@nginx.com                                 "Failed to create Long from code %d.",
8361624Smax.romanov@nginx.com                                 (int) code);
8371624Smax.romanov@nginx.com         }
8381624Smax.romanov@nginx.com 
8391624Smax.romanov@nginx.com         buf = NULL;
8401624Smax.romanov@nginx.com         type = nxt_py_websocket_disconnect_str;
8411624Smax.romanov@nginx.com         data_key = nxt_py_code_str;
8421624Smax.romanov@nginx.com 
8431624Smax.romanov@nginx.com         break;
8441624Smax.romanov@nginx.com 
8451624Smax.romanov@nginx.com     default:
8461624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
8471624Smax.romanov@nginx.com 
8481624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
8491624Smax.romanov@nginx.com 
8501624Smax.romanov@nginx.com         return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
8511624Smax.romanov@nginx.com                             opcode);
8521624Smax.romanov@nginx.com     }
8531624Smax.romanov@nginx.com 
8541624Smax.romanov@nginx.com     if (buf != NULL) {
8551624Smax.romanov@nginx.com         fin = frame->header->fin;
8561624Smax.romanov@nginx.com         buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
8571624Smax.romanov@nginx.com 
8581624Smax.romanov@nginx.com         nxt_unit_websocket_done(frame);
8591624Smax.romanov@nginx.com 
8601624Smax.romanov@nginx.com         if (!fin) {
8611624Smax.romanov@nginx.com             while (!nxt_queue_is_empty(&ws->pending_frames)) {
8621624Smax.romanov@nginx.com                 frame = nxt_py_asgi_websocket_pop_frame(ws);
8631624Smax.romanov@nginx.com                 fin = frame->header->fin;
8641624Smax.romanov@nginx.com 
8651624Smax.romanov@nginx.com                 buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
8661624Smax.romanov@nginx.com 
8671624Smax.romanov@nginx.com                 nxt_unit_websocket_done(frame);
8681624Smax.romanov@nginx.com 
8691624Smax.romanov@nginx.com                 if (fin) {
8701624Smax.romanov@nginx.com                     break;
8711624Smax.romanov@nginx.com                 }
8721624Smax.romanov@nginx.com             }
8731624Smax.romanov@nginx.com 
8741624Smax.romanov@nginx.com             if (fin_frame != NULL) {
8751624Smax.romanov@nginx.com                 buf += nxt_unit_websocket_read(fin_frame, buf,
8761624Smax.romanov@nginx.com                                                fin_frame->payload_len);
8771624Smax.romanov@nginx.com                 nxt_unit_websocket_done(fin_frame);
8781624Smax.romanov@nginx.com             }
8791624Smax.romanov@nginx.com         }
8801624Smax.romanov@nginx.com 
8811624Smax.romanov@nginx.com         if (opcode == NXT_WEBSOCKET_OP_TEXT) {
8821624Smax.romanov@nginx.com             buf -= payload_len;
8831624Smax.romanov@nginx.com 
8841624Smax.romanov@nginx.com             data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
8851624Smax.romanov@nginx.com 
8861624Smax.romanov@nginx.com             nxt_unit_free(ws->req->ctx, buf);
8871624Smax.romanov@nginx.com 
8881624Smax.romanov@nginx.com             if (nxt_slow_path(data == NULL)) {
8891624Smax.romanov@nginx.com                 nxt_unit_req_alert(ws->req,
8901624Smax.romanov@nginx.com                                    "Failed to create Unicode for payload (%d).",
8911624Smax.romanov@nginx.com                                    (int) payload_len);
8921624Smax.romanov@nginx.com                 nxt_python_print_exception();
8931624Smax.romanov@nginx.com 
8941624Smax.romanov@nginx.com                 return PyErr_Format(PyExc_RuntimeError,
8951624Smax.romanov@nginx.com                                     "Failed to create Unicode.");
8961624Smax.romanov@nginx.com             }
8971624Smax.romanov@nginx.com         }
8981624Smax.romanov@nginx.com     }
8991624Smax.romanov@nginx.com 
9001624Smax.romanov@nginx.com     msg = nxt_py_asgi_new_msg(ws->req, type);
9011624Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
9021624Smax.romanov@nginx.com         Py_DECREF(data);
9031624Smax.romanov@nginx.com         return NULL;
9041624Smax.romanov@nginx.com     }
9051624Smax.romanov@nginx.com 
9061624Smax.romanov@nginx.com     if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
9071624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
9081624Smax.romanov@nginx.com 
9091624Smax.romanov@nginx.com         Py_DECREF(msg);
9101624Smax.romanov@nginx.com         Py_DECREF(data);
9111624Smax.romanov@nginx.com 
9121624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
9131624Smax.romanov@nginx.com                             "Python failed to set 'msg.data' item");
9141624Smax.romanov@nginx.com     }
9151624Smax.romanov@nginx.com 
9161624Smax.romanov@nginx.com     Py_DECREF(data);
9171624Smax.romanov@nginx.com 
9181624Smax.romanov@nginx.com     return msg;
9191624Smax.romanov@nginx.com }
9201624Smax.romanov@nginx.com 
9211624Smax.romanov@nginx.com 
9221624Smax.romanov@nginx.com static uint64_t
nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t * ws)9231624Smax.romanov@nginx.com nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
9241624Smax.romanov@nginx.com {
9251624Smax.romanov@nginx.com     uint64_t                     res;
9261624Smax.romanov@nginx.com     nxt_py_asgi_penging_frame_t  *p;
9271624Smax.romanov@nginx.com 
9281624Smax.romanov@nginx.com     res = 0;
9291624Smax.romanov@nginx.com 
9301624Smax.romanov@nginx.com     nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
9311624Smax.romanov@nginx.com         res += p->frame->payload_len;
9321624Smax.romanov@nginx.com 
9331624Smax.romanov@nginx.com         if (p->frame->header->fin) {
9341624Smax.romanov@nginx.com             nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
9351624Smax.romanov@nginx.com                                (int) res);
9361624Smax.romanov@nginx.com             return res;
9371624Smax.romanov@nginx.com         }
9381624Smax.romanov@nginx.com     } nxt_queue_loop;
9391624Smax.romanov@nginx.com 
9401624Smax.romanov@nginx.com     nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
9411624Smax.romanov@nginx.com                        (int) res);
9421624Smax.romanov@nginx.com     return res;
9431624Smax.romanov@nginx.com }
9441624Smax.romanov@nginx.com 
9451624Smax.romanov@nginx.com 
9461624Smax.romanov@nginx.com static nxt_unit_websocket_frame_t *
nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t * ws)9471624Smax.romanov@nginx.com nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
9481624Smax.romanov@nginx.com {
9491624Smax.romanov@nginx.com     nxt_queue_link_t             *lnk;
9501624Smax.romanov@nginx.com     nxt_unit_websocket_frame_t   *frame;
9511624Smax.romanov@nginx.com     nxt_py_asgi_penging_frame_t  *p;
9521624Smax.romanov@nginx.com 
9531624Smax.romanov@nginx.com     lnk = nxt_queue_first(&ws->pending_frames);
9541624Smax.romanov@nginx.com     nxt_queue_remove(lnk);
9551624Smax.romanov@nginx.com 
9561624Smax.romanov@nginx.com     p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
9571624Smax.romanov@nginx.com 
9581624Smax.romanov@nginx.com     frame = p->frame;
9591624Smax.romanov@nginx.com     ws->pending_payload_len -= frame->payload_len;
9601624Smax.romanov@nginx.com     ws->pending_fins -= frame->header->fin;
9611624Smax.romanov@nginx.com 
9621624Smax.romanov@nginx.com     nxt_unit_free(frame->req->ctx, p);
9631624Smax.romanov@nginx.com 
9641624Smax.romanov@nginx.com     nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
9651624Smax.romanov@nginx.com                        "%d, %"PRIu64", %d",
9661624Smax.romanov@nginx.com                        frame->header->opcode, frame->payload_len,
9671624Smax.romanov@nginx.com                        frame->header->fin);
9681624Smax.romanov@nginx.com 
9691624Smax.romanov@nginx.com     return frame;
9701624Smax.romanov@nginx.com }
9711624Smax.romanov@nginx.com 
9721624Smax.romanov@nginx.com 
9731624Smax.romanov@nginx.com void
nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t * req)9741624Smax.romanov@nginx.com nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
9751624Smax.romanov@nginx.com {
9761624Smax.romanov@nginx.com     PyObject                 *msg, *exc;
9771624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t  *ws;
9781624Smax.romanov@nginx.com 
9791624Smax.romanov@nginx.com     ws = req->data;
9801624Smax.romanov@nginx.com 
9811624Smax.romanov@nginx.com     nxt_unit_req_debug(req, "asgi_websocket_close_handler");
9821624Smax.romanov@nginx.com 
983*1980Smax.romanov@nginx.com     if (nxt_slow_path(ws == NULL)) {
984*1980Smax.romanov@nginx.com         return;
985*1980Smax.romanov@nginx.com     }
986*1980Smax.romanov@nginx.com 
9871624Smax.romanov@nginx.com     if (ws->receive_future == NULL) {
9881624Smax.romanov@nginx.com         ws->state = NXT_WS_DISCONNECTED;
9891624Smax.romanov@nginx.com 
9901624Smax.romanov@nginx.com         return;
9911624Smax.romanov@nginx.com     }
9921624Smax.romanov@nginx.com 
9931624Smax.romanov@nginx.com     msg = nxt_py_asgi_websocket_disconnect_msg(ws);
9941624Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
9951624Smax.romanov@nginx.com         exc = PyErr_Occurred();
9961624Smax.romanov@nginx.com         Py_INCREF(exc);
9971624Smax.romanov@nginx.com 
9981624Smax.romanov@nginx.com         nxt_py_asgi_websocket_receive_fail(ws, exc);
9991624Smax.romanov@nginx.com 
10001624Smax.romanov@nginx.com     } else {
10011624Smax.romanov@nginx.com         nxt_py_asgi_websocket_receive_done(ws, msg);
10021624Smax.romanov@nginx.com     }
10031624Smax.romanov@nginx.com }
10041624Smax.romanov@nginx.com 
10051624Smax.romanov@nginx.com 
10061624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t * ws)10071624Smax.romanov@nginx.com nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
10081624Smax.romanov@nginx.com {
10091624Smax.romanov@nginx.com     PyObject  *msg, *code;
10101624Smax.romanov@nginx.com 
10111624Smax.romanov@nginx.com     msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
10121624Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
10131624Smax.romanov@nginx.com         return NULL;
10141624Smax.romanov@nginx.com     }
10151624Smax.romanov@nginx.com 
10161624Smax.romanov@nginx.com     code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
10171624Smax.romanov@nginx.com     if (nxt_slow_path(code == NULL)) {
10181624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "Python failed to create long");
10191624Smax.romanov@nginx.com         nxt_python_print_exception();
10201624Smax.romanov@nginx.com 
10211624Smax.romanov@nginx.com         Py_DECREF(msg);
10221624Smax.romanov@nginx.com 
10231624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError, "failed to create long");
10241624Smax.romanov@nginx.com     }
10251624Smax.romanov@nginx.com 
10261624Smax.romanov@nginx.com     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
10271624Smax.romanov@nginx.com         nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
10281624Smax.romanov@nginx.com 
10291624Smax.romanov@nginx.com         Py_DECREF(msg);
10301624Smax.romanov@nginx.com         Py_DECREF(code);
10311624Smax.romanov@nginx.com 
10321624Smax.romanov@nginx.com         return PyErr_Format(PyExc_RuntimeError,
10331624Smax.romanov@nginx.com                             "Python failed to set 'msg.code' item");
10341624Smax.romanov@nginx.com     }
10351624Smax.romanov@nginx.com 
10361624Smax.romanov@nginx.com     Py_DECREF(code);
10371624Smax.romanov@nginx.com 
10381624Smax.romanov@nginx.com     return msg;
10391624Smax.romanov@nginx.com }
10401624Smax.romanov@nginx.com 
10411624Smax.romanov@nginx.com 
10421624Smax.romanov@nginx.com static PyObject *
nxt_py_asgi_websocket_done(PyObject * self,PyObject * future)10431624Smax.romanov@nginx.com nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
10441624Smax.romanov@nginx.com {
10451624Smax.romanov@nginx.com     int                      rc;
10461624Smax.romanov@nginx.com     uint16_t                 status_code;
10471624Smax.romanov@nginx.com     PyObject                 *res;
10481624Smax.romanov@nginx.com     nxt_py_asgi_websocket_t  *ws;
10491624Smax.romanov@nginx.com 
10501624Smax.romanov@nginx.com     ws = (nxt_py_asgi_websocket_t *) self;
10511624Smax.romanov@nginx.com 
10521624Smax.romanov@nginx.com     nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
10531624Smax.romanov@nginx.com 
10541624Smax.romanov@nginx.com     /*
10551624Smax.romanov@nginx.com      * Get Future.result() and it raises an exception, if coroutine exited
10561624Smax.romanov@nginx.com      * with exception.
10571624Smax.romanov@nginx.com      */
10581624Smax.romanov@nginx.com     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
10591624Smax.romanov@nginx.com     if (nxt_slow_path(res == NULL)) {
10601624Smax.romanov@nginx.com         nxt_unit_req_error(ws->req,
10611624Smax.romanov@nginx.com                            "Python failed to call 'future.result()'");
10621624Smax.romanov@nginx.com         nxt_python_print_exception();
10631624Smax.romanov@nginx.com 
10641624Smax.romanov@nginx.com         rc = NXT_UNIT_ERROR;
10651624Smax.romanov@nginx.com 
10661624Smax.romanov@nginx.com     } else {
10671624Smax.romanov@nginx.com         Py_DECREF(res);
10681624Smax.romanov@nginx.com 
10691624Smax.romanov@nginx.com         rc = NXT_UNIT_OK;
10701624Smax.romanov@nginx.com     }
10711624Smax.romanov@nginx.com 
10721624Smax.romanov@nginx.com     if (ws->state == NXT_WS_ACCEPTED) {
10731624Smax.romanov@nginx.com         status_code = (rc == NXT_UNIT_OK)
10741624Smax.romanov@nginx.com                       ? htons(NXT_WEBSOCKET_CR_NORMAL)
10751624Smax.romanov@nginx.com                       : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
10761624Smax.romanov@nginx.com 
10771624Smax.romanov@nginx.com         rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
10781624Smax.romanov@nginx.com                                      1, &status_code, 2);
10791624Smax.romanov@nginx.com     }
10801624Smax.romanov@nginx.com 
10811624Smax.romanov@nginx.com     while (!nxt_queue_is_empty(&ws->pending_frames)) {
10821624Smax.romanov@nginx.com         nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
10831624Smax.romanov@nginx.com     }
10841624Smax.romanov@nginx.com 
10851624Smax.romanov@nginx.com     nxt_unit_request_done(ws->req, rc);
10861624Smax.romanov@nginx.com 
10871624Smax.romanov@nginx.com     Py_RETURN_NONE;
10881624Smax.romanov@nginx.com }
10891624Smax.romanov@nginx.com 
10901624Smax.romanov@nginx.com 
10911624Smax.romanov@nginx.com #endif /* NXT_HAVE_ASGI */
1092