nxt_python_asgi_http.c (1917:a7e80c97def4) nxt_python_asgi_http.c (1980:43553aa72111)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7#include <python/nxt_python.h>
8
9#if (NXT_HAVE_ASGI)
10
11#include <nxt_main.h>
12#include <nxt_unit.h>
13#include <nxt_unit_request.h>
14#include <python/nxt_python_asgi.h>
15#include <python/nxt_python_asgi_str.h>
16
17
18typedef struct {
19 PyObject_HEAD
20 nxt_unit_request_info_t *req;
21 nxt_queue_link_t link;
22 PyObject *receive_future;
23 PyObject *send_future;
24 uint64_t content_length;
25 uint64_t bytes_sent;
26 PyObject *send_body;
27 Py_ssize_t send_body_off;
28 uint8_t complete;
29 uint8_t closed;
30 uint8_t empty_body_received;
31} nxt_py_asgi_http_t;
32
33
34static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
35static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
36static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
37static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
38 PyObject *dict);
39static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
40 PyObject *dict);
41static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http);
42static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http,
43 PyObject *future, PyObject *msg);
44static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
45
46
47static PyMethodDef nxt_py_asgi_http_methods[] = {
48 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 },
49 { "send", nxt_py_asgi_http_send, METH_O, 0 },
50 { "_done", nxt_py_asgi_http_done, METH_O, 0 },
51 { NULL, NULL, 0, 0 }
52};
53
54static PyAsyncMethods nxt_py_asgi_async_methods = {
55 .am_await = nxt_py_asgi_await,
56};
57
58static PyTypeObject nxt_py_asgi_http_type = {
59 PyVarObject_HEAD_INIT(NULL, 0)
60
61 .tp_name = "unit._asgi_http",
62 .tp_basicsize = sizeof(nxt_py_asgi_http_t),
63 .tp_dealloc = nxt_py_asgi_dealloc,
64 .tp_as_async = &nxt_py_asgi_async_methods,
65 .tp_flags = Py_TPFLAGS_DEFAULT,
66 .tp_doc = "unit ASGI HTTP request object",
67 .tp_iter = nxt_py_asgi_iter,
68 .tp_iternext = nxt_py_asgi_next,
69 .tp_methods = nxt_py_asgi_http_methods,
70};
71
72static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
73
74
75int
76nxt_py_asgi_http_init(void)
77{
78 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
79 nxt_unit_alert(NULL,
80 "Python failed to initialize the 'http' type object");
81 return NXT_UNIT_ERROR;
82 }
83
84 return NXT_UNIT_OK;
85}
86
87
88PyObject *
89nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
90{
91 nxt_py_asgi_http_t *http;
92
93 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
94
95 if (nxt_fast_path(http != NULL)) {
96 http->req = req;
97 http->receive_future = NULL;
98 http->send_future = NULL;
99 http->content_length = -1;
100 http->bytes_sent = 0;
101 http->send_body = NULL;
102 http->send_body_off = 0;
103 http->complete = 0;
104 http->closed = 0;
105 http->empty_body_received = 0;
106 }
107
108 return (PyObject *) http;
109}
110
111
112static PyObject *
113nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
114{
115 PyObject *msg, *future;
116 nxt_py_asgi_http_t *http;
117 nxt_py_asgi_ctx_data_t *ctx_data;
118 nxt_unit_request_info_t *req;
119
120 http = (nxt_py_asgi_http_t *) self;
121 req = http->req;
122
123 nxt_unit_req_debug(req, "asgi_http_receive");
124
125 if (nxt_slow_path(http->closed || http->complete )) {
126 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
127
128 } else {
129 msg = nxt_py_asgi_http_read_msg(http);
130 }
131
132 if (nxt_slow_path(msg == NULL)) {
133 return NULL;
134 }
135
136 ctx_data = req->ctx->data;
137
138 future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
139 if (nxt_slow_path(future == NULL)) {
140 nxt_unit_req_alert(req, "Python failed to create Future object");
141 nxt_python_print_exception();
142
143 Py_DECREF(msg);
144
145 return PyErr_Format(PyExc_RuntimeError,
146 "failed to create Future object");
147 }
148
149 if (msg != Py_None) {
150 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
151 }
152
153 http->receive_future = future;
154 Py_INCREF(http->receive_future);
155
156 Py_DECREF(msg);
157
158 return future;
159}
160
161
162static PyObject *
163nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
164{
165 char *body_buf;
166 ssize_t read_res;
167 PyObject *msg, *body;
168 Py_ssize_t size;
169 nxt_unit_request_info_t *req;
170
171 req = http->req;
172
173 size = req->content_length;
174
175 if (size > nxt_py_asgi_http_body_buf_size) {
176 size = nxt_py_asgi_http_body_buf_size;
177 }
178
179 if (size == 0) {
180 if (http->empty_body_received) {
181 Py_RETURN_NONE;
182 }
183
184 http->empty_body_received = 1;
185 }
186
187 if (size > 0) {
188 body = PyBytes_FromStringAndSize(NULL, size);
189 if (nxt_slow_path(body == NULL)) {
190 nxt_unit_req_alert(req, "Python failed to create body byte string");
191 nxt_python_print_exception();
192
193 return PyErr_Format(PyExc_RuntimeError,
194 "failed to create Bytes object");
195 }
196
197 body_buf = PyBytes_AS_STRING(body);
198
199 read_res = nxt_unit_request_read(req, body_buf, size);
200
201 } else {
202 body = NULL;
203 read_res = 0;
204 }
205
206 if (read_res > 0 || read_res == size) {
207 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
208 if (nxt_slow_path(msg == NULL)) {
209 Py_XDECREF(body);
210
211 return NULL;
212 }
213
214#define SET_ITEM(dict, key, value) \
215 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
216 == -1)) \
217 { \
218 nxt_unit_req_alert(req, \
219 "Python failed to set '" #dict "." #key "' item"); \
220 PyErr_SetString(PyExc_RuntimeError, \
221 "Python failed to set '" #dict "." #key "' item"); \
222 goto fail; \
223 }
224
225 if (body != NULL) {
226 SET_ITEM(msg, body, body)
227 }
228
229 if (req->content_length > 0) {
230 SET_ITEM(msg, more_body, Py_True)
231 }
232
233#undef SET_ITEM
234
235 Py_XDECREF(body);
236
237 return msg;
238 }
239
240 Py_XDECREF(body);
241
242 Py_RETURN_NONE;
243
244fail:
245
246 Py_DECREF(msg);
247 Py_XDECREF(body);
248
249 return NULL;
250}
251
252
253static PyObject *
254nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
255{
256 PyObject *type;
257 const char *type_str;
258 Py_ssize_t type_len;
259 nxt_py_asgi_http_t *http;
260
261 static const nxt_str_t response_start = nxt_string("http.response.start");
262 static const nxt_str_t response_body = nxt_string("http.response.body");
263
264 http = (nxt_py_asgi_http_t *) self;
265
266 type = PyDict_GetItem(dict, nxt_py_type_str);
267 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
268 nxt_unit_req_error(http->req, "asgi_http_send: "
269 "'type' is not a unicode string");
270 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
271 }
272
273 type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
274
275 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
276 (int) type_len, type_str);
277
278 if (nxt_unit_response_is_init(http->req)) {
279 if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
280 return nxt_py_asgi_http_response_body(http, dict);
281 }
282
283 return PyErr_Format(PyExc_RuntimeError,
284 "Expected ASGI message 'http.response.body', "
285 "but got '%U'", type);
286 }
287
288 if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
289 return nxt_py_asgi_http_response_start(http, dict);
290 }
291
292 return PyErr_Format(PyExc_RuntimeError,
293 "Expected ASGI message 'http.response.start', "
294 "but got '%U'", type);
295}
296
297
298static PyObject *
299nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
300{
301 int rc;
302 PyObject *status, *headers, *res;
303 nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
304 nxt_py_asgi_add_field_ctx_t add_field_ctx;
305
306 status = PyDict_GetItem(dict, nxt_py_status_str);
307 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
308 nxt_unit_req_error(http->req, "asgi_http_response_start: "
309 "'status' is not an integer");
310 return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
311 }
312
313 calc_size_ctx.fields_size = 0;
314 calc_size_ctx.fields_count = 0;
315
316 headers = PyDict_GetItem(dict, nxt_py_headers_str);
317 if (headers != NULL) {
318 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
319 &calc_size_ctx);
320 if (nxt_slow_path(res == NULL)) {
321 return NULL;
322 }
323
324 Py_DECREF(res);
325 }
326
327 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
328 calc_size_ctx.fields_count,
329 calc_size_ctx.fields_size);
330 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
331 return PyErr_Format(PyExc_RuntimeError,
332 "failed to allocate response object");
333 }
334
335 add_field_ctx.req = http->req;
336 add_field_ctx.content_length = -1;
337
338 if (headers != NULL) {
339 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
340 &add_field_ctx);
341 if (nxt_slow_path(res == NULL)) {
342 return NULL;
343 }
344
345 Py_DECREF(res);
346 }
347
348 http->content_length = add_field_ctx.content_length;
349
350 Py_INCREF(http);
351 return (PyObject *) http;
352}
353
354
355static PyObject *
356nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
357{
358 int rc;
359 char *body_str;
360 ssize_t sent;
361 PyObject *body, *more_body, *future;
362 Py_ssize_t body_len, body_off;
363 nxt_py_asgi_ctx_data_t *ctx_data;
364
365 body = PyDict_GetItem(dict, nxt_py_body_str);
366 if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
367 return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
368 }
369
370 more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
371 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
372 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
373 }
374
375 if (nxt_slow_path(http->complete)) {
376 return PyErr_Format(PyExc_RuntimeError,
377 "Unexpected ASGI message 'http.response.body' "
378 "sent, after response already completed");
379 }
380
381 if (nxt_slow_path(http->send_future != NULL)) {
382 return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
383 }
384
385 if (body != NULL) {
386 body_str = PyBytes_AS_STRING(body);
387 body_len = PyBytes_GET_SIZE(body);
388
389 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
390 (int) body_len, (more_body == Py_True) );
391
392 if (nxt_slow_path(http->bytes_sent + body_len
393 > http->content_length))
394 {
395 return PyErr_Format(PyExc_RuntimeError,
396 "Response content longer than Content-Length");
397 }
398
399 body_off = 0;
400
401 ctx_data = http->req->ctx->data;
402
403 while (body_len > 0) {
404 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
405 if (nxt_slow_path(sent < 0)) {
406 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
407 }
408
409 if (nxt_slow_path(sent == 0)) {
410 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
411 "out of shared memory, %d",
412 (int) body_len);
413
414 future = PyObject_CallObject(ctx_data->loop_create_future,
415 NULL);
416 if (nxt_slow_path(future == NULL)) {
417 nxt_unit_req_alert(http->req,
418 "Python failed to create Future object");
419 nxt_python_print_exception();
420
421 return PyErr_Format(PyExc_RuntimeError,
422 "failed to create Future object");
423 }
424
425 http->send_body = body;
426 Py_INCREF(http->send_body);
427 http->send_body_off = body_off;
428
429 nxt_py_asgi_drain_wait(http->req, &http->link);
430
431 http->send_future = future;
432 Py_INCREF(http->send_future);
433
434 return future;
435 }
436
437 body_str += sent;
438 body_len -= sent;
439 body_off += sent;
440 http->bytes_sent += sent;
441 }
442
443 } else {
444 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
445 (more_body == Py_True) );
446
447 if (!nxt_unit_response_is_sent(http->req)) {
448 rc = nxt_unit_response_send(http->req);
449 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
450 return PyErr_Format(PyExc_RuntimeError,
451 "failed to send response");
452 }
453 }
454 }
455
456 if (more_body == NULL || more_body == Py_False) {
457 http->complete = 1;
458
459 nxt_py_asgi_http_emit_disconnect(http);
460 }
461
462 Py_INCREF(http);
463 return (PyObject *) http;
464}
465
466
467static void
468nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http)
469{
470 PyObject *msg, *future;
471
472 if (http->receive_future == NULL) {
473 return;
474 }
475
476 msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str);
477 if (nxt_slow_path(msg == NULL)) {
478 return;
479 }
480
481 if (msg == Py_None) {
482 Py_DECREF(msg);
483 return;
484 }
485
486 future = http->receive_future;
487 http->receive_future = NULL;
488
489 nxt_py_asgi_http_set_result(http, future, msg);
490
491 Py_DECREF(msg);
492}
493
494
495static void
496nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future,
497 PyObject *msg)
498{
499 PyObject *res;
500
501 res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL);
502 if (nxt_slow_path(res == NULL)) {
503 nxt_unit_req_alert(http->req, "'done' call failed");
504 nxt_python_print_exception();
505 }
506
507 if (nxt_fast_path(res == Py_False)) {
508 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg,
509 NULL);
510 if (nxt_slow_path(res == NULL)) {
511 nxt_unit_req_alert(http->req, "'set_result' call failed");
512 nxt_python_print_exception();
513 }
514
515 } else {
516 res = NULL;
517 }
518
519 Py_XDECREF(res);
520 Py_DECREF(future);
521}
522
523
524void
525nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
526{
527 PyObject *msg, *future;
528 nxt_py_asgi_http_t *http;
529
530 http = req->data;
531
532 nxt_unit_req_debug(req, "asgi_http_data_handler");
533
534 if (http->receive_future == NULL) {
535 return;
536 }
537
538 msg = nxt_py_asgi_http_read_msg(http);
539 if (nxt_slow_path(msg == NULL)) {
540 return;
541 }
542
543 if (msg == Py_None) {
544 Py_DECREF(msg);
545 return;
546 }
547
548 future = http->receive_future;
549 http->receive_future = NULL;
550
551 nxt_py_asgi_http_set_result(http, future, msg);
552
553 Py_DECREF(msg);
554}
555
556
557int
558nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
559{
560 char *body_str;
561 ssize_t sent;
562 PyObject *future, *exc, *res;
563 Py_ssize_t body_len;
564 nxt_py_asgi_http_t *http;
565
566 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
567
568 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
569 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
570
571 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
572
573 while (body_len > 0) {
574 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
575 if (nxt_slow_path(sent < 0)) {
576 goto fail;
577 }
578
579 if (nxt_slow_path(sent == 0)) {
580 return NXT_UNIT_AGAIN;
581 }
582
583 body_str += sent;
584 body_len -= sent;
585
586 http->send_body_off += sent;
587 http->bytes_sent += sent;
588 }
589
590 Py_CLEAR(http->send_body);
591
592 future = http->send_future;
593 http->send_future = NULL;
594
595 nxt_py_asgi_http_set_result(http, future, Py_None);
596
597 return NXT_UNIT_OK;
598
599fail:
600
601 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
602 nxt_py_failed_to_send_body_str,
603 NULL);
604 if (nxt_slow_path(exc == NULL)) {
605 nxt_unit_req_alert(http->req, "RuntimeError create failed");
606 nxt_python_print_exception();
607
608 exc = Py_None;
609 Py_INCREF(exc);
610 }
611
612 future = http->send_future;
613 http->send_future = NULL;
614
615 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
616 NULL);
617 if (nxt_slow_path(res == NULL)) {
618 nxt_unit_req_alert(http->req, "'set_exception' call failed");
619 nxt_python_print_exception();
620 }
621
622 Py_XDECREF(res);
623 Py_DECREF(future);
624 Py_DECREF(exc);
625
626 return NXT_UNIT_ERROR;
627}
628
629
630void
631nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
632{
633 nxt_py_asgi_http_t *http;
634
635 http = req->data;
636
637 nxt_unit_req_debug(req, "asgi_http_close_handler");
638
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7#include <python/nxt_python.h>
8
9#if (NXT_HAVE_ASGI)
10
11#include <nxt_main.h>
12#include <nxt_unit.h>
13#include <nxt_unit_request.h>
14#include <python/nxt_python_asgi.h>
15#include <python/nxt_python_asgi_str.h>
16
17
18typedef struct {
19 PyObject_HEAD
20 nxt_unit_request_info_t *req;
21 nxt_queue_link_t link;
22 PyObject *receive_future;
23 PyObject *send_future;
24 uint64_t content_length;
25 uint64_t bytes_sent;
26 PyObject *send_body;
27 Py_ssize_t send_body_off;
28 uint8_t complete;
29 uint8_t closed;
30 uint8_t empty_body_received;
31} nxt_py_asgi_http_t;
32
33
34static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
35static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
36static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
37static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
38 PyObject *dict);
39static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
40 PyObject *dict);
41static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http);
42static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http,
43 PyObject *future, PyObject *msg);
44static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
45
46
47static PyMethodDef nxt_py_asgi_http_methods[] = {
48 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 },
49 { "send", nxt_py_asgi_http_send, METH_O, 0 },
50 { "_done", nxt_py_asgi_http_done, METH_O, 0 },
51 { NULL, NULL, 0, 0 }
52};
53
54static PyAsyncMethods nxt_py_asgi_async_methods = {
55 .am_await = nxt_py_asgi_await,
56};
57
58static PyTypeObject nxt_py_asgi_http_type = {
59 PyVarObject_HEAD_INIT(NULL, 0)
60
61 .tp_name = "unit._asgi_http",
62 .tp_basicsize = sizeof(nxt_py_asgi_http_t),
63 .tp_dealloc = nxt_py_asgi_dealloc,
64 .tp_as_async = &nxt_py_asgi_async_methods,
65 .tp_flags = Py_TPFLAGS_DEFAULT,
66 .tp_doc = "unit ASGI HTTP request object",
67 .tp_iter = nxt_py_asgi_iter,
68 .tp_iternext = nxt_py_asgi_next,
69 .tp_methods = nxt_py_asgi_http_methods,
70};
71
72static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
73
74
75int
76nxt_py_asgi_http_init(void)
77{
78 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
79 nxt_unit_alert(NULL,
80 "Python failed to initialize the 'http' type object");
81 return NXT_UNIT_ERROR;
82 }
83
84 return NXT_UNIT_OK;
85}
86
87
88PyObject *
89nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
90{
91 nxt_py_asgi_http_t *http;
92
93 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
94
95 if (nxt_fast_path(http != NULL)) {
96 http->req = req;
97 http->receive_future = NULL;
98 http->send_future = NULL;
99 http->content_length = -1;
100 http->bytes_sent = 0;
101 http->send_body = NULL;
102 http->send_body_off = 0;
103 http->complete = 0;
104 http->closed = 0;
105 http->empty_body_received = 0;
106 }
107
108 return (PyObject *) http;
109}
110
111
112static PyObject *
113nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
114{
115 PyObject *msg, *future;
116 nxt_py_asgi_http_t *http;
117 nxt_py_asgi_ctx_data_t *ctx_data;
118 nxt_unit_request_info_t *req;
119
120 http = (nxt_py_asgi_http_t *) self;
121 req = http->req;
122
123 nxt_unit_req_debug(req, "asgi_http_receive");
124
125 if (nxt_slow_path(http->closed || http->complete )) {
126 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
127
128 } else {
129 msg = nxt_py_asgi_http_read_msg(http);
130 }
131
132 if (nxt_slow_path(msg == NULL)) {
133 return NULL;
134 }
135
136 ctx_data = req->ctx->data;
137
138 future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
139 if (nxt_slow_path(future == NULL)) {
140 nxt_unit_req_alert(req, "Python failed to create Future object");
141 nxt_python_print_exception();
142
143 Py_DECREF(msg);
144
145 return PyErr_Format(PyExc_RuntimeError,
146 "failed to create Future object");
147 }
148
149 if (msg != Py_None) {
150 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
151 }
152
153 http->receive_future = future;
154 Py_INCREF(http->receive_future);
155
156 Py_DECREF(msg);
157
158 return future;
159}
160
161
162static PyObject *
163nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
164{
165 char *body_buf;
166 ssize_t read_res;
167 PyObject *msg, *body;
168 Py_ssize_t size;
169 nxt_unit_request_info_t *req;
170
171 req = http->req;
172
173 size = req->content_length;
174
175 if (size > nxt_py_asgi_http_body_buf_size) {
176 size = nxt_py_asgi_http_body_buf_size;
177 }
178
179 if (size == 0) {
180 if (http->empty_body_received) {
181 Py_RETURN_NONE;
182 }
183
184 http->empty_body_received = 1;
185 }
186
187 if (size > 0) {
188 body = PyBytes_FromStringAndSize(NULL, size);
189 if (nxt_slow_path(body == NULL)) {
190 nxt_unit_req_alert(req, "Python failed to create body byte string");
191 nxt_python_print_exception();
192
193 return PyErr_Format(PyExc_RuntimeError,
194 "failed to create Bytes object");
195 }
196
197 body_buf = PyBytes_AS_STRING(body);
198
199 read_res = nxt_unit_request_read(req, body_buf, size);
200
201 } else {
202 body = NULL;
203 read_res = 0;
204 }
205
206 if (read_res > 0 || read_res == size) {
207 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
208 if (nxt_slow_path(msg == NULL)) {
209 Py_XDECREF(body);
210
211 return NULL;
212 }
213
214#define SET_ITEM(dict, key, value) \
215 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
216 == -1)) \
217 { \
218 nxt_unit_req_alert(req, \
219 "Python failed to set '" #dict "." #key "' item"); \
220 PyErr_SetString(PyExc_RuntimeError, \
221 "Python failed to set '" #dict "." #key "' item"); \
222 goto fail; \
223 }
224
225 if (body != NULL) {
226 SET_ITEM(msg, body, body)
227 }
228
229 if (req->content_length > 0) {
230 SET_ITEM(msg, more_body, Py_True)
231 }
232
233#undef SET_ITEM
234
235 Py_XDECREF(body);
236
237 return msg;
238 }
239
240 Py_XDECREF(body);
241
242 Py_RETURN_NONE;
243
244fail:
245
246 Py_DECREF(msg);
247 Py_XDECREF(body);
248
249 return NULL;
250}
251
252
253static PyObject *
254nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
255{
256 PyObject *type;
257 const char *type_str;
258 Py_ssize_t type_len;
259 nxt_py_asgi_http_t *http;
260
261 static const nxt_str_t response_start = nxt_string("http.response.start");
262 static const nxt_str_t response_body = nxt_string("http.response.body");
263
264 http = (nxt_py_asgi_http_t *) self;
265
266 type = PyDict_GetItem(dict, nxt_py_type_str);
267 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
268 nxt_unit_req_error(http->req, "asgi_http_send: "
269 "'type' is not a unicode string");
270 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
271 }
272
273 type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
274
275 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
276 (int) type_len, type_str);
277
278 if (nxt_unit_response_is_init(http->req)) {
279 if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
280 return nxt_py_asgi_http_response_body(http, dict);
281 }
282
283 return PyErr_Format(PyExc_RuntimeError,
284 "Expected ASGI message 'http.response.body', "
285 "but got '%U'", type);
286 }
287
288 if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
289 return nxt_py_asgi_http_response_start(http, dict);
290 }
291
292 return PyErr_Format(PyExc_RuntimeError,
293 "Expected ASGI message 'http.response.start', "
294 "but got '%U'", type);
295}
296
297
298static PyObject *
299nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
300{
301 int rc;
302 PyObject *status, *headers, *res;
303 nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
304 nxt_py_asgi_add_field_ctx_t add_field_ctx;
305
306 status = PyDict_GetItem(dict, nxt_py_status_str);
307 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
308 nxt_unit_req_error(http->req, "asgi_http_response_start: "
309 "'status' is not an integer");
310 return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
311 }
312
313 calc_size_ctx.fields_size = 0;
314 calc_size_ctx.fields_count = 0;
315
316 headers = PyDict_GetItem(dict, nxt_py_headers_str);
317 if (headers != NULL) {
318 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
319 &calc_size_ctx);
320 if (nxt_slow_path(res == NULL)) {
321 return NULL;
322 }
323
324 Py_DECREF(res);
325 }
326
327 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
328 calc_size_ctx.fields_count,
329 calc_size_ctx.fields_size);
330 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
331 return PyErr_Format(PyExc_RuntimeError,
332 "failed to allocate response object");
333 }
334
335 add_field_ctx.req = http->req;
336 add_field_ctx.content_length = -1;
337
338 if (headers != NULL) {
339 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
340 &add_field_ctx);
341 if (nxt_slow_path(res == NULL)) {
342 return NULL;
343 }
344
345 Py_DECREF(res);
346 }
347
348 http->content_length = add_field_ctx.content_length;
349
350 Py_INCREF(http);
351 return (PyObject *) http;
352}
353
354
355static PyObject *
356nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
357{
358 int rc;
359 char *body_str;
360 ssize_t sent;
361 PyObject *body, *more_body, *future;
362 Py_ssize_t body_len, body_off;
363 nxt_py_asgi_ctx_data_t *ctx_data;
364
365 body = PyDict_GetItem(dict, nxt_py_body_str);
366 if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
367 return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
368 }
369
370 more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
371 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
372 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
373 }
374
375 if (nxt_slow_path(http->complete)) {
376 return PyErr_Format(PyExc_RuntimeError,
377 "Unexpected ASGI message 'http.response.body' "
378 "sent, after response already completed");
379 }
380
381 if (nxt_slow_path(http->send_future != NULL)) {
382 return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
383 }
384
385 if (body != NULL) {
386 body_str = PyBytes_AS_STRING(body);
387 body_len = PyBytes_GET_SIZE(body);
388
389 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
390 (int) body_len, (more_body == Py_True) );
391
392 if (nxt_slow_path(http->bytes_sent + body_len
393 > http->content_length))
394 {
395 return PyErr_Format(PyExc_RuntimeError,
396 "Response content longer than Content-Length");
397 }
398
399 body_off = 0;
400
401 ctx_data = http->req->ctx->data;
402
403 while (body_len > 0) {
404 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
405 if (nxt_slow_path(sent < 0)) {
406 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
407 }
408
409 if (nxt_slow_path(sent == 0)) {
410 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
411 "out of shared memory, %d",
412 (int) body_len);
413
414 future = PyObject_CallObject(ctx_data->loop_create_future,
415 NULL);
416 if (nxt_slow_path(future == NULL)) {
417 nxt_unit_req_alert(http->req,
418 "Python failed to create Future object");
419 nxt_python_print_exception();
420
421 return PyErr_Format(PyExc_RuntimeError,
422 "failed to create Future object");
423 }
424
425 http->send_body = body;
426 Py_INCREF(http->send_body);
427 http->send_body_off = body_off;
428
429 nxt_py_asgi_drain_wait(http->req, &http->link);
430
431 http->send_future = future;
432 Py_INCREF(http->send_future);
433
434 return future;
435 }
436
437 body_str += sent;
438 body_len -= sent;
439 body_off += sent;
440 http->bytes_sent += sent;
441 }
442
443 } else {
444 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
445 (more_body == Py_True) );
446
447 if (!nxt_unit_response_is_sent(http->req)) {
448 rc = nxt_unit_response_send(http->req);
449 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
450 return PyErr_Format(PyExc_RuntimeError,
451 "failed to send response");
452 }
453 }
454 }
455
456 if (more_body == NULL || more_body == Py_False) {
457 http->complete = 1;
458
459 nxt_py_asgi_http_emit_disconnect(http);
460 }
461
462 Py_INCREF(http);
463 return (PyObject *) http;
464}
465
466
467static void
468nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http)
469{
470 PyObject *msg, *future;
471
472 if (http->receive_future == NULL) {
473 return;
474 }
475
476 msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str);
477 if (nxt_slow_path(msg == NULL)) {
478 return;
479 }
480
481 if (msg == Py_None) {
482 Py_DECREF(msg);
483 return;
484 }
485
486 future = http->receive_future;
487 http->receive_future = NULL;
488
489 nxt_py_asgi_http_set_result(http, future, msg);
490
491 Py_DECREF(msg);
492}
493
494
495static void
496nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future,
497 PyObject *msg)
498{
499 PyObject *res;
500
501 res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL);
502 if (nxt_slow_path(res == NULL)) {
503 nxt_unit_req_alert(http->req, "'done' call failed");
504 nxt_python_print_exception();
505 }
506
507 if (nxt_fast_path(res == Py_False)) {
508 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg,
509 NULL);
510 if (nxt_slow_path(res == NULL)) {
511 nxt_unit_req_alert(http->req, "'set_result' call failed");
512 nxt_python_print_exception();
513 }
514
515 } else {
516 res = NULL;
517 }
518
519 Py_XDECREF(res);
520 Py_DECREF(future);
521}
522
523
524void
525nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
526{
527 PyObject *msg, *future;
528 nxt_py_asgi_http_t *http;
529
530 http = req->data;
531
532 nxt_unit_req_debug(req, "asgi_http_data_handler");
533
534 if (http->receive_future == NULL) {
535 return;
536 }
537
538 msg = nxt_py_asgi_http_read_msg(http);
539 if (nxt_slow_path(msg == NULL)) {
540 return;
541 }
542
543 if (msg == Py_None) {
544 Py_DECREF(msg);
545 return;
546 }
547
548 future = http->receive_future;
549 http->receive_future = NULL;
550
551 nxt_py_asgi_http_set_result(http, future, msg);
552
553 Py_DECREF(msg);
554}
555
556
557int
558nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
559{
560 char *body_str;
561 ssize_t sent;
562 PyObject *future, *exc, *res;
563 Py_ssize_t body_len;
564 nxt_py_asgi_http_t *http;
565
566 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
567
568 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
569 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
570
571 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
572
573 while (body_len > 0) {
574 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
575 if (nxt_slow_path(sent < 0)) {
576 goto fail;
577 }
578
579 if (nxt_slow_path(sent == 0)) {
580 return NXT_UNIT_AGAIN;
581 }
582
583 body_str += sent;
584 body_len -= sent;
585
586 http->send_body_off += sent;
587 http->bytes_sent += sent;
588 }
589
590 Py_CLEAR(http->send_body);
591
592 future = http->send_future;
593 http->send_future = NULL;
594
595 nxt_py_asgi_http_set_result(http, future, Py_None);
596
597 return NXT_UNIT_OK;
598
599fail:
600
601 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
602 nxt_py_failed_to_send_body_str,
603 NULL);
604 if (nxt_slow_path(exc == NULL)) {
605 nxt_unit_req_alert(http->req, "RuntimeError create failed");
606 nxt_python_print_exception();
607
608 exc = Py_None;
609 Py_INCREF(exc);
610 }
611
612 future = http->send_future;
613 http->send_future = NULL;
614
615 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
616 NULL);
617 if (nxt_slow_path(res == NULL)) {
618 nxt_unit_req_alert(http->req, "'set_exception' call failed");
619 nxt_python_print_exception();
620 }
621
622 Py_XDECREF(res);
623 Py_DECREF(future);
624 Py_DECREF(exc);
625
626 return NXT_UNIT_ERROR;
627}
628
629
630void
631nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
632{
633 nxt_py_asgi_http_t *http;
634
635 http = req->data;
636
637 nxt_unit_req_debug(req, "asgi_http_close_handler");
638
639 http->closed = 1;
639 if (nxt_fast_path(http != NULL)) {
640 http->closed = 1;
640
641
641 nxt_py_asgi_http_emit_disconnect(http);
642 nxt_py_asgi_http_emit_disconnect(http);
643 }
642}
643
644
645static PyObject *
646nxt_py_asgi_http_done(PyObject *self, PyObject *future)
647{
648 int rc;
649 PyObject *res;
650 nxt_py_asgi_http_t *http;
651
652 http = (nxt_py_asgi_http_t *) self;
653
654 nxt_unit_req_debug(http->req, "asgi_http_done");
655
656 /*
657 * Get Future.result() and it raises an exception, if coroutine exited
658 * with exception.
659 */
660 res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
661 if (nxt_slow_path(res == NULL)) {
662 nxt_unit_req_error(http->req,
663 "Python failed to call 'future.result()'");
664 nxt_python_print_exception();
665
666 rc = NXT_UNIT_ERROR;
667
668 } else {
669 Py_DECREF(res);
670
671 rc = NXT_UNIT_OK;
672 }
673
674 nxt_unit_request_done(http->req, rc);
675
676 Py_RETURN_NONE;
677}
678
679
680#endif /* NXT_HAVE_ASGI */
644}
645
646
647static PyObject *
648nxt_py_asgi_http_done(PyObject *self, PyObject *future)
649{
650 int rc;
651 PyObject *res;
652 nxt_py_asgi_http_t *http;
653
654 http = (nxt_py_asgi_http_t *) self;
655
656 nxt_unit_req_debug(http->req, "asgi_http_done");
657
658 /*
659 * Get Future.result() and it raises an exception, if coroutine exited
660 * with exception.
661 */
662 res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
663 if (nxt_slow_path(res == NULL)) {
664 nxt_unit_req_error(http->req,
665 "Python failed to call 'future.result()'");
666 nxt_python_print_exception();
667
668 rc = NXT_UNIT_ERROR;
669
670 } else {
671 Py_DECREF(res);
672
673 rc = NXT_UNIT_OK;
674 }
675
676 nxt_unit_request_done(http->req, rc);
677
678 Py_RETURN_NONE;
679}
680
681
682#endif /* NXT_HAVE_ASGI */