xref: /unit/src/python/nxt_python_asgi_http.c (revision 1715:95874fd97501)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <python/nxt_python_asgi.h>
15 #include <python/nxt_python_asgi_str.h>
16 
17 
18 typedef struct {
19     PyObject_HEAD
20     nxt_unit_request_info_t  *req;
21     nxt_queue_link_t         link;
22     PyObject                 *receive_future;
23     PyObject                 *send_future;
24     uint64_t                 content_length;
25     uint64_t                 bytes_sent;
26     int                      complete;
27     int                      closed;
28     PyObject                 *send_body;
29     Py_ssize_t               send_body_off;
30 } nxt_py_asgi_http_t;
31 
32 
33 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
34 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
35 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
36 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
37     PyObject *dict);
38 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
39     PyObject *dict);
40 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
41 
42 
43 static PyMethodDef nxt_py_asgi_http_methods[] = {
44     { "receive",   nxt_py_asgi_http_receive, METH_NOARGS, 0 },
45     { "send",      nxt_py_asgi_http_send,    METH_O,      0 },
46     { "_done",     nxt_py_asgi_http_done,    METH_O,      0 },
47     { NULL, NULL, 0, 0 }
48 };
49 
50 static PyAsyncMethods nxt_py_asgi_async_methods = {
51     .am_await = nxt_py_asgi_await,
52 };
53 
54 static PyTypeObject nxt_py_asgi_http_type = {
55     PyVarObject_HEAD_INIT(NULL, 0)
56 
57     .tp_name      = "unit._asgi_http",
58     .tp_basicsize = sizeof(nxt_py_asgi_http_t),
59     .tp_dealloc   = nxt_py_asgi_dealloc,
60     .tp_as_async  = &nxt_py_asgi_async_methods,
61     .tp_flags     = Py_TPFLAGS_DEFAULT,
62     .tp_doc       = "unit ASGI HTTP request object",
63     .tp_iter      = nxt_py_asgi_iter,
64     .tp_iternext  = nxt_py_asgi_next,
65     .tp_methods   = nxt_py_asgi_http_methods,
66 };
67 
68 static Py_ssize_t  nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
69 
70 
71 int
72 nxt_py_asgi_http_init(void)
73 {
74     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
75         nxt_unit_alert(NULL,
76                        "Python failed to initialize the 'http' type object");
77         return NXT_UNIT_ERROR;
78     }
79 
80     return NXT_UNIT_OK;
81 }
82 
83 
84 PyObject *
85 nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
86 {
87     nxt_py_asgi_http_t  *http;
88 
89     http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
90 
91     if (nxt_fast_path(http != NULL)) {
92         http->req = req;
93         http->receive_future = NULL;
94         http->send_future = NULL;
95         http->content_length = -1;
96         http->bytes_sent = 0;
97         http->complete = 0;
98         http->closed = 0;
99         http->send_body = NULL;
100         http->send_body_off = 0;
101     }
102 
103     return (PyObject *) http;
104 }
105 
106 
107 static PyObject *
108 nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
109 {
110     PyObject                 *msg, *future;
111     nxt_py_asgi_http_t       *http;
112     nxt_py_asgi_ctx_data_t   *ctx_data;
113     nxt_unit_request_info_t  *req;
114 
115     http = (nxt_py_asgi_http_t *) self;
116     req = http->req;
117 
118     nxt_unit_req_debug(req, "asgi_http_receive");
119 
120     if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) {
121         msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
122 
123     } else {
124         msg = nxt_py_asgi_http_read_msg(http);
125     }
126 
127     if (nxt_slow_path(msg == NULL)) {
128         return NULL;
129     }
130 
131     ctx_data = req->ctx->data;
132 
133     future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
134     if (nxt_slow_path(future == NULL)) {
135         nxt_unit_req_alert(req, "Python failed to create Future object");
136         nxt_python_print_exception();
137 
138         Py_DECREF(msg);
139 
140         return PyErr_Format(PyExc_RuntimeError,
141                             "failed to create Future object");
142     }
143 
144     if (msg != Py_None) {
145         return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
146     }
147 
148     http->receive_future = future;
149     Py_INCREF(http->receive_future);
150 
151     Py_DECREF(msg);
152 
153     return future;
154 }
155 
156 
157 static PyObject *
158 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
159 {
160     char                     *body_buf;
161     ssize_t                  read_res;
162     PyObject                 *msg, *body;
163     Py_ssize_t               size;
164     nxt_unit_request_info_t  *req;
165 
166     req = http->req;
167 
168     size = req->content_length;
169 
170     if (size > nxt_py_asgi_http_body_buf_size) {
171         size = nxt_py_asgi_http_body_buf_size;
172     }
173 
174     if (size > 0) {
175         body = PyBytes_FromStringAndSize(NULL, size);
176         if (nxt_slow_path(body == NULL)) {
177             nxt_unit_req_alert(req, "Python failed to create body byte string");
178             nxt_python_print_exception();
179 
180             return PyErr_Format(PyExc_RuntimeError,
181                                 "failed to create Bytes object");
182         }
183 
184         body_buf = PyBytes_AS_STRING(body);
185 
186         read_res = nxt_unit_request_read(req, body_buf, size);
187 
188     } else {
189         body = NULL;
190         read_res = 0;
191     }
192 
193     if (read_res > 0 || read_res == size) {
194         msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
195         if (nxt_slow_path(msg == NULL)) {
196             Py_XDECREF(body);
197 
198             return NULL;
199         }
200 
201 #define SET_ITEM(dict, key, value) \
202     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
203                         == -1))                                                \
204     {                                                                          \
205         nxt_unit_req_alert(req,                                                \
206                            "Python failed to set '" #dict "." #key "' item");  \
207         PyErr_SetString(PyExc_RuntimeError,                                    \
208                         "Python failed to set '" #dict "." #key "' item");     \
209         goto fail;                                                             \
210     }
211 
212         if (body != NULL) {
213             SET_ITEM(msg, body, body)
214         }
215 
216         if (req->content_length > 0) {
217             SET_ITEM(msg, more_body, Py_True)
218         }
219 
220 #undef SET_ITEM
221 
222         Py_XDECREF(body);
223 
224         return msg;
225     }
226 
227     Py_XDECREF(body);
228 
229     Py_RETURN_NONE;
230 
231 fail:
232 
233     Py_DECREF(msg);
234     Py_XDECREF(body);
235 
236     return NULL;
237 }
238 
239 
240 static PyObject *
241 nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
242 {
243     PyObject            *type;
244     const char          *type_str;
245     Py_ssize_t          type_len;
246     nxt_py_asgi_http_t  *http;
247 
248     static const nxt_str_t  response_start = nxt_string("http.response.start");
249     static const nxt_str_t  response_body = nxt_string("http.response.body");
250 
251     http = (nxt_py_asgi_http_t *) self;
252 
253     type = PyDict_GetItem(dict, nxt_py_type_str);
254     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
255         nxt_unit_req_error(http->req, "asgi_http_send: "
256                                       "'type' is not a unicode string");
257         return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
258     }
259 
260     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
261 
262     nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
263                        (int) type_len, type_str);
264 
265     if (type_len == (Py_ssize_t) response_start.length
266         && memcmp(type_str, response_start.start, type_len) == 0)
267     {
268         return nxt_py_asgi_http_response_start(http, dict);
269     }
270 
271     if (type_len == (Py_ssize_t) response_body.length
272         && memcmp(type_str, response_body.start, type_len) == 0)
273     {
274         return nxt_py_asgi_http_response_body(http, dict);
275     }
276 
277     nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'",
278                        (int) type_len, type_str);
279 
280     return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
281 }
282 
283 
284 static PyObject *
285 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
286 {
287     int                          rc;
288     PyObject                     *status, *headers, *res;
289     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
290     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
291 
292     status = PyDict_GetItem(dict, nxt_py_status_str);
293     if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
294         nxt_unit_req_error(http->req, "asgi_http_response_start: "
295                                       "'status' is not an integer");
296         return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
297     }
298 
299     calc_size_ctx.fields_size = 0;
300     calc_size_ctx.fields_count = 0;
301 
302     headers = PyDict_GetItem(dict, nxt_py_headers_str);
303     if (headers != NULL) {
304         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
305                                        &calc_size_ctx);
306         if (nxt_slow_path(res == NULL)) {
307             return NULL;
308         }
309 
310         Py_DECREF(res);
311     }
312 
313     rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
314                                 calc_size_ctx.fields_count,
315                                 calc_size_ctx.fields_size);
316     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
317         return PyErr_Format(PyExc_RuntimeError,
318                             "failed to allocate response object");
319     }
320 
321     add_field_ctx.req = http->req;
322     add_field_ctx.content_length = -1;
323 
324     if (headers != NULL) {
325         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
326                                        &add_field_ctx);
327         if (nxt_slow_path(res == NULL)) {
328             return NULL;
329         }
330 
331         Py_DECREF(res);
332     }
333 
334     http->content_length = add_field_ctx.content_length;
335 
336     Py_INCREF(http);
337     return (PyObject *) http;
338 }
339 
340 
341 static PyObject *
342 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
343 {
344     int                     rc;
345     char                    *body_str;
346     ssize_t                 sent;
347     PyObject                *body, *more_body, *future;
348     Py_ssize_t              body_len, body_off;
349     nxt_py_asgi_ctx_data_t  *ctx_data;
350 
351     body = PyDict_GetItem(dict, nxt_py_body_str);
352     if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
353         return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
354     }
355 
356     more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
357     if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
358         return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
359     }
360 
361     if (nxt_slow_path(http->complete)) {
362         return PyErr_Format(PyExc_RuntimeError,
363                             "Unexpected ASGI message 'http.response.body' "
364                             "sent, after response already completed");
365     }
366 
367     if (nxt_slow_path(http->send_future != NULL)) {
368         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
369     }
370 
371     if (body != NULL) {
372         body_str = PyBytes_AS_STRING(body);
373         body_len = PyBytes_GET_SIZE(body);
374 
375         nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
376                            (int) body_len, (more_body == Py_True) );
377 
378         if (nxt_slow_path(http->bytes_sent + body_len
379                               > http->content_length))
380         {
381             return PyErr_Format(PyExc_RuntimeError,
382                                 "Response content longer than Content-Length");
383         }
384 
385         body_off = 0;
386 
387         ctx_data = http->req->ctx->data;
388 
389         while (body_len > 0) {
390             sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
391             if (nxt_slow_path(sent < 0)) {
392                 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
393             }
394 
395             if (nxt_slow_path(sent == 0)) {
396                 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
397                                    "out of shared memory, %d",
398                                    (int) body_len);
399 
400                 future = PyObject_CallObject(ctx_data->loop_create_future,
401                                              NULL);
402                 if (nxt_slow_path(future == NULL)) {
403                     nxt_unit_req_alert(http->req,
404                                        "Python failed to create Future object");
405                     nxt_python_print_exception();
406 
407                     return PyErr_Format(PyExc_RuntimeError,
408                                         "failed to create Future object");
409                 }
410 
411                 http->send_body = body;
412                 Py_INCREF(http->send_body);
413                 http->send_body_off = body_off;
414 
415                 nxt_py_asgi_drain_wait(http->req, &http->link);
416 
417                 http->send_future = future;
418                 Py_INCREF(http->send_future);
419 
420                 return future;
421             }
422 
423             body_str += sent;
424             body_len -= sent;
425             body_off += sent;
426             http->bytes_sent += sent;
427         }
428 
429     } else {
430         nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
431                            (more_body == Py_True) );
432 
433         if (!nxt_unit_response_is_sent(http->req)) {
434             rc = nxt_unit_response_send(http->req);
435             if (nxt_slow_path(rc != NXT_UNIT_OK)) {
436                 return PyErr_Format(PyExc_RuntimeError,
437                                     "failed to send response");
438             }
439         }
440     }
441 
442     if (more_body == NULL || more_body == Py_False) {
443         http->complete = 1;
444     }
445 
446     Py_INCREF(http);
447     return (PyObject *) http;
448 }
449 
450 
451 void
452 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
453 {
454     PyObject            *msg, *future, *res;
455     nxt_py_asgi_http_t  *http;
456 
457     http = req->data;
458 
459     nxt_unit_req_debug(req, "asgi_http_data_handler");
460 
461     if (http->receive_future == NULL) {
462         return;
463     }
464 
465     msg = nxt_py_asgi_http_read_msg(http);
466     if (nxt_slow_path(msg == NULL)) {
467         return;
468     }
469 
470     if (msg == Py_None) {
471         Py_DECREF(msg);
472         return;
473     }
474 
475     future = http->receive_future;
476     http->receive_future = NULL;
477 
478     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
479     if (nxt_slow_path(res == NULL)) {
480         nxt_unit_req_alert(req, "'set_result' call failed");
481         nxt_python_print_exception();
482     }
483 
484     Py_XDECREF(res);
485     Py_DECREF(future);
486 
487     Py_DECREF(msg);
488 }
489 
490 
491 int
492 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
493 {
494     char                *body_str;
495     ssize_t             sent;
496     PyObject            *future, *exc, *res;
497     Py_ssize_t          body_len;
498     nxt_py_asgi_http_t  *http;
499 
500     http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
501 
502     body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
503     body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
504 
505     nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
506 
507     while (body_len > 0) {
508         sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
509         if (nxt_slow_path(sent < 0)) {
510             goto fail;
511         }
512 
513         if (nxt_slow_path(sent == 0)) {
514             return NXT_UNIT_AGAIN;
515         }
516 
517         body_str += sent;
518         body_len -= sent;
519 
520         http->send_body_off += sent;
521         http->bytes_sent += sent;
522     }
523 
524     Py_CLEAR(http->send_body);
525 
526     future = http->send_future;
527     http->send_future = NULL;
528 
529     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None,
530                                      NULL);
531     if (nxt_slow_path(res == NULL)) {
532         nxt_unit_req_alert(http->req, "'set_result' call failed");
533         nxt_python_print_exception();
534     }
535 
536     Py_XDECREF(res);
537     Py_DECREF(future);
538 
539     return NXT_UNIT_OK;
540 
541 fail:
542 
543     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
544                                        nxt_py_failed_to_send_body_str,
545                                        NULL);
546     if (nxt_slow_path(exc == NULL)) {
547         nxt_unit_req_alert(http->req, "RuntimeError create failed");
548         nxt_python_print_exception();
549 
550         exc = Py_None;
551         Py_INCREF(exc);
552     }
553 
554     future = http->send_future;
555     http->send_future = NULL;
556 
557     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
558                                      NULL);
559     if (nxt_slow_path(res == NULL)) {
560         nxt_unit_req_alert(http->req, "'set_exception' call failed");
561         nxt_python_print_exception();
562     }
563 
564     Py_XDECREF(res);
565     Py_DECREF(future);
566     Py_DECREF(exc);
567 
568     return NXT_UNIT_ERROR;
569 }
570 
571 
572 void
573 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
574 {
575     PyObject            *msg, *future, *res;
576     nxt_py_asgi_http_t  *http;
577 
578     http = req->data;
579 
580     nxt_unit_req_debug(req, "asgi_http_close_handler");
581 
582     http->closed = 1;
583 
584     if (http->receive_future == NULL) {
585         return;
586     }
587 
588     msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
589     if (nxt_slow_path(msg == NULL)) {
590         return;
591     }
592 
593     if (msg == Py_None) {
594         Py_DECREF(msg);
595         return;
596     }
597 
598     future = http->receive_future;
599     http->receive_future = NULL;
600 
601     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
602     if (nxt_slow_path(res == NULL)) {
603         nxt_unit_req_alert(req, "'set_result' call failed");
604         nxt_python_print_exception();
605     }
606 
607     Py_XDECREF(res);
608     Py_DECREF(future);
609 
610     Py_DECREF(msg);
611 }
612 
613 
614 static PyObject *
615 nxt_py_asgi_http_done(PyObject *self, PyObject *future)
616 {
617     int                 rc;
618     PyObject            *res;
619     nxt_py_asgi_http_t  *http;
620 
621     http = (nxt_py_asgi_http_t *) self;
622 
623     nxt_unit_req_debug(http->req, "asgi_http_done");
624 
625     /*
626      * Get Future.result() and it raises an exception, if coroutine exited
627      * with exception.
628      */
629     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
630     if (nxt_slow_path(res == NULL)) {
631         nxt_unit_req_error(http->req,
632                            "Python failed to call 'future.result()'");
633         nxt_python_print_exception();
634 
635         rc = NXT_UNIT_ERROR;
636 
637     } else {
638         Py_DECREF(res);
639 
640         rc = NXT_UNIT_OK;
641     }
642 
643     nxt_unit_request_done(http->req, rc);
644 
645     Py_RETURN_NONE;
646 }
647 
648 
649 #endif /* NXT_HAVE_ASGI */
650