xref: /unit/src/python/nxt_python_asgi_http.c (revision 1624:e46b1b422545)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <python/nxt_python.h>
8 
9 #if (NXT_HAVE_ASGI)
10 
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <python/nxt_python_asgi.h>
15 #include <python/nxt_python_asgi_str.h>
16 
17 
18 typedef struct {
19     PyObject_HEAD
20     nxt_unit_request_info_t  *req;
21     nxt_queue_link_t         link;
22     PyObject                 *receive_future;
23     PyObject                 *send_future;
24     uint64_t                 content_length;
25     uint64_t                 bytes_sent;
26     int                      complete;
27     PyObject                 *send_body;
28     Py_ssize_t               send_body_off;
29 } nxt_py_asgi_http_t;
30 
31 
32 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
33 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
34 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
35 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
36     PyObject *dict);
37 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
38     PyObject *dict);
39 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
40 
41 
42 static PyMethodDef nxt_py_asgi_http_methods[] = {
43     { "receive",   nxt_py_asgi_http_receive, METH_NOARGS, 0 },
44     { "send",      nxt_py_asgi_http_send,    METH_O,      0 },
45     { "_done",     nxt_py_asgi_http_done,    METH_O,      0 },
46     { NULL, NULL, 0, 0 }
47 };
48 
49 static PyAsyncMethods nxt_py_asgi_async_methods = {
50     .am_await = nxt_py_asgi_await,
51 };
52 
53 static PyTypeObject nxt_py_asgi_http_type = {
54     PyVarObject_HEAD_INIT(NULL, 0)
55 
56     .tp_name      = "unit._asgi_http",
57     .tp_basicsize = sizeof(nxt_py_asgi_http_t),
58     .tp_dealloc   = nxt_py_asgi_dealloc,
59     .tp_as_async  = &nxt_py_asgi_async_methods,
60     .tp_flags     = Py_TPFLAGS_DEFAULT,
61     .tp_doc       = "unit ASGI HTTP request object",
62     .tp_iter      = nxt_py_asgi_iter,
63     .tp_iternext  = nxt_py_asgi_next,
64     .tp_methods   = nxt_py_asgi_http_methods,
65 };
66 
67 static Py_ssize_t  nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
68 
69 
70 nxt_int_t
71 nxt_py_asgi_http_init(nxt_task_t *task)
72 {
73     if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
74         nxt_alert(task, "Python failed to initialize the 'http' type object");
75         return NXT_ERROR;
76     }
77 
78     return NXT_OK;
79 }
80 
81 
82 PyObject *
83 nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
84 {
85     nxt_py_asgi_http_t  *http;
86 
87     http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
88 
89     if (nxt_fast_path(http != NULL)) {
90         http->req = req;
91         http->receive_future = NULL;
92         http->send_future = NULL;
93         http->content_length = -1;
94         http->bytes_sent = 0;
95         http->complete = 0;
96         http->send_body = NULL;
97         http->send_body_off = 0;
98     }
99 
100     return (PyObject *) http;
101 }
102 
103 
104 static PyObject *
105 nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
106 {
107     PyObject                 *msg, *future;
108     nxt_py_asgi_http_t       *http;
109     nxt_unit_request_info_t  *req;
110 
111     http = (nxt_py_asgi_http_t *) self;
112     req = http->req;
113 
114     nxt_unit_req_debug(req, "asgi_http_receive");
115 
116     msg = nxt_py_asgi_http_read_msg(http);
117     if (nxt_slow_path(msg == NULL)) {
118         return NULL;
119     }
120 
121     future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
122     if (nxt_slow_path(future == NULL)) {
123         nxt_unit_req_alert(req, "Python failed to create Future object");
124         nxt_python_print_exception();
125 
126         Py_DECREF(msg);
127 
128         return PyErr_Format(PyExc_RuntimeError,
129                             "failed to create Future object");
130     }
131 
132     if (msg != Py_None) {
133         return nxt_py_asgi_set_result_soon(req, future, msg);
134     }
135 
136     http->receive_future = future;
137     Py_INCREF(http->receive_future);
138 
139     Py_DECREF(msg);
140 
141     return future;
142 }
143 
144 
145 static PyObject *
146 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
147 {
148     char                     *body_buf;
149     ssize_t                  read_res;
150     PyObject                 *msg, *body;
151     Py_ssize_t               size;
152     nxt_unit_request_info_t  *req;
153 
154     req = http->req;
155 
156     size = req->content_length;
157 
158     if (size > nxt_py_asgi_http_body_buf_size) {
159         size = nxt_py_asgi_http_body_buf_size;
160     }
161 
162     if (size > 0) {
163         body = PyBytes_FromStringAndSize(NULL, size);
164         if (nxt_slow_path(body == NULL)) {
165             nxt_unit_req_alert(req, "Python failed to create body byte string");
166             nxt_python_print_exception();
167 
168             return PyErr_Format(PyExc_RuntimeError,
169                                 "failed to create Bytes object");
170         }
171 
172         body_buf = PyBytes_AS_STRING(body);
173 
174         read_res = nxt_unit_request_read(req, body_buf, size);
175 
176     } else {
177         body = NULL;
178         read_res = 0;
179     }
180 
181     if (read_res > 0 || read_res == size) {
182         msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
183         if (nxt_slow_path(msg == NULL)) {
184             Py_XDECREF(body);
185 
186             return NULL;
187         }
188 
189 #define SET_ITEM(dict, key, value) \
190     if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value)      \
191                         == -1))                                                \
192     {                                                                          \
193         nxt_unit_req_alert(req,                                                \
194                            "Python failed to set '" #dict "." #key "' item");  \
195         PyErr_SetString(PyExc_RuntimeError,                                    \
196                         "Python failed to set '" #dict "." #key "' item");     \
197         goto fail;                                                             \
198     }
199 
200         if (body != NULL) {
201             SET_ITEM(msg, body, body)
202         }
203 
204         if (req->content_length > 0) {
205             SET_ITEM(msg, more_body, Py_True)
206         }
207 
208 #undef SET_ITEM
209 
210         Py_XDECREF(body);
211 
212         return msg;
213     }
214 
215     Py_XDECREF(body);
216 
217     Py_RETURN_NONE;
218 
219 fail:
220 
221     Py_DECREF(msg);
222     Py_XDECREF(body);
223 
224     return NULL;
225 }
226 
227 
228 static PyObject *
229 nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
230 {
231     PyObject            *type;
232     const char          *type_str;
233     Py_ssize_t          type_len;
234     nxt_py_asgi_http_t  *http;
235 
236     static const nxt_str_t  response_start = nxt_string("http.response.start");
237     static const nxt_str_t  response_body = nxt_string("http.response.body");
238 
239     http = (nxt_py_asgi_http_t *) self;
240 
241     type = PyDict_GetItem(dict, nxt_py_type_str);
242     if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
243         nxt_unit_req_error(http->req, "asgi_http_send: "
244                                       "'type' is not a unicode string");
245         return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
246     }
247 
248     type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
249 
250     nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
251                        (int) type_len, type_str);
252 
253     if (type_len == (Py_ssize_t) response_start.length
254         && memcmp(type_str, response_start.start, type_len) == 0)
255     {
256         return nxt_py_asgi_http_response_start(http, dict);
257     }
258 
259     if (type_len == (Py_ssize_t) response_body.length
260         && memcmp(type_str, response_body.start, type_len) == 0)
261     {
262         return nxt_py_asgi_http_response_body(http, dict);
263     }
264 
265     nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'",
266                        (int) type_len, type_str);
267 
268     return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
269 }
270 
271 
272 static PyObject *
273 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
274 {
275     int                          rc;
276     PyObject                     *status, *headers, *res;
277     nxt_py_asgi_calc_size_ctx_t  calc_size_ctx;
278     nxt_py_asgi_add_field_ctx_t  add_field_ctx;
279 
280     status = PyDict_GetItem(dict, nxt_py_status_str);
281     if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
282         nxt_unit_req_error(http->req, "asgi_http_response_start: "
283                                       "'status' is not an integer");
284         return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
285     }
286 
287     calc_size_ctx.fields_size = 0;
288     calc_size_ctx.fields_count = 0;
289 
290     headers = PyDict_GetItem(dict, nxt_py_headers_str);
291     if (headers != NULL) {
292         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
293                                        &calc_size_ctx);
294         if (nxt_slow_path(res == NULL)) {
295             return NULL;
296         }
297 
298         Py_DECREF(res);
299     }
300 
301     rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
302                                 calc_size_ctx.fields_count,
303                                 calc_size_ctx.fields_size);
304     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
305         return PyErr_Format(PyExc_RuntimeError,
306                             "failed to allocate response object");
307     }
308 
309     add_field_ctx.req = http->req;
310     add_field_ctx.content_length = -1;
311 
312     if (headers != NULL) {
313         res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
314                                        &add_field_ctx);
315         if (nxt_slow_path(res == NULL)) {
316             return NULL;
317         }
318 
319         Py_DECREF(res);
320     }
321 
322     http->content_length = add_field_ctx.content_length;
323 
324     Py_INCREF(http);
325     return (PyObject *) http;
326 }
327 
328 
329 static PyObject *
330 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
331 {
332     int         rc;
333     char        *body_str;
334     ssize_t     sent;
335     PyObject    *body, *more_body, *future;
336     Py_ssize_t  body_len, body_off;
337 
338     body = PyDict_GetItem(dict, nxt_py_body_str);
339     if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
340         return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
341     }
342 
343     more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
344     if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
345         return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
346     }
347 
348     if (nxt_slow_path(http->complete)) {
349         return PyErr_Format(PyExc_RuntimeError,
350                             "Unexpected ASGI message 'http.response.body' "
351                             "sent, after response already completed");
352     }
353 
354     if (nxt_slow_path(http->send_future != NULL)) {
355         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
356     }
357 
358     if (body != NULL) {
359         body_str = PyBytes_AS_STRING(body);
360         body_len = PyBytes_GET_SIZE(body);
361 
362         nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
363                            (int) body_len, (more_body == Py_True) );
364 
365         if (nxt_slow_path(http->bytes_sent + body_len
366                               > http->content_length))
367         {
368             return PyErr_Format(PyExc_RuntimeError,
369                                 "Response content longer than Content-Length");
370         }
371 
372         body_off = 0;
373 
374         while (body_len > 0) {
375             sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
376             if (nxt_slow_path(sent < 0)) {
377                 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
378             }
379 
380             if (nxt_slow_path(sent == 0)) {
381                 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
382                                    "out of shared memory, %d",
383                                    (int) body_len);
384 
385                 future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
386                 if (nxt_slow_path(future == NULL)) {
387                     nxt_unit_req_alert(http->req,
388                                        "Python failed to create Future object");
389                     nxt_python_print_exception();
390 
391                     return PyErr_Format(PyExc_RuntimeError,
392                                         "failed to create Future object");
393                 }
394 
395                 http->send_body = body;
396                 Py_INCREF(http->send_body);
397                 http->send_body_off = body_off;
398 
399                 nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link);
400 
401                 http->send_future = future;
402                 Py_INCREF(http->send_future);
403 
404                 return future;
405             }
406 
407             body_str += sent;
408             body_len -= sent;
409             body_off += sent;
410             http->bytes_sent += sent;
411         }
412 
413     } else {
414         nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
415                            (more_body == Py_True) );
416 
417         if (!nxt_unit_response_is_sent(http->req)) {
418             rc = nxt_unit_response_send(http->req);
419             if (nxt_slow_path(rc != NXT_UNIT_OK)) {
420                 return PyErr_Format(PyExc_RuntimeError,
421                                     "failed to send response");
422             }
423         }
424     }
425 
426     if (more_body == NULL || more_body == Py_False) {
427         http->complete = 1;
428     }
429 
430     Py_INCREF(http);
431     return (PyObject *) http;
432 }
433 
434 
435 void
436 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
437 {
438     PyObject            *msg, *future, *res;
439     nxt_py_asgi_http_t  *http;
440 
441     http = req->data;
442 
443     nxt_unit_req_debug(req, "asgi_http_data_handler");
444 
445     if (http->receive_future == NULL) {
446         return;
447     }
448 
449     msg = nxt_py_asgi_http_read_msg(http);
450     if (nxt_slow_path(msg == NULL)) {
451         return;
452     }
453 
454     if (msg == Py_None) {
455         Py_DECREF(msg);
456         return;
457     }
458 
459     future = http->receive_future;
460     http->receive_future = NULL;
461 
462     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
463     if (nxt_slow_path(res == NULL)) {
464         nxt_unit_req_alert(req, "'set_result' call failed");
465         nxt_python_print_exception();
466     }
467 
468     Py_XDECREF(res);
469     Py_DECREF(future);
470 
471     Py_DECREF(msg);
472 }
473 
474 
475 int
476 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
477 {
478     char                *body_str;
479     ssize_t             sent;
480     PyObject            *future, *exc, *res;
481     Py_ssize_t          body_len;
482     nxt_py_asgi_http_t  *http;
483 
484     http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
485 
486     body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
487     body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
488 
489     nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
490 
491     while (body_len > 0) {
492         sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
493         if (nxt_slow_path(sent < 0)) {
494             goto fail;
495         }
496 
497         if (nxt_slow_path(sent == 0)) {
498             return NXT_UNIT_AGAIN;
499         }
500 
501         body_str += sent;
502         body_len -= sent;
503 
504         http->send_body_off += sent;
505         http->bytes_sent += sent;
506     }
507 
508     Py_CLEAR(http->send_body);
509 
510     future = http->send_future;
511     http->send_future = NULL;
512 
513     res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None,
514                                      NULL);
515     if (nxt_slow_path(res == NULL)) {
516         nxt_unit_req_alert(http->req, "'set_result' call failed");
517         nxt_python_print_exception();
518     }
519 
520     Py_XDECREF(res);
521     Py_DECREF(future);
522 
523     return NXT_UNIT_OK;
524 
525 fail:
526 
527     exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
528                                        nxt_py_failed_to_send_body_str,
529                                        NULL);
530     if (nxt_slow_path(exc == NULL)) {
531         nxt_unit_req_alert(http->req, "RuntimeError create failed");
532         nxt_python_print_exception();
533 
534         exc = Py_None;
535         Py_INCREF(exc);
536     }
537 
538     future = http->send_future;
539     http->send_future = NULL;
540 
541     res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
542                                      NULL);
543     if (nxt_slow_path(res == NULL)) {
544         nxt_unit_req_alert(http->req, "'set_exception' call failed");
545         nxt_python_print_exception();
546     }
547 
548     Py_XDECREF(res);
549     Py_DECREF(future);
550     Py_DECREF(exc);
551 
552     return NXT_UNIT_ERROR;
553 }
554 
555 
556 static PyObject *
557 nxt_py_asgi_http_done(PyObject *self, PyObject *future)
558 {
559     int                 rc;
560     PyObject            *res;
561     nxt_py_asgi_http_t  *http;
562 
563     http = (nxt_py_asgi_http_t *) self;
564 
565     nxt_unit_req_debug(http->req, "asgi_http_done");
566 
567     /*
568      * Get Future.result() and it raises an exception, if coroutine exited
569      * with exception.
570      */
571     res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
572     if (nxt_slow_path(res == NULL)) {
573         nxt_unit_req_error(http->req,
574                            "Python failed to call 'future.result()'");
575         nxt_python_print_exception();
576 
577         rc = NXT_UNIT_ERROR;
578 
579     } else {
580         Py_DECREF(res);
581 
582         rc = NXT_UNIT_OK;
583     }
584 
585     nxt_unit_request_done(http->req, rc);
586 
587     Py_RETURN_NONE;
588 }
589 
590 
591 #endif /* NXT_HAVE_ASGI */
592