xref: /unit/src/python/nxt_python_asgi_http.c (revision 1717:fcd578736562)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <python/nxt_python_asgi.h>
15 #include <python/nxt_python_asgi_str.h>
16 
17 
18 typedef struct {
19     PyObject_HEAD
20     nxt_unit_request_info_t  *req;
21     nxt_queue_link_t         link;
22     PyObject                 *receive_future;
23     PyObject                 *send_future;
24     uint64_t                 content_length;
25     uint64_t                 bytes_sent;
26     int                      complete;
27     int                      closed;
28     PyObject                 *send_body;
29     Py_ssize_t               send_body_off;
30 } nxt_py_asgi_http_t;
31 
32 
33 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
34 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
35 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
36 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
37     PyObject *dict);
38 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
39     PyObject *dict);
40 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
41 
42 
43 static PyMethodDef nxt_py_asgi_http_methods[] = {
44     { "receive",   nxt_py_asgi_http_receive, METH_NOARGS, 0 },
45     { "send",      nxt_py_asgi_http_send,    METH_O,      0 },
46     { "_done",     nxt_py_asgi_http_done,    METH_O,      0 },
47     { NULL, NULL, 0, 0 }
48 };
49 
50 static PyAsyncMethods nxt_py_asgi_async_methods = {
51     .am_await = nxt_py_asgi_await,
52 };
53 
54 static PyTypeObject nxt_py_asgi_http_type = {
55     PyVarObject_HEAD_INIT(NULL, 0)
56 
57     .tp_name      = "unit._asgi_http",
58     .tp_basicsize = sizeof(nxt_py_asgi_http_t),
59     .tp_dealloc   = nxt_py_asgi_dealloc,
60     .tp_as_async  = &nxt_py_asgi_async_methods,
61     .tp_flags     = Py_TPFLAGS_DEFAULT,
62     .tp_doc       = "unit ASGI HTTP request object",
63     .tp_iter      = nxt_py_asgi_iter,
64     .tp_iternext  = nxt_py_asgi_next,
65     .tp_methods   = nxt_py_asgi_http_methods,
66 };
67 
68 static Py_ssize_t  nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
69 
70 
71 int
72 nxt_py_asgi_http_init(void)
73 {
74     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
75         nxt_unit_alert(NULL,
76                        "Python failed to initialize the 'http' type object");
77         return NXT_UNIT_ERROR;
78     }
79 
80     return NXT_UNIT_OK;
81 }
82 
83 
84 PyObject *
85 nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
86 {
87     nxt_py_asgi_http_t  *http;
88 
89     http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
90 
91     if (nxt_fast_path(http != NULL)) {
92         http->req = req;
93         http->receive_future = NULL;
94         http->send_future = NULL;
95         http->content_length = -1;
96         http->bytes_sent = 0;
97         http->complete = 0;
98         http->closed = 0;
99         http->send_body = NULL;
100         http->send_body_off = 0;
101     }
102 
103     return (PyObject *) http;
104 }
105 
106 
107 static PyObject *
108 nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
109 {
110     PyObject                 *msg, *future;
111     nxt_py_asgi_http_t       *http;
112     nxt_py_asgi_ctx_data_t   *ctx_data;
113     nxt_unit_request_info_t  *req;
114 
115     http = (nxt_py_asgi_http_t *) self;
116     req = http->req;
117 
118     nxt_unit_req_debug(req, "asgi_http_receive");
119 
120     if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) {
121         msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
122 
123     } else {
124         msg = nxt_py_asgi_http_read_msg(http);
125     }
126 
127     if (nxt_slow_path(msg == NULL)) {
128         return NULL;
129     }
130 
131     ctx_data = req->ctx->data;
132 
133     future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
134     if (nxt_slow_path(future == NULL)) {
135         nxt_unit_req_alert(req, "Python failed to create Future object");
136         nxt_python_print_exception();
137 
138         Py_DECREF(msg);
139 
140         return PyErr_Format(PyExc_RuntimeError,
141                             "failed to create Future object");
142     }
143 
144     if (msg != Py_None) {
145         return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
146     }
147 
148     http->receive_future = future;
149     Py_INCREF(http->receive_future);
150 
151     Py_DECREF(msg);
152 
153     return future;
154 }
155 
156 
157 static PyObject *
158 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
159 {
160     char                     *body_buf;
161     ssize_t                  read_res;
162     PyObject                 *msg, *body;
163     Py_ssize_t               size;
164     nxt_unit_request_info_t  *req;
165 
166     req = http->req;
167 
168     size = req->content_length;
169 
170     if (size > nxt_py_asgi_http_body_buf_size) {
171         size = nxt_py_asgi_http_body_buf_size;
172     }
173 
174     if (size > 0) {
175         body = PyBytes_FromStringAndSize(NULL, size);
176         if (nxt_slow_path(body == NULL)) {
177             nxt_unit_req_alert(req, "Python failed to create body byte string");
178             nxt_python_print_exception();
179 
180             return PyErr_Format(PyExc_RuntimeError,
181                                 "failed to create Bytes object");
182         }
183 
184         body_buf = PyBytes_AS_STRING(body);
185 
186         read_res = nxt_unit_request_read(req, body_buf, size);
187 
188     } else {
189         body = NULL;
190         read_res = 0;
191     }
192 
193     if (read_res > 0 || read_res == size) {
194         msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
195         if (nxt_slow_path(msg == NULL)) {
196             Py_XDECREF(body);
197 
198             return NULL;
199         }
200 
201 #define SET_ITEM(dict, key, value) \
202     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
203                         == -1))                                                \
204     {                                                                          \
205         nxt_unit_req_alert(req,                                                \
206                            "Python failed to set '" #dict "." #key "' item");  \
207         PyErr_SetString(PyExc_RuntimeError,                                    \
208                         "Python failed to set '" #dict "." #key "' item");     \
209         goto fail;                                                             \
210     }
211 
212         if (body != NULL) {
213             SET_ITEM(msg, body, body)
214         }
215 
216         if (req->content_length > 0) {
217             SET_ITEM(msg, more_body, Py_True)
218         }
219 
220 #undef SET_ITEM
221 
222         Py_XDECREF(body);
223 
224         return msg;
225     }
226 
227     Py_XDECREF(body);
228 
229     Py_RETURN_NONE;
230 
231 fail:
232 
233     Py_DECREF(msg);
234     Py_XDECREF(body);
235 
236     return NULL;
237 }
238 
239 
240 static PyObject *
241 nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
242 {
243     PyObject            *type;
244     const char          *type_str;
245     Py_ssize_t          type_len;
246     nxt_py_asgi_http_t  *http;
247 
248     static const nxt_str_t  response_start = nxt_string("http.response.start");
249     static const nxt_str_t  response_body = nxt_string("http.response.body");
250 
251     http = (nxt_py_asgi_http_t *) self;
252 
253     type = PyDict_GetItem(dict, nxt_py_type_str);
254     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
255         nxt_unit_req_error(http->req, "asgi_http_send: "
256                                       "'type' is not a unicode string");
257         return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
258     }
259 
260     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
261 
262     nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
263                        (int) type_len, type_str);
264 
265     if (nxt_unit_response_is_init(http->req)) {
266         if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
267             return nxt_py_asgi_http_response_body(http, dict);
268         }
269 
270         return PyErr_Format(PyExc_RuntimeError,
271                             "Expected ASGI message 'http.response.body', "
272                             "but got '%U'", type);
273     }
274 
275     if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
276         return nxt_py_asgi_http_response_start(http, dict);
277     }
278 
279     return PyErr_Format(PyExc_RuntimeError,
280                         "Expected ASGI message 'http.response.start', "
281                         "but got '%U'", type);
282 }
283 
284 
285 static PyObject *
286 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
287 {
288     int                          rc;
289     PyObject                     *status, *headers, *res;
290     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
291     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
292 
293     status = PyDict_GetItem(dict, nxt_py_status_str);
294     if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
295         nxt_unit_req_error(http->req, "asgi_http_response_start: "
296                                       "'status' is not an integer");
297         return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
298     }
299 
300     calc_size_ctx.fields_size = 0;
301     calc_size_ctx.fields_count = 0;
302 
303     headers = PyDict_GetItem(dict, nxt_py_headers_str);
304     if (headers != NULL) {
305         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
306                                        &calc_size_ctx);
307         if (nxt_slow_path(res == NULL)) {
308             return NULL;
309         }
310 
311         Py_DECREF(res);
312     }
313 
314     rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
315                                 calc_size_ctx.fields_count,
316                                 calc_size_ctx.fields_size);
317     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
318         return PyErr_Format(PyExc_RuntimeError,
319                             "failed to allocate response object");
320     }
321 
322     add_field_ctx.req = http->req;
323     add_field_ctx.content_length = -1;
324 
325     if (headers != NULL) {
326         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
327                                        &add_field_ctx);
328         if (nxt_slow_path(res == NULL)) {
329             return NULL;
330         }
331 
332         Py_DECREF(res);
333     }
334 
335     http->content_length = add_field_ctx.content_length;
336 
337     Py_INCREF(http);
338     return (PyObject *) http;
339 }
340 
341 
342 static PyObject *
343 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
344 {
345     int                     rc;
346     char                    *body_str;
347     ssize_t                 sent;
348     PyObject                *body, *more_body, *future;
349     Py_ssize_t              body_len, body_off;
350     nxt_py_asgi_ctx_data_t  *ctx_data;
351 
352     body = PyDict_GetItem(dict, nxt_py_body_str);
353     if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
354         return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
355     }
356 
357     more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
358     if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
359         return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
360     }
361 
362     if (nxt_slow_path(http->complete)) {
363         return PyErr_Format(PyExc_RuntimeError,
364                             "Unexpected ASGI message 'http.response.body' "
365                             "sent, after response already completed");
366     }
367 
368     if (nxt_slow_path(http->send_future != NULL)) {
369         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
370     }
371 
372     if (body != NULL) {
373         body_str = PyBytes_AS_STRING(body);
374         body_len = PyBytes_GET_SIZE(body);
375 
376         nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
377                            (int) body_len, (more_body == Py_True) );
378 
379         if (nxt_slow_path(http->bytes_sent + body_len
380                               > http->content_length))
381         {
382             return PyErr_Format(PyExc_RuntimeError,
383                                 "Response content longer than Content-Length");
384         }
385 
386         body_off = 0;
387 
388         ctx_data = http->req->ctx->data;
389 
390         while (body_len > 0) {
391             sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
392             if (nxt_slow_path(sent < 0)) {
393                 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
394             }
395 
396             if (nxt_slow_path(sent == 0)) {
397                 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
398                                    "out of shared memory, %d",
399                                    (int) body_len);
400 
401                 future = PyObject_CallObject(ctx_data->loop_create_future,
402                                              NULL);
403                 if (nxt_slow_path(future == NULL)) {
404                     nxt_unit_req_alert(http->req,
405                                        "Python failed to create Future object");
406                     nxt_python_print_exception();
407 
408                     return PyErr_Format(PyExc_RuntimeError,
409                                         "failed to create Future object");
410                 }
411 
412                 http->send_body = body;
413                 Py_INCREF(http->send_body);
414                 http->send_body_off = body_off;
415 
416                 nxt_py_asgi_drain_wait(http->req, &http->link);
417 
418                 http->send_future = future;
419                 Py_INCREF(http->send_future);
420 
421                 return future;
422             }
423 
424             body_str += sent;
425             body_len -= sent;
426             body_off += sent;
427             http->bytes_sent += sent;
428         }
429 
430     } else {
431         nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
432                            (more_body == Py_True) );
433 
434         if (!nxt_unit_response_is_sent(http->req)) {
435             rc = nxt_unit_response_send(http->req);
436             if (nxt_slow_path(rc != NXT_UNIT_OK)) {
437                 return PyErr_Format(PyExc_RuntimeError,
438                                     "failed to send response");
439             }
440         }
441     }
442 
443     if (more_body == NULL || more_body == Py_False) {
444         http->complete = 1;
445     }
446 
447     Py_INCREF(http);
448     return (PyObject *) http;
449 }
450 
451 
452 void
453 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
454 {
455     PyObject            *msg, *future, *res;
456     nxt_py_asgi_http_t  *http;
457 
458     http = req->data;
459 
460     nxt_unit_req_debug(req, "asgi_http_data_handler");
461 
462     if (http->receive_future == NULL) {
463         return;
464     }
465 
466     msg = nxt_py_asgi_http_read_msg(http);
467     if (nxt_slow_path(msg == NULL)) {
468         return;
469     }
470 
471     if (msg == Py_None) {
472         Py_DECREF(msg);
473         return;
474     }
475 
476     future = http->receive_future;
477     http->receive_future = NULL;
478 
479     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
480     if (nxt_slow_path(res == NULL)) {
481         nxt_unit_req_alert(req, "'set_result' call failed");
482         nxt_python_print_exception();
483     }
484 
485     Py_XDECREF(res);
486     Py_DECREF(future);
487 
488     Py_DECREF(msg);
489 }
490 
491 
492 int
493 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
494 {
495     char                *body_str;
496     ssize_t             sent;
497     PyObject            *future, *exc, *res;
498     Py_ssize_t          body_len;
499     nxt_py_asgi_http_t  *http;
500 
501     http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
502 
503     body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
504     body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
505 
506     nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
507 
508     while (body_len > 0) {
509         sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
510         if (nxt_slow_path(sent < 0)) {
511             goto fail;
512         }
513 
514         if (nxt_slow_path(sent == 0)) {
515             return NXT_UNIT_AGAIN;
516         }
517 
518         body_str += sent;
519         body_len -= sent;
520 
521         http->send_body_off += sent;
522         http->bytes_sent += sent;
523     }
524 
525     Py_CLEAR(http->send_body);
526 
527     future = http->send_future;
528     http->send_future = NULL;
529 
530     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None,
531                                      NULL);
532     if (nxt_slow_path(res == NULL)) {
533         nxt_unit_req_alert(http->req, "'set_result' call failed");
534         nxt_python_print_exception();
535     }
536 
537     Py_XDECREF(res);
538     Py_DECREF(future);
539 
540     return NXT_UNIT_OK;
541 
542 fail:
543 
544     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
545                                        nxt_py_failed_to_send_body_str,
546                                        NULL);
547     if (nxt_slow_path(exc == NULL)) {
548         nxt_unit_req_alert(http->req, "RuntimeError create failed");
549         nxt_python_print_exception();
550 
551         exc = Py_None;
552         Py_INCREF(exc);
553     }
554 
555     future = http->send_future;
556     http->send_future = NULL;
557 
558     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
559                                      NULL);
560     if (nxt_slow_path(res == NULL)) {
561         nxt_unit_req_alert(http->req, "'set_exception' call failed");
562         nxt_python_print_exception();
563     }
564 
565     Py_XDECREF(res);
566     Py_DECREF(future);
567     Py_DECREF(exc);
568 
569     return NXT_UNIT_ERROR;
570 }
571 
572 
573 void
574 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
575 {
576     PyObject            *msg, *future, *res;
577     nxt_py_asgi_http_t  *http;
578 
579     http = req->data;
580 
581     nxt_unit_req_debug(req, "asgi_http_close_handler");
582 
583     http->closed = 1;
584 
585     if (http->receive_future == NULL) {
586         return;
587     }
588 
589     msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
590     if (nxt_slow_path(msg == NULL)) {
591         return;
592     }
593 
594     if (msg == Py_None) {
595         Py_DECREF(msg);
596         return;
597     }
598 
599     future = http->receive_future;
600     http->receive_future = NULL;
601 
602     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
603     if (nxt_slow_path(res == NULL)) {
604         nxt_unit_req_alert(req, "'set_result' call failed");
605         nxt_python_print_exception();
606     }
607 
608     Py_XDECREF(res);
609     Py_DECREF(future);
610 
611     Py_DECREF(msg);
612 }
613 
614 
615 static PyObject *
616 nxt_py_asgi_http_done(PyObject *self, PyObject *future)
617 {
618     int                 rc;
619     PyObject            *res;
620     nxt_py_asgi_http_t  *http;
621 
622     http = (nxt_py_asgi_http_t *) self;
623 
624     nxt_unit_req_debug(http->req, "asgi_http_done");
625 
626     /*
627      * Get Future.result() and it raises an exception, if coroutine exited
628      * with exception.
629      */
630     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
631     if (nxt_slow_path(res == NULL)) {
632         nxt_unit_req_error(http->req,
633                            "Python failed to call 'future.result()'");
634         nxt_python_print_exception();
635 
636         rc = NXT_UNIT_ERROR;
637 
638     } else {
639         Py_DECREF(res);
640 
641         rc = NXT_UNIT_OK;
642     }
643 
644     nxt_unit_request_done(http->req, rc);
645 
646     Py_RETURN_NONE;
647 }
648 
649 
650 #endif /* NXT_HAVE_ASGI */
651