xref: /unit/src/python/nxt_python_asgi_websocket.c (revision 1624:e46b1b422545)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <nxt_unit_websocket.h>
15 #include <nxt_websocket_header.h>
16 #include <python/nxt_python_asgi.h>
17 #include <python/nxt_python_asgi_str.h>
18 
19 
20 enum {
21     NXT_WS_INIT,
22     NXT_WS_CONNECT,
23     NXT_WS_ACCEPTED,
24     NXT_WS_DISCONNECTED,
25     NXT_WS_CLOSED,
26 };
27 
28 
29 typedef struct {
30     nxt_queue_link_t            link;
31     nxt_unit_websocket_frame_t  *frame;
32 } nxt_py_asgi_penging_frame_t;
33 
34 
35 typedef struct {
36     PyObject_HEAD
37     nxt_unit_request_info_t  *req;
38     PyObject                 *receive_future;
39     PyObject                 *receive_exc_str;
40     int                      state;
41     nxt_queue_t              pending_frames;
42     uint64_t                 pending_payload_len;
43     uint64_t                 pending_frame_len;
44     int                      pending_fins;
45 } nxt_py_asgi_websocket_t;
46 
47 
48 static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
49 static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
50 static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
51     PyObject *dict);
52 static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
53     PyObject *dict);
54 static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
55     PyObject *dict);
56 static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
57     PyObject *msg);
58 static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
59     PyObject *exc);
60 static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
61 static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
62     nxt_unit_websocket_frame_t *frame);
63 static uint64_t nxt_py_asgi_websocket_pending_len(
64     nxt_py_asgi_websocket_t *ws);
65 static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
66     nxt_py_asgi_websocket_t *ws);
67 static PyObject *nxt_py_asgi_websocket_disconnect_msg(
68     nxt_py_asgi_websocket_t *ws);
69 static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
70 
71 
72 static PyMethodDef nxt_py_asgi_websocket_methods[] = {
73     { "receive",   nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
74     { "send",      nxt_py_asgi_websocket_send,    METH_O,      0 },
75     { "_done",     nxt_py_asgi_websocket_done,    METH_O,      0 },
76     { NULL, NULL, 0, 0 }
77 };
78 
79 static PyAsyncMethods nxt_py_asgi_async_methods = {
80     .am_await = nxt_py_asgi_await,
81 };
82 
83 static PyTypeObject nxt_py_asgi_websocket_type = {
84     PyVarObject_HEAD_INIT(NULL, 0)
85 
86     .tp_name      = "unit._asgi_websocket",
87     .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
88     .tp_dealloc   = nxt_py_asgi_dealloc,
89     .tp_as_async  = &nxt_py_asgi_async_methods,
90     .tp_flags     = Py_TPFLAGS_DEFAULT,
91     .tp_doc       = "unit ASGI WebSocket connection object",
92     .tp_iter      = nxt_py_asgi_iter,
93     .tp_iternext  = nxt_py_asgi_next,
94     .tp_methods   = nxt_py_asgi_websocket_methods,
95 };
96 
97 static uint64_t  nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
98 static uint64_t  nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
99 
100 
101 nxt_int_t
102 nxt_py_asgi_websocket_init(nxt_task_t *task)
103 {
104     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
105         nxt_alert(task,
106               "Python failed to initialize the \"asgi_websocket\" type object");
107         return NXT_ERROR;
108     }
109 
110     return NXT_OK;
111 }
112 
113 
114 PyObject *
115 nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
116 {
117     nxt_py_asgi_websocket_t  *ws;
118 
119     ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
120 
121     if (nxt_fast_path(ws != NULL)) {
122         ws->req = req;
123         ws->receive_future = NULL;
124         ws->receive_exc_str = NULL;
125         ws->state = NXT_WS_INIT;
126         nxt_queue_init(&ws->pending_frames);
127         ws->pending_payload_len = 0;
128         ws->pending_frame_len = 0;
129         ws->pending_fins = 0;
130     }
131 
132     return (PyObject *) ws;
133 }
134 
135 
136 static PyObject *
137 nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
138 {
139     PyObject                 *future, *msg;
140     nxt_py_asgi_websocket_t  *ws;
141 
142     ws = (nxt_py_asgi_websocket_t *) self;
143 
144     nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
145 
146     /* If exception happened out of receive() call, raise it now. */
147     if (nxt_slow_path(ws->receive_exc_str != NULL)) {
148         PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
149 
150         ws->receive_exc_str = NULL;
151 
152         return NULL;
153     }
154 
155     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
156         nxt_unit_req_error(ws->req,
157                            "receive() called for closed WebSocket");
158 
159         return PyErr_Format(PyExc_RuntimeError,
160                             "WebSocket already closed");
161     }
162 
163     future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
164     if (nxt_slow_path(future == NULL)) {
165         nxt_unit_req_alert(ws->req, "Python failed to create Future object");
166         nxt_python_print_exception();
167 
168         return PyErr_Format(PyExc_RuntimeError,
169                             "failed to create Future object");
170     }
171 
172     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
173         ws->state = NXT_WS_CONNECT;
174 
175         msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
176 
177         return nxt_py_asgi_set_result_soon(ws->req, future, msg);
178     }
179 
180     if (ws->pending_fins > 0) {
181         msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
182 
183         return nxt_py_asgi_set_result_soon(ws->req, future, msg);
184     }
185 
186     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
187         msg = nxt_py_asgi_websocket_disconnect_msg(ws);
188 
189         return nxt_py_asgi_set_result_soon(ws->req, future, msg);
190     }
191 
192     ws->receive_future = future;
193     Py_INCREF(ws->receive_future);
194 
195     return future;
196 }
197 
198 
199 static PyObject *
200 nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
201 {
202     PyObject                 *type;
203     const char               *type_str;
204     Py_ssize_t               type_len;
205     nxt_py_asgi_websocket_t  *ws;
206 
207     static const nxt_str_t  websocket_accept = nxt_string("websocket.accept");
208     static const nxt_str_t  websocket_close = nxt_string("websocket.close");
209     static const nxt_str_t  websocket_send = nxt_string("websocket.send");
210 
211     ws = (nxt_py_asgi_websocket_t *) self;
212 
213     type = PyDict_GetItem(dict, nxt_py_type_str);
214     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
215         nxt_unit_req_error(ws->req, "asgi_websocket_send: "
216                            "'type' is not a unicode string");
217         return PyErr_Format(PyExc_TypeError,
218                             "'type' is not a unicode string");
219     }
220 
221     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
222 
223     nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
224                        (int) type_len, type_str);
225 
226     if (type_len == (Py_ssize_t) websocket_accept.length
227         && memcmp(type_str, websocket_accept.start, type_len) == 0)
228     {
229         return nxt_py_asgi_websocket_accept(ws, dict);
230     }
231 
232     if (type_len == (Py_ssize_t) websocket_close.length
233         && memcmp(type_str, websocket_close.start, type_len) == 0)
234     {
235         return nxt_py_asgi_websocket_close(ws, dict);
236     }
237 
238     if (type_len == (Py_ssize_t) websocket_send.length
239         && memcmp(type_str, websocket_send.start, type_len) == 0)
240     {
241         return nxt_py_asgi_websocket_send_frame(ws, dict);
242     }
243 
244     nxt_unit_req_error(ws->req, "asgi_websocket_send: "
245                        "unexpected 'type': '%.*s'", (int) type_len, type_str);
246     return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
247 }
248 
249 
250 static PyObject *
251 nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
252 {
253     int                          rc;
254     char                         *subprotocol_str;
255     PyObject                     *res, *headers, *subprotocol;
256     Py_ssize_t                   subprotocol_len;
257     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
258     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
259 
260     static const nxt_str_t  ws_protocol = nxt_string("sec-websocket-protocol");
261 
262     switch(ws->state) {
263     case NXT_WS_INIT:
264         return PyErr_Format(PyExc_RuntimeError,
265                             "WebSocket connect not received");
266     case NXT_WS_CONNECT:
267         break;
268 
269     case NXT_WS_ACCEPTED:
270         return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
271 
272     case NXT_WS_DISCONNECTED:
273         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
274 
275     case NXT_WS_CLOSED:
276         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
277     }
278 
279     if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
280         return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
281     }
282 
283     if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
284         return PyErr_Format(PyExc_RuntimeError, "response already sent");
285     }
286 
287     calc_size_ctx.fields_size = 0;
288     calc_size_ctx.fields_count = 0;
289 
290     headers = PyDict_GetItem(dict, nxt_py_headers_str);
291     if (headers != NULL) {
292         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
293                                        &calc_size_ctx);
294         if (nxt_slow_path(res == NULL)) {
295             return NULL;
296         }
297     }
298 
299     subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
300     if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
301         subprotocol_str = PyUnicode_DATA(subprotocol);
302         subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
303 
304         calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
305         calc_size_ctx.fields_count++;
306 
307     } else {
308         subprotocol_str = NULL;
309         subprotocol_len = 0;
310     }
311 
312     rc = nxt_unit_response_init(ws->req, 101,
313                                 calc_size_ctx.fields_count,
314                                 calc_size_ctx.fields_size);
315     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
316         return PyErr_Format(PyExc_RuntimeError,
317                             "failed to allocate response object");
318     }
319 
320     add_field_ctx.req = ws->req;
321     add_field_ctx.content_length = -1;
322 
323     if (headers != NULL) {
324         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
325                                        &add_field_ctx);
326         if (nxt_slow_path(res == NULL)) {
327             return NULL;
328         }
329     }
330 
331     if (subprotocol_len > 0) {
332         rc = nxt_unit_response_add_field(ws->req,
333                                          (const char *) ws_protocol.start,
334                                          ws_protocol.length,
335                                          subprotocol_str, subprotocol_len);
336         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
337             return PyErr_Format(PyExc_RuntimeError,
338                                 "failed to add header");
339         }
340     }
341 
342     rc = nxt_unit_response_send(ws->req);
343     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
344         return PyErr_Format(PyExc_RuntimeError, "failed to send response");
345     }
346 
347     ws->state = NXT_WS_ACCEPTED;
348 
349     Py_INCREF(ws);
350 
351     return (PyObject *) ws;
352 }
353 
354 
355 static PyObject *
356 nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
357 {
358     int       rc;
359     uint16_t  status_code;
360     PyObject  *code;
361 
362     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
363         return PyErr_Format(PyExc_RuntimeError,
364                             "WebSocket connect not received");
365     }
366 
367     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
368         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
369     }
370 
371     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
372         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
373     }
374 
375     if (nxt_unit_response_is_websocket(ws->req)) {
376         code = PyDict_GetItem(dict, nxt_py_code_str);
377         if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
378             return PyErr_Format(PyExc_TypeError, "'code' is not integer");
379         }
380 
381         status_code = (code != NULL) ? htons(PyLong_AsLong(code))
382                                      : htons(NXT_WEBSOCKET_CR_NORMAL);
383 
384         rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
385                                      1, &status_code, 2);
386         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
387             return PyErr_Format(PyExc_RuntimeError,
388                                 "failed to send close frame");
389         }
390 
391     } else {
392         rc = nxt_unit_response_init(ws->req, 403, 0, 0);
393         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
394             return PyErr_Format(PyExc_RuntimeError,
395                                 "failed to allocate response object");
396         }
397 
398         rc = nxt_unit_response_send(ws->req);
399         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
400             return PyErr_Format(PyExc_RuntimeError,
401                                 "failed to send response");
402         }
403     }
404 
405     ws->state = NXT_WS_CLOSED;
406 
407     Py_INCREF(ws);
408 
409     return (PyObject *) ws;
410 }
411 
412 
413 static PyObject *
414 nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
415 {
416     int         rc;
417     uint8_t     opcode;
418     PyObject    *bytes, *text;
419     const void  *buf;
420     Py_ssize_t  buf_size;
421 
422     if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
423         return PyErr_Format(PyExc_RuntimeError,
424                             "WebSocket connect not received");
425     }
426 
427     if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
428         return PyErr_Format(PyExc_RuntimeError,
429                             "WebSocket not accepted yet");
430     }
431 
432     if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
433         return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
434     }
435 
436     if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
437         return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
438     }
439 
440     bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
441     if (bytes == Py_None) {
442         bytes = NULL;
443     }
444 
445     if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
446         return PyErr_Format(PyExc_TypeError,
447                             "'bytes' is not a byte string");
448     }
449 
450     text = PyDict_GetItem(dict, nxt_py_text_str);
451     if (text == Py_None) {
452         text = NULL;
453     }
454 
455     if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
456         return PyErr_Format(PyExc_TypeError,
457                             "'text' is not a unicode string");
458     }
459 
460     if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
461         return PyErr_Format(PyExc_ValueError,
462                        "Exactly one of 'bytes' or 'text' must be non-None");
463     }
464 
465     if (bytes != NULL) {
466         buf = PyBytes_AS_STRING(bytes);
467         buf_size = PyBytes_GET_SIZE(bytes);
468         opcode = NXT_WEBSOCKET_OP_BINARY;
469 
470     } else {
471         buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
472         opcode = NXT_WEBSOCKET_OP_TEXT;
473     }
474 
475     rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
476     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
477         return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
478     }
479 
480     Py_INCREF(ws);
481     return (PyObject *) ws;
482 }
483 
484 
485 void
486 nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
487 {
488     uint8_t                  opcode;
489     uint16_t                 status_code;
490     uint64_t                 rest;
491     PyObject                 *msg, *exc;
492     nxt_py_asgi_websocket_t  *ws;
493 
494     ws = frame->req->data;
495 
496     nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
497 
498     opcode = frame->header->opcode;
499     if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
500                       && opcode != NXT_WEBSOCKET_OP_TEXT
501                       && opcode != NXT_WEBSOCKET_OP_BINARY
502                       && opcode != NXT_WEBSOCKET_OP_CLOSE))
503     {
504         nxt_unit_websocket_done(frame);
505 
506         nxt_unit_req_debug(ws->req,
507                           "asgi_websocket_handler: ignore frame with opcode %d",
508                            opcode);
509 
510         return;
511     }
512 
513     if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
514         nxt_unit_websocket_done(frame);
515 
516         goto bad_state;
517     }
518 
519     rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
520 
521     if (nxt_slow_path(frame->payload_len > rest)) {
522         nxt_unit_websocket_done(frame);
523 
524         goto too_big;
525     }
526 
527     rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
528 
529     if (nxt_slow_path(frame->payload_len > rest)) {
530         nxt_unit_websocket_done(frame);
531 
532         goto too_big;
533     }
534 
535     if (ws->receive_future == NULL || frame->header->fin == 0) {
536         nxt_py_asgi_websocket_suspend_frame(frame);
537 
538         return;
539     }
540 
541     if (!nxt_queue_is_empty(&ws->pending_frames)) {
542         if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
543                           || opcode == NXT_WEBSOCKET_OP_BINARY))
544         {
545             nxt_unit_req_alert(ws->req,
546                          "Invalid state: pending frames with active receiver. "
547                          "CONT frame expected. (%d)", opcode);
548 
549             PyErr_SetString(PyExc_AssertionError,
550                          "Invalid state: pending frames with active receiver. "
551                          "CONT frame expected.");
552 
553             nxt_unit_websocket_done(frame);
554 
555             return;
556         }
557     }
558 
559     msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
560     if (nxt_slow_path(msg == NULL)) {
561         exc = PyErr_Occurred();
562         Py_INCREF(exc);
563 
564         goto raise;
565     }
566 
567     nxt_py_asgi_websocket_receive_done(ws, msg);
568 
569     return;
570 
571 bad_state:
572 
573     if (ws->receive_future == NULL) {
574         ws->receive_exc_str = nxt_py_bad_state_str;
575 
576         return;
577     }
578 
579     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
580                                        nxt_py_bad_state_str,
581                                        NULL);
582     if (nxt_slow_path(exc == NULL)) {
583         nxt_unit_req_alert(ws->req, "RuntimeError create failed");
584         nxt_python_print_exception();
585 
586         exc = Py_None;
587         Py_INCREF(exc);
588     }
589 
590     goto raise;
591 
592 too_big:
593 
594     status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
595 
596     (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
597                                    1, &status_code, 2);
598 
599     ws->state = NXT_WS_CLOSED;
600 
601     if (ws->receive_future == NULL) {
602         ws->receive_exc_str = nxt_py_message_too_big_str;
603 
604         return;
605     }
606 
607     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
608                                        nxt_py_message_too_big_str,
609                                        NULL);
610     if (nxt_slow_path(exc == NULL)) {
611         nxt_unit_req_alert(ws->req, "RuntimeError create failed");
612         nxt_python_print_exception();
613 
614         exc = Py_None;
615         Py_INCREF(exc);
616     }
617 
618 raise:
619 
620     nxt_py_asgi_websocket_receive_fail(ws, exc);
621 }
622 
623 
624 static void
625 nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
626 {
627     PyObject  *future, *res;
628 
629     future = ws->receive_future;
630     ws->receive_future = NULL;
631 
632     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
633     if (nxt_slow_path(res == NULL)) {
634         nxt_unit_req_alert(ws->req, "'set_result' call failed");
635         nxt_python_print_exception();
636     }
637 
638     Py_XDECREF(res);
639     Py_DECREF(future);
640 
641     Py_DECREF(msg);
642 }
643 
644 
645 static void
646 nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
647 {
648     PyObject  *future, *res;
649 
650     future = ws->receive_future;
651     ws->receive_future = NULL;
652 
653     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
654                                      NULL);
655     if (nxt_slow_path(res == NULL)) {
656         nxt_unit_req_alert(ws->req, "'set_exception' call failed");
657         nxt_python_print_exception();
658     }
659 
660     Py_XDECREF(res);
661     Py_DECREF(future);
662 
663     Py_DECREF(exc);
664 }
665 
666 
667 static void
668 nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
669 {
670     int                          rc;
671     nxt_py_asgi_websocket_t      *ws;
672     nxt_py_asgi_penging_frame_t  *p;
673 
674     nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
675                        "%d, %"PRIu64", %d",
676                        frame->header->opcode, frame->payload_len,
677                        frame->header->fin);
678 
679     ws = frame->req->data;
680 
681     rc = nxt_unit_websocket_retain(frame);
682     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
683         nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
684 
685         nxt_unit_websocket_done(frame);
686 
687         PyErr_SetString(PyExc_RuntimeError,
688                         "Failed to retain frame for suspension.");
689 
690         return;
691     }
692 
693     p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
694     if (nxt_slow_path(p == NULL)) {
695         nxt_unit_req_alert(ws->req,
696                            "Failed to allocate buffer to suspend frame.");
697 
698         nxt_unit_websocket_done(frame);
699 
700         PyErr_SetString(PyExc_RuntimeError,
701                         "Failed to allocate buffer to suspend frame.");
702 
703         return;
704     }
705 
706     p->frame = frame;
707     nxt_queue_insert_tail(&ws->pending_frames, &p->link);
708 
709     ws->pending_payload_len += frame->payload_len;
710     ws->pending_fins += frame->header->fin;
711 
712     if (frame->header->fin) {
713         ws->pending_frame_len = 0;
714 
715     } else {
716         if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
717             ws->pending_frame_len += frame->payload_len;
718 
719         } else {
720             ws->pending_frame_len = frame->payload_len;
721         }
722     }
723 }
724 
725 
726 static PyObject *
727 nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
728     nxt_unit_websocket_frame_t *frame)
729 {
730     int                         fin;
731     char                        *buf;
732     uint8_t                     code_buf[2], opcode;
733     uint16_t                    code;
734     PyObject                    *msg, *data, *type, *data_key;
735     uint64_t                    payload_len;
736     nxt_unit_websocket_frame_t  *fin_frame;
737 
738     nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
739 
740     fin_frame = NULL;
741 
742     if (nxt_queue_is_empty(&ws->pending_frames)
743         || (frame != NULL
744             && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
745     {
746         payload_len = frame->payload_len;
747 
748     } else {
749         if (frame != NULL) {
750             payload_len = ws->pending_payload_len + frame->payload_len;
751             fin_frame = frame;
752 
753         } else {
754             payload_len = nxt_py_asgi_websocket_pending_len(ws);
755         }
756 
757         frame = nxt_py_asgi_websocket_pop_frame(ws);
758     }
759 
760     opcode = frame->header->opcode;
761 
762     if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
763         nxt_unit_req_alert(ws->req,
764                            "Invalid state: attempt to process CONT frame.");
765 
766         nxt_unit_websocket_done(frame);
767 
768         return PyErr_Format(PyExc_AssertionError,
769                             "Invalid state: attempt to process CONT frame.");
770     }
771 
772     type = nxt_py_websocket_receive_str;
773 
774     switch (opcode) {
775     case NXT_WEBSOCKET_OP_TEXT:
776         buf = nxt_unit_malloc(frame->req->ctx, payload_len);
777         if (nxt_slow_path(buf == NULL)) {
778             nxt_unit_req_alert(ws->req,
779                                "Failed to allocate buffer for payload (%d).",
780                                (int) payload_len);
781 
782             nxt_unit_websocket_done(frame);
783 
784             return PyErr_Format(PyExc_RuntimeError,
785                                 "Failed to allocate buffer for payload (%d).",
786                                 (int) payload_len);
787         }
788 
789         data = NULL;
790         data_key = nxt_py_text_str;
791 
792         break;
793 
794     case NXT_WEBSOCKET_OP_BINARY:
795         data = PyBytes_FromStringAndSize(NULL, payload_len);
796         if (nxt_slow_path(data == NULL)) {
797             nxt_unit_req_alert(ws->req,
798                                "Failed to create Bytes for payload (%d).",
799                                (int) payload_len);
800             nxt_python_print_exception();
801 
802             nxt_unit_websocket_done(frame);
803 
804             return PyErr_Format(PyExc_RuntimeError,
805                                 "Failed to create Bytes for payload.");
806         }
807 
808         buf = (char *) PyBytes_AS_STRING(data);
809         data_key = nxt_py_bytes_str;
810 
811         break;
812 
813     case NXT_WEBSOCKET_OP_CLOSE:
814         if (frame->payload_len >= 2) {
815             nxt_unit_websocket_read(frame, code_buf, 2);
816             code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
817 
818         } else {
819             code = NXT_WEBSOCKET_CR_NORMAL;
820         }
821 
822         nxt_unit_websocket_done(frame);
823 
824         data = PyLong_FromLong(code);
825         if (nxt_slow_path(data == NULL)) {
826             nxt_unit_req_alert(ws->req,
827                                "Failed to create Long from code %d.",
828                                (int) code);
829             nxt_python_print_exception();
830 
831             return PyErr_Format(PyExc_RuntimeError,
832                                 "Failed to create Long from code %d.",
833                                 (int) code);
834         }
835 
836         buf = NULL;
837         type = nxt_py_websocket_disconnect_str;
838         data_key = nxt_py_code_str;
839 
840         break;
841 
842     default:
843         nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
844 
845         nxt_unit_websocket_done(frame);
846 
847         return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
848                             opcode);
849     }
850 
851     if (buf != NULL) {
852         fin = frame->header->fin;
853         buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
854 
855         nxt_unit_websocket_done(frame);
856 
857         if (!fin) {
858             while (!nxt_queue_is_empty(&ws->pending_frames)) {
859                 frame = nxt_py_asgi_websocket_pop_frame(ws);
860                 fin = frame->header->fin;
861 
862                 buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
863 
864                 nxt_unit_websocket_done(frame);
865 
866                 if (fin) {
867                     break;
868                 }
869             }
870 
871             if (fin_frame != NULL) {
872                 buf += nxt_unit_websocket_read(fin_frame, buf,
873                                                fin_frame->payload_len);
874                 nxt_unit_websocket_done(fin_frame);
875             }
876         }
877 
878         if (opcode == NXT_WEBSOCKET_OP_TEXT) {
879             buf -= payload_len;
880 
881             data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
882 
883             nxt_unit_free(ws->req->ctx, buf);
884 
885             if (nxt_slow_path(data == NULL)) {
886                 nxt_unit_req_alert(ws->req,
887                                    "Failed to create Unicode for payload (%d).",
888                                    (int) payload_len);
889                 nxt_python_print_exception();
890 
891                 return PyErr_Format(PyExc_RuntimeError,
892                                     "Failed to create Unicode.");
893             }
894         }
895     }
896 
897     msg = nxt_py_asgi_new_msg(ws->req, type);
898     if (nxt_slow_path(msg == NULL)) {
899         Py_DECREF(data);
900         return NULL;
901     }
902 
903     if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
904         nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
905 
906         Py_DECREF(msg);
907         Py_DECREF(data);
908 
909         return PyErr_Format(PyExc_RuntimeError,
910                             "Python failed to set 'msg.data' item");
911     }
912 
913     Py_DECREF(data);
914 
915     return msg;
916 }
917 
918 
919 static uint64_t
920 nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
921 {
922     uint64_t                     res;
923     nxt_py_asgi_penging_frame_t  *p;
924 
925     res = 0;
926 
927     nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
928         res += p->frame->payload_len;
929 
930         if (p->frame->header->fin) {
931             nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
932                                (int) res);
933             return res;
934         }
935     } nxt_queue_loop;
936 
937     nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
938                        (int) res);
939     return res;
940 }
941 
942 
943 static nxt_unit_websocket_frame_t *
944 nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
945 {
946     nxt_queue_link_t             *lnk;
947     nxt_unit_websocket_frame_t   *frame;
948     nxt_py_asgi_penging_frame_t  *p;
949 
950     lnk = nxt_queue_first(&ws->pending_frames);
951     nxt_queue_remove(lnk);
952 
953     p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
954 
955     frame = p->frame;
956     ws->pending_payload_len -= frame->payload_len;
957     ws->pending_fins -= frame->header->fin;
958 
959     nxt_unit_free(frame->req->ctx, p);
960 
961     nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
962                        "%d, %"PRIu64", %d",
963                        frame->header->opcode, frame->payload_len,
964                        frame->header->fin);
965 
966     return frame;
967 }
968 
969 
970 void
971 nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
972 {
973     PyObject                 *msg, *exc;
974     nxt_py_asgi_websocket_t  *ws;
975 
976     ws = req->data;
977 
978     nxt_unit_req_debug(req, "asgi_websocket_close_handler");
979 
980     if (ws->receive_future == NULL) {
981         ws->state = NXT_WS_DISCONNECTED;
982 
983         return;
984     }
985 
986     msg = nxt_py_asgi_websocket_disconnect_msg(ws);
987     if (nxt_slow_path(msg == NULL)) {
988         exc = PyErr_Occurred();
989         Py_INCREF(exc);
990 
991         nxt_py_asgi_websocket_receive_fail(ws, exc);
992 
993     } else {
994         nxt_py_asgi_websocket_receive_done(ws, msg);
995     }
996 }
997 
998 
999 static PyObject *
1000 nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
1001 {
1002     PyObject  *msg, *code;
1003 
1004     msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
1005     if (nxt_slow_path(msg == NULL)) {
1006         return NULL;
1007     }
1008 
1009     code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
1010     if (nxt_slow_path(code == NULL)) {
1011         nxt_unit_req_alert(ws->req, "Python failed to create long");
1012         nxt_python_print_exception();
1013 
1014         Py_DECREF(msg);
1015 
1016         return PyErr_Format(PyExc_RuntimeError, "failed to create long");
1017     }
1018 
1019     if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
1020         nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
1021 
1022         Py_DECREF(msg);
1023         Py_DECREF(code);
1024 
1025         return PyErr_Format(PyExc_RuntimeError,
1026                             "Python failed to set 'msg.code' item");
1027     }
1028 
1029     Py_DECREF(code);
1030 
1031     return msg;
1032 }
1033 
1034 
1035 static PyObject *
1036 nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
1037 {
1038     int                      rc;
1039     uint16_t                 status_code;
1040     PyObject                 *res;
1041     nxt_py_asgi_websocket_t  *ws;
1042 
1043     ws = (nxt_py_asgi_websocket_t *) self;
1044 
1045     nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
1046 
1047     /*
1048      * Get Future.result() and it raises an exception, if coroutine exited
1049      * with exception.
1050      */
1051     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
1052     if (nxt_slow_path(res == NULL)) {
1053         nxt_unit_req_error(ws->req,
1054                            "Python failed to call 'future.result()'");
1055         nxt_python_print_exception();
1056 
1057         rc = NXT_UNIT_ERROR;
1058 
1059     } else {
1060         Py_DECREF(res);
1061 
1062         rc = NXT_UNIT_OK;
1063     }
1064 
1065     if (ws->state == NXT_WS_ACCEPTED) {
1066         status_code = (rc == NXT_UNIT_OK)
1067                       ? htons(NXT_WEBSOCKET_CR_NORMAL)
1068                       : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
1069 
1070         rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
1071                                      1, &status_code, 2);
1072     }
1073 
1074     while (!nxt_queue_is_empty(&ws->pending_frames)) {
1075         nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
1076     }
1077 
1078     nxt_unit_request_done(ws->req, rc);
1079 
1080     Py_RETURN_NONE;
1081 }
1082 
1083 
1084 #endif /* NXT_HAVE_ASGI */
1085