xref: /unit/src/python/nxt_python_asgi_http.c (revision 2674:c055d68bbf43)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <python/nxt_python_asgi.h>
15 #include <python/nxt_python_asgi_str.h>
16 
17 
18 typedef struct {
19     PyObject_HEAD
20     nxt_unit_request_info_t  *req;
21     nxt_queue_link_t         link;
22     PyObject                 *receive_future;
23     PyObject                 *send_future;
24     uint64_t                 content_length;
25     uint64_t                 bytes_sent;
26     PyObject                 *send_body;
27     Py_ssize_t               send_body_off;
28     uint8_t                  complete;
29     uint8_t                  closed;
30     uint8_t                  empty_body_received;
31 } nxt_py_asgi_http_t;
32 
33 
34 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
35 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
36 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
37 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
38     PyObject *dict);
39 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
40     PyObject *dict);
41 static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http);
42 static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http,
43     PyObject *future, PyObject *msg);
44 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
45 
46 
47 static PyMethodDef nxt_py_asgi_http_methods[] = {
48     { "receive",   nxt_py_asgi_http_receive, METH_NOARGS, 0 },
49     { "send",      nxt_py_asgi_http_send,    METH_O,      0 },
50     { "_done",     nxt_py_asgi_http_done,    METH_O,      0 },
51     { NULL, NULL, 0, 0 }
52 };
53 
54 static PyAsyncMethods nxt_py_asgi_async_methods = {
55     .am_await = nxt_py_asgi_await,
56 };
57 
58 static PyTypeObject nxt_py_asgi_http_type = {
59     PyVarObject_HEAD_INIT(NULL, 0)
60 
61     .tp_name      = "unit._asgi_http",
62     .tp_basicsize = sizeof(nxt_py_asgi_http_t),
63     .tp_dealloc   = nxt_py_asgi_dealloc,
64     .tp_as_async  = &nxt_py_asgi_async_methods,
65     .tp_flags     = Py_TPFLAGS_DEFAULT,
66     .tp_doc       = "unit ASGI HTTP request object",
67     .tp_iter      = nxt_py_asgi_iter,
68     .tp_iternext  = nxt_py_asgi_next,
69     .tp_methods   = nxt_py_asgi_http_methods,
70 };
71 
72 static Py_ssize_t  nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
73 
74 
75 int
nxt_py_asgi_http_init(void)76 nxt_py_asgi_http_init(void)
77 {
78     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
79         nxt_unit_alert(NULL,
80                        "Python failed to initialize the 'http' type object");
81         return NXT_UNIT_ERROR;
82     }
83 
84     return NXT_UNIT_OK;
85 }
86 
87 
88 PyObject *
nxt_py_asgi_http_create(nxt_unit_request_info_t * req)89 nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
90 {
91     nxt_py_asgi_http_t  *http;
92 
93     http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
94 
95     if (nxt_fast_path(http != NULL)) {
96         http->req = req;
97         http->receive_future = NULL;
98         http->send_future = NULL;
99         http->content_length = -1;
100         http->bytes_sent = 0;
101         http->send_body = NULL;
102         http->send_body_off = 0;
103         http->complete = 0;
104         http->closed = 0;
105         http->empty_body_received = 0;
106     }
107 
108     return (PyObject *) http;
109 }
110 
111 
112 static PyObject *
nxt_py_asgi_http_receive(PyObject * self,PyObject * none)113 nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
114 {
115     PyObject                 *msg, *future;
116     nxt_py_asgi_http_t       *http;
117     nxt_py_asgi_ctx_data_t   *ctx_data;
118     nxt_unit_request_info_t  *req;
119 
120     http = (nxt_py_asgi_http_t *) self;
121     req = http->req;
122 
123     nxt_unit_req_debug(req, "asgi_http_receive");
124 
125     if (nxt_slow_path(http->closed || http->complete )) {
126         msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
127 
128     } else {
129         msg = nxt_py_asgi_http_read_msg(http);
130     }
131 
132     if (nxt_slow_path(msg == NULL)) {
133         return NULL;
134     }
135 
136     ctx_data = req->ctx->data;
137 
138     future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
139     if (nxt_slow_path(future == NULL)) {
140         nxt_unit_req_alert(req, "Python failed to create Future object");
141         nxt_python_print_exception();
142 
143         Py_DECREF(msg);
144 
145         return PyErr_Format(PyExc_RuntimeError,
146                             "failed to create Future object");
147     }
148 
149     if (msg != Py_None) {
150         return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
151     }
152 
153     http->receive_future = future;
154     Py_INCREF(http->receive_future);
155 
156     Py_DECREF(msg);
157 
158     return future;
159 }
160 
161 
162 static PyObject *
nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t * http)163 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
164 {
165     char                     *body_buf;
166     ssize_t                  read_res;
167     PyObject                 *msg, *body;
168     Py_ssize_t               size;
169     nxt_unit_request_info_t  *req;
170 
171     req = http->req;
172 
173     size = req->content_length;
174 
175     if (size > nxt_py_asgi_http_body_buf_size) {
176         size = nxt_py_asgi_http_body_buf_size;
177     }
178 
179     if (size == 0) {
180         if (http->empty_body_received) {
181             Py_RETURN_NONE;
182         }
183 
184         http->empty_body_received = 1;
185     }
186 
187     if (size > 0) {
188         body = PyBytes_FromStringAndSize(NULL, size);
189         if (nxt_slow_path(body == NULL)) {
190             nxt_unit_req_alert(req, "Python failed to create body byte string");
191             nxt_python_print_exception();
192 
193             return PyErr_Format(PyExc_RuntimeError,
194                                 "failed to create Bytes object");
195         }
196 
197         body_buf = PyBytes_AS_STRING(body);
198 
199         read_res = nxt_unit_request_read(req, body_buf, size);
200 
201     } else {
202         body = NULL;
203         read_res = 0;
204     }
205 
206     if (read_res > 0 || read_res == size) {
207         msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
208         if (nxt_slow_path(msg == NULL)) {
209             Py_XDECREF(body);
210 
211             return NULL;
212         }
213 
214 #define SET_ITEM(dict, key, value) \
215     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
216                         == -1))                                                \
217     {                                                                          \
218         nxt_unit_req_alert(req,                                                \
219                            "Python failed to set '" #dict "." #key "' item");  \
220         PyErr_SetString(PyExc_RuntimeError,                                    \
221                         "Python failed to set '" #dict "." #key "' item");     \
222         goto fail;                                                             \
223     }
224 
225         if (body != NULL) {
226             SET_ITEM(msg, body, body)
227         }
228 
229         if (req->content_length > 0) {
230             SET_ITEM(msg, more_body, Py_True)
231         }
232 
233 #undef SET_ITEM
234 
235         Py_XDECREF(body);
236 
237         return msg;
238     }
239 
240     Py_XDECREF(body);
241 
242     Py_RETURN_NONE;
243 
244 fail:
245 
246     Py_DECREF(msg);
247     Py_XDECREF(body);
248 
249     return NULL;
250 }
251 
252 
253 static PyObject *
nxt_py_asgi_http_send(PyObject * self,PyObject * dict)254 nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
255 {
256     PyObject            *type;
257     const char          *type_str;
258     Py_ssize_t          type_len;
259     nxt_py_asgi_http_t  *http;
260 
261     static const nxt_str_t  response_start = nxt_string("http.response.start");
262     static const nxt_str_t  response_body = nxt_string("http.response.body");
263 
264     http = (nxt_py_asgi_http_t *) self;
265 
266     type = PyDict_GetItem(dict, nxt_py_type_str);
267     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
268         nxt_unit_req_error(http->req, "asgi_http_send: "
269                                       "'type' is not a unicode string");
270         return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
271     }
272 
273     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
274 
275     nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
276                        (int) type_len, type_str);
277 
278     if (nxt_unit_response_is_init(http->req)) {
279         if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
280             return nxt_py_asgi_http_response_body(http, dict);
281         }
282 
283         return PyErr_Format(PyExc_RuntimeError,
284                             "Expected ASGI message 'http.response.body', "
285                             "but got '%U'", type);
286     }
287 
288     if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
289         return nxt_py_asgi_http_response_start(http, dict);
290     }
291 
292     return PyErr_Format(PyExc_RuntimeError,
293                         "Expected ASGI message 'http.response.start', "
294                         "but got '%U'", type);
295 }
296 
297 
298 static PyObject *
nxt_py_asgi_http_response_start(nxt_py_asgi_http_t * http,PyObject * dict)299 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
300 {
301     int                          rc;
302     PyObject                     *status, *headers, *res;
303     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
304     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
305 
306     status = PyDict_GetItem(dict, nxt_py_status_str);
307     if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
308         nxt_unit_req_error(http->req, "asgi_http_response_start: "
309                                       "'status' is not an integer");
310         return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
311     }
312 
313     calc_size_ctx.fields_size = 0;
314     calc_size_ctx.fields_count = 0;
315 
316     headers = PyDict_GetItem(dict, nxt_py_headers_str);
317     if (headers != NULL) {
318         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
319                                        &calc_size_ctx);
320         if (nxt_slow_path(res == NULL)) {
321             return NULL;
322         }
323 
324         Py_DECREF(res);
325     }
326 
327     rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
328                                 calc_size_ctx.fields_count,
329                                 calc_size_ctx.fields_size);
330     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
331         return PyErr_Format(PyExc_RuntimeError,
332                             "failed to allocate response object");
333     }
334 
335     add_field_ctx.req = http->req;
336     add_field_ctx.content_length = -1;
337 
338     if (headers != NULL) {
339         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
340                                        &add_field_ctx);
341         if (nxt_slow_path(res == NULL)) {
342             return NULL;
343         }
344 
345         Py_DECREF(res);
346     }
347 
348     http->content_length = add_field_ctx.content_length;
349 
350     Py_INCREF(http);
351     return (PyObject *) http;
352 }
353 
354 
355 static PyObject *
nxt_py_asgi_http_response_body(nxt_py_asgi_http_t * http,PyObject * dict)356 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
357 {
358     int                     rc;
359     char                    *body_str;
360     ssize_t                 sent;
361     PyObject                *body, *more_body, *future;
362     Py_ssize_t              body_len, body_off;
363     nxt_py_asgi_ctx_data_t  *ctx_data;
364 
365     if (nxt_slow_path(http->complete)) {
366         return PyErr_Format(PyExc_RuntimeError,
367                             "Unexpected ASGI message 'http.response.body' "
368                             "sent, after response already completed");
369     }
370 
371     if (nxt_slow_path(http->send_future != NULL)) {
372         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
373     }
374 
375     more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
376     if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
377         return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
378     }
379 
380     body = PyDict_GetItem(dict, nxt_py_body_str);
381 
382     if (body != NULL) {
383         if (PyBytes_Check(body)) {
384             body_str = PyBytes_AS_STRING(body);
385             body_len = PyBytes_GET_SIZE(body);
386 
387         } else if (PyByteArray_Check(body)) {
388             body_str = PyByteArray_AS_STRING(body);
389             body_len = PyByteArray_GET_SIZE(body);
390 
391         } else {
392             return PyErr_Format(PyExc_TypeError,
393                                 "'body' is not a byte string or bytearray");
394         }
395 
396         nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
397                            (int) body_len, (more_body == Py_True) );
398 
399         if (nxt_slow_path(http->bytes_sent + body_len
400                               > http->content_length))
401         {
402             return PyErr_Format(PyExc_RuntimeError,
403                                 "Response content longer than Content-Length");
404         }
405 
406         body_off = 0;
407 
408         ctx_data = http->req->ctx->data;
409 
410         while (body_len > 0) {
411             sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
412             if (nxt_slow_path(sent < 0)) {
413                 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
414             }
415 
416             if (nxt_slow_path(sent == 0)) {
417                 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
418                                    "out of shared memory, %d",
419                                    (int) body_len);
420 
421                 future = PyObject_CallObject(ctx_data->loop_create_future,
422                                              NULL);
423                 if (nxt_slow_path(future == NULL)) {
424                     nxt_unit_req_alert(http->req,
425                                        "Python failed to create Future object");
426                     nxt_python_print_exception();
427 
428                     return PyErr_Format(PyExc_RuntimeError,
429                                         "failed to create Future object");
430                 }
431 
432                 http->send_body = body;
433                 Py_INCREF(http->send_body);
434                 http->send_body_off = body_off;
435 
436                 nxt_py_asgi_drain_wait(http->req, &http->link);
437 
438                 http->send_future = future;
439                 Py_INCREF(http->send_future);
440 
441                 return future;
442             }
443 
444             body_str += sent;
445             body_len -= sent;
446             body_off += sent;
447             http->bytes_sent += sent;
448         }
449 
450     } else {
451         nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
452                            (more_body == Py_True) );
453 
454         if (!nxt_unit_response_is_sent(http->req)) {
455             rc = nxt_unit_response_send(http->req);
456             if (nxt_slow_path(rc != NXT_UNIT_OK)) {
457                 return PyErr_Format(PyExc_RuntimeError,
458                                     "failed to send response");
459             }
460         }
461     }
462 
463     if (more_body == NULL || more_body == Py_False) {
464         http->complete = 1;
465 
466         nxt_py_asgi_http_emit_disconnect(http);
467     }
468 
469     Py_INCREF(http);
470     return (PyObject *) http;
471 }
472 
473 
474 static void
nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t * http)475 nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http)
476 {
477     PyObject  *msg, *future;
478 
479     if (http->receive_future == NULL) {
480         return;
481     }
482 
483     msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str);
484     if (nxt_slow_path(msg == NULL)) {
485         return;
486     }
487 
488     if (msg == Py_None) {
489         Py_DECREF(msg);
490         return;
491     }
492 
493     future = http->receive_future;
494     http->receive_future = NULL;
495 
496     nxt_py_asgi_http_set_result(http, future, msg);
497 
498     Py_DECREF(msg);
499 }
500 
501 
502 static void
nxt_py_asgi_http_set_result(nxt_py_asgi_http_t * http,PyObject * future,PyObject * msg)503 nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future,
504     PyObject *msg)
505 {
506     PyObject  *res;
507 
508     res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL);
509     if (nxt_slow_path(res == NULL)) {
510         nxt_unit_req_alert(http->req, "'done' call failed");
511         nxt_python_print_exception();
512     }
513 
514     if (nxt_fast_path(res == Py_False)) {
515         res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg,
516                                          NULL);
517         if (nxt_slow_path(res == NULL)) {
518             nxt_unit_req_alert(http->req, "'set_result' call failed");
519             nxt_python_print_exception();
520         }
521 
522     } else {
523         res = NULL;
524     }
525 
526     Py_XDECREF(res);
527     Py_DECREF(future);
528 }
529 
530 
531 void
nxt_py_asgi_http_data_handler(nxt_unit_request_info_t * req)532 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
533 {
534     PyObject            *msg, *future;
535     nxt_py_asgi_http_t  *http;
536 
537     http = req->data;
538 
539     nxt_unit_req_debug(req, "asgi_http_data_handler");
540 
541     if (http->receive_future == NULL) {
542         return;
543     }
544 
545     msg = nxt_py_asgi_http_read_msg(http);
546     if (nxt_slow_path(msg == NULL)) {
547         return;
548     }
549 
550     if (msg == Py_None) {
551         Py_DECREF(msg);
552         return;
553     }
554 
555     future = http->receive_future;
556     http->receive_future = NULL;
557 
558     nxt_py_asgi_http_set_result(http, future, msg);
559 
560     Py_DECREF(msg);
561 }
562 
563 
564 int
nxt_py_asgi_http_drain(nxt_queue_link_t * lnk)565 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
566 {
567     char                *body_str;
568     ssize_t             sent;
569     PyObject            *future, *exc, *res;
570     Py_ssize_t          body_len;
571     nxt_py_asgi_http_t  *http;
572 
573     http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
574 
575     body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
576     body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
577 
578     nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
579 
580     while (body_len > 0) {
581         sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
582         if (nxt_slow_path(sent < 0)) {
583             goto fail;
584         }
585 
586         if (nxt_slow_path(sent == 0)) {
587             return NXT_UNIT_AGAIN;
588         }
589 
590         body_str += sent;
591         body_len -= sent;
592 
593         http->send_body_off += sent;
594         http->bytes_sent += sent;
595     }
596 
597     Py_CLEAR(http->send_body);
598 
599     future = http->send_future;
600     http->send_future = NULL;
601 
602     nxt_py_asgi_http_set_result(http, future, Py_None);
603 
604     return NXT_UNIT_OK;
605 
606 fail:
607 
608     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
609                                        nxt_py_failed_to_send_body_str,
610                                        NULL);
611     if (nxt_slow_path(exc == NULL)) {
612         nxt_unit_req_alert(http->req, "RuntimeError create failed");
613         nxt_python_print_exception();
614 
615         exc = Py_None;
616         Py_INCREF(exc);
617     }
618 
619     future = http->send_future;
620     http->send_future = NULL;
621 
622     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
623                                      NULL);
624     if (nxt_slow_path(res == NULL)) {
625         nxt_unit_req_alert(http->req, "'set_exception' call failed");
626         nxt_python_print_exception();
627     }
628 
629     Py_XDECREF(res);
630     Py_DECREF(future);
631     Py_DECREF(exc);
632 
633     return NXT_UNIT_ERROR;
634 }
635 
636 
637 void
nxt_py_asgi_http_close_handler(nxt_unit_request_info_t * req)638 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
639 {
640     nxt_py_asgi_http_t  *http;
641 
642     http = req->data;
643 
644     nxt_unit_req_debug(req, "asgi_http_close_handler");
645 
646     if (nxt_fast_path(http != NULL)) {
647         http->closed = 1;
648 
649         nxt_py_asgi_http_emit_disconnect(http);
650     }
651 }
652 
653 
654 static PyObject *
nxt_py_asgi_http_done(PyObject * self,PyObject * future)655 nxt_py_asgi_http_done(PyObject *self, PyObject *future)
656 {
657     int                 rc;
658     PyObject            *res;
659     nxt_py_asgi_http_t  *http;
660 
661     http = (nxt_py_asgi_http_t *) self;
662 
663     nxt_unit_req_debug(http->req, "asgi_http_done");
664 
665     /*
666      * Get Future.result() and it raises an exception, if coroutine exited
667      * with exception.
668      */
669     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
670     if (nxt_slow_path(res == NULL)) {
671         nxt_unit_req_error(http->req,
672                            "Python failed to call 'future.result()'");
673         nxt_python_print_exception();
674 
675         rc = NXT_UNIT_ERROR;
676 
677     } else {
678         Py_DECREF(res);
679 
680         rc = NXT_UNIT_OK;
681     }
682 
683     nxt_unit_request_done(http->req, rc);
684 
685     Py_RETURN_NONE;
686 }
687 
688 
689 #endif /* NXT_HAVE_ASGI */
690