1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7 #include <python/nxt_python.h>
8
9 #if (NXT_HAVE_ASGI)
10
11 #include <nxt_main.h>
12 #include <nxt_unit.h>
13 #include <nxt_unit_request.h>
14 #include <python/nxt_python_asgi.h>
15 #include <python/nxt_python_asgi_str.h>
16
17
18 typedef struct {
19 PyObject_HEAD
20 nxt_unit_request_info_t *req;
21 nxt_queue_link_t link;
22 PyObject *receive_future;
23 PyObject *send_future;
24 uint64_t content_length;
25 uint64_t bytes_sent;
26 PyObject *send_body;
27 Py_ssize_t send_body_off;
28 uint8_t complete;
29 uint8_t closed;
30 uint8_t empty_body_received;
31 } nxt_py_asgi_http_t;
32
33
34 static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
35 static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
36 static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
37 static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
38 PyObject *dict);
39 static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
40 PyObject *dict);
41 static void nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http);
42 static void nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http,
43 PyObject *future, PyObject *msg);
44 static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
45
46
47 static PyMethodDef nxt_py_asgi_http_methods[] = {
48 { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 },
49 { "send", nxt_py_asgi_http_send, METH_O, 0 },
50 { "_done", nxt_py_asgi_http_done, METH_O, 0 },
51 { NULL, NULL, 0, 0 }
52 };
53
54 static PyAsyncMethods nxt_py_asgi_async_methods = {
55 .am_await = nxt_py_asgi_await,
56 };
57
58 static PyTypeObject nxt_py_asgi_http_type = {
59 PyVarObject_HEAD_INIT(NULL, 0)
60
61 .tp_name = "unit._asgi_http",
62 .tp_basicsize = sizeof(nxt_py_asgi_http_t),
63 .tp_dealloc = nxt_py_asgi_dealloc,
64 .tp_as_async = &nxt_py_asgi_async_methods,
65 .tp_flags = Py_TPFLAGS_DEFAULT,
66 .tp_doc = "unit ASGI HTTP request object",
67 .tp_iter = nxt_py_asgi_iter,
68 .tp_iternext = nxt_py_asgi_next,
69 .tp_methods = nxt_py_asgi_http_methods,
70 };
71
72 static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
73
74
75 int
nxt_py_asgi_http_init(void)76 nxt_py_asgi_http_init(void)
77 {
78 if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
79 nxt_unit_alert(NULL,
80 "Python failed to initialize the 'http' type object");
81 return NXT_UNIT_ERROR;
82 }
83
84 return NXT_UNIT_OK;
85 }
86
87
88 PyObject *
nxt_py_asgi_http_create(nxt_unit_request_info_t * req)89 nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
90 {
91 nxt_py_asgi_http_t *http;
92
93 http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
94
95 if (nxt_fast_path(http != NULL)) {
96 http->req = req;
97 http->receive_future = NULL;
98 http->send_future = NULL;
99 http->content_length = -1;
100 http->bytes_sent = 0;
101 http->send_body = NULL;
102 http->send_body_off = 0;
103 http->complete = 0;
104 http->closed = 0;
105 http->empty_body_received = 0;
106 }
107
108 return (PyObject *) http;
109 }
110
111
112 static PyObject *
nxt_py_asgi_http_receive(PyObject * self,PyObject * none)113 nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
114 {
115 PyObject *msg, *future;
116 nxt_py_asgi_http_t *http;
117 nxt_py_asgi_ctx_data_t *ctx_data;
118 nxt_unit_request_info_t *req;
119
120 http = (nxt_py_asgi_http_t *) self;
121 req = http->req;
122
123 nxt_unit_req_debug(req, "asgi_http_receive");
124
125 if (nxt_slow_path(http->closed || http->complete )) {
126 msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
127
128 } else {
129 msg = nxt_py_asgi_http_read_msg(http);
130 }
131
132 if (nxt_slow_path(msg == NULL)) {
133 return NULL;
134 }
135
136 ctx_data = req->ctx->data;
137
138 future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
139 if (nxt_slow_path(future == NULL)) {
140 nxt_unit_req_alert(req, "Python failed to create Future object");
141 nxt_python_print_exception();
142
143 Py_DECREF(msg);
144
145 return PyErr_Format(PyExc_RuntimeError,
146 "failed to create Future object");
147 }
148
149 if (msg != Py_None) {
150 return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
151 }
152
153 http->receive_future = future;
154 Py_INCREF(http->receive_future);
155
156 Py_DECREF(msg);
157
158 return future;
159 }
160
161
162 static PyObject *
nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t * http)163 nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
164 {
165 char *body_buf;
166 ssize_t read_res;
167 PyObject *msg, *body;
168 Py_ssize_t size;
169 nxt_unit_request_info_t *req;
170
171 req = http->req;
172
173 size = req->content_length;
174
175 if (size > nxt_py_asgi_http_body_buf_size) {
176 size = nxt_py_asgi_http_body_buf_size;
177 }
178
179 if (size == 0) {
180 if (http->empty_body_received) {
181 Py_RETURN_NONE;
182 }
183
184 http->empty_body_received = 1;
185 }
186
187 if (size > 0) {
188 body = PyBytes_FromStringAndSize(NULL, size);
189 if (nxt_slow_path(body == NULL)) {
190 nxt_unit_req_alert(req, "Python failed to create body byte string");
191 nxt_python_print_exception();
192
193 return PyErr_Format(PyExc_RuntimeError,
194 "failed to create Bytes object");
195 }
196
197 body_buf = PyBytes_AS_STRING(body);
198
199 read_res = nxt_unit_request_read(req, body_buf, size);
200
201 } else {
202 body = NULL;
203 read_res = 0;
204 }
205
206 if (read_res > 0 || read_res == size) {
207 msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
208 if (nxt_slow_path(msg == NULL)) {
209 Py_XDECREF(body);
210
211 return NULL;
212 }
213
214 #define SET_ITEM(dict, key, value) \
215 if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
216 == -1)) \
217 { \
218 nxt_unit_req_alert(req, \
219 "Python failed to set '" #dict "." #key "' item"); \
220 PyErr_SetString(PyExc_RuntimeError, \
221 "Python failed to set '" #dict "." #key "' item"); \
222 goto fail; \
223 }
224
225 if (body != NULL) {
226 SET_ITEM(msg, body, body)
227 }
228
229 if (req->content_length > 0) {
230 SET_ITEM(msg, more_body, Py_True)
231 }
232
233 #undef SET_ITEM
234
235 Py_XDECREF(body);
236
237 return msg;
238 }
239
240 Py_XDECREF(body);
241
242 Py_RETURN_NONE;
243
244 fail:
245
246 Py_DECREF(msg);
247 Py_XDECREF(body);
248
249 return NULL;
250 }
251
252
253 static PyObject *
nxt_py_asgi_http_send(PyObject * self,PyObject * dict)254 nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
255 {
256 PyObject *type;
257 const char *type_str;
258 Py_ssize_t type_len;
259 nxt_py_asgi_http_t *http;
260
261 static const nxt_str_t response_start = nxt_string("http.response.start");
262 static const nxt_str_t response_body = nxt_string("http.response.body");
263
264 http = (nxt_py_asgi_http_t *) self;
265
266 type = PyDict_GetItem(dict, nxt_py_type_str);
267 if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
268 nxt_unit_req_error(http->req, "asgi_http_send: "
269 "'type' is not a unicode string");
270 return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
271 }
272
273 type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
274
275 nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
276 (int) type_len, type_str);
277
278 if (nxt_unit_response_is_init(http->req)) {
279 if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
280 return nxt_py_asgi_http_response_body(http, dict);
281 }
282
283 return PyErr_Format(PyExc_RuntimeError,
284 "Expected ASGI message 'http.response.body', "
285 "but got '%U'", type);
286 }
287
288 if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
289 return nxt_py_asgi_http_response_start(http, dict);
290 }
291
292 return PyErr_Format(PyExc_RuntimeError,
293 "Expected ASGI message 'http.response.start', "
294 "but got '%U'", type);
295 }
296
297
298 static PyObject *
nxt_py_asgi_http_response_start(nxt_py_asgi_http_t * http,PyObject * dict)299 nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
300 {
301 int rc;
302 PyObject *status, *headers, *res;
303 nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
304 nxt_py_asgi_add_field_ctx_t add_field_ctx;
305
306 status = PyDict_GetItem(dict, nxt_py_status_str);
307 if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
308 nxt_unit_req_error(http->req, "asgi_http_response_start: "
309 "'status' is not an integer");
310 return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
311 }
312
313 calc_size_ctx.fields_size = 0;
314 calc_size_ctx.fields_count = 0;
315
316 headers = PyDict_GetItem(dict, nxt_py_headers_str);
317 if (headers != NULL) {
318 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
319 &calc_size_ctx);
320 if (nxt_slow_path(res == NULL)) {
321 return NULL;
322 }
323
324 Py_DECREF(res);
325 }
326
327 rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
328 calc_size_ctx.fields_count,
329 calc_size_ctx.fields_size);
330 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
331 return PyErr_Format(PyExc_RuntimeError,
332 "failed to allocate response object");
333 }
334
335 add_field_ctx.req = http->req;
336 add_field_ctx.content_length = -1;
337
338 if (headers != NULL) {
339 res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
340 &add_field_ctx);
341 if (nxt_slow_path(res == NULL)) {
342 return NULL;
343 }
344
345 Py_DECREF(res);
346 }
347
348 http->content_length = add_field_ctx.content_length;
349
350 Py_INCREF(http);
351 return (PyObject *) http;
352 }
353
354
355 static PyObject *
nxt_py_asgi_http_response_body(nxt_py_asgi_http_t * http,PyObject * dict)356 nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
357 {
358 int rc;
359 char *body_str;
360 ssize_t sent;
361 PyObject *body, *more_body, *future;
362 Py_ssize_t body_len, body_off;
363 nxt_py_asgi_ctx_data_t *ctx_data;
364
365 if (nxt_slow_path(http->complete)) {
366 return PyErr_Format(PyExc_RuntimeError,
367 "Unexpected ASGI message 'http.response.body' "
368 "sent, after response already completed");
369 }
370
371 if (nxt_slow_path(http->send_future != NULL)) {
372 return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
373 }
374
375 more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
376 if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
377 return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
378 }
379
380 body = PyDict_GetItem(dict, nxt_py_body_str);
381
382 if (body != NULL) {
383 if (PyBytes_Check(body)) {
384 body_str = PyBytes_AS_STRING(body);
385 body_len = PyBytes_GET_SIZE(body);
386
387 } else if (PyByteArray_Check(body)) {
388 body_str = PyByteArray_AS_STRING(body);
389 body_len = PyByteArray_GET_SIZE(body);
390
391 } else {
392 return PyErr_Format(PyExc_TypeError,
393 "'body' is not a byte string or bytearray");
394 }
395
396 nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
397 (int) body_len, (more_body == Py_True) );
398
399 if (nxt_slow_path(http->bytes_sent + body_len
400 > http->content_length))
401 {
402 return PyErr_Format(PyExc_RuntimeError,
403 "Response content longer than Content-Length");
404 }
405
406 body_off = 0;
407
408 ctx_data = http->req->ctx->data;
409
410 while (body_len > 0) {
411 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
412 if (nxt_slow_path(sent < 0)) {
413 return PyErr_Format(PyExc_RuntimeError, "failed to send body");
414 }
415
416 if (nxt_slow_path(sent == 0)) {
417 nxt_unit_req_debug(http->req, "asgi_http_response_body: "
418 "out of shared memory, %d",
419 (int) body_len);
420
421 future = PyObject_CallObject(ctx_data->loop_create_future,
422 NULL);
423 if (nxt_slow_path(future == NULL)) {
424 nxt_unit_req_alert(http->req,
425 "Python failed to create Future object");
426 nxt_python_print_exception();
427
428 return PyErr_Format(PyExc_RuntimeError,
429 "failed to create Future object");
430 }
431
432 http->send_body = body;
433 Py_INCREF(http->send_body);
434 http->send_body_off = body_off;
435
436 nxt_py_asgi_drain_wait(http->req, &http->link);
437
438 http->send_future = future;
439 Py_INCREF(http->send_future);
440
441 return future;
442 }
443
444 body_str += sent;
445 body_len -= sent;
446 body_off += sent;
447 http->bytes_sent += sent;
448 }
449
450 } else {
451 nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
452 (more_body == Py_True) );
453
454 if (!nxt_unit_response_is_sent(http->req)) {
455 rc = nxt_unit_response_send(http->req);
456 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
457 return PyErr_Format(PyExc_RuntimeError,
458 "failed to send response");
459 }
460 }
461 }
462
463 if (more_body == NULL || more_body == Py_False) {
464 http->complete = 1;
465
466 nxt_py_asgi_http_emit_disconnect(http);
467 }
468
469 Py_INCREF(http);
470 return (PyObject *) http;
471 }
472
473
474 static void
nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t * http)475 nxt_py_asgi_http_emit_disconnect(nxt_py_asgi_http_t *http)
476 {
477 PyObject *msg, *future;
478
479 if (http->receive_future == NULL) {
480 return;
481 }
482
483 msg = nxt_py_asgi_new_msg(http->req, nxt_py_http_disconnect_str);
484 if (nxt_slow_path(msg == NULL)) {
485 return;
486 }
487
488 if (msg == Py_None) {
489 Py_DECREF(msg);
490 return;
491 }
492
493 future = http->receive_future;
494 http->receive_future = NULL;
495
496 nxt_py_asgi_http_set_result(http, future, msg);
497
498 Py_DECREF(msg);
499 }
500
501
502 static void
nxt_py_asgi_http_set_result(nxt_py_asgi_http_t * http,PyObject * future,PyObject * msg)503 nxt_py_asgi_http_set_result(nxt_py_asgi_http_t *http, PyObject *future,
504 PyObject *msg)
505 {
506 PyObject *res;
507
508 res = PyObject_CallMethodObjArgs(future, nxt_py_done_str, NULL);
509 if (nxt_slow_path(res == NULL)) {
510 nxt_unit_req_alert(http->req, "'done' call failed");
511 nxt_python_print_exception();
512 }
513
514 if (nxt_fast_path(res == Py_False)) {
515 res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg,
516 NULL);
517 if (nxt_slow_path(res == NULL)) {
518 nxt_unit_req_alert(http->req, "'set_result' call failed");
519 nxt_python_print_exception();
520 }
521
522 } else {
523 res = NULL;
524 }
525
526 Py_XDECREF(res);
527 Py_DECREF(future);
528 }
529
530
531 void
nxt_py_asgi_http_data_handler(nxt_unit_request_info_t * req)532 nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
533 {
534 PyObject *msg, *future;
535 nxt_py_asgi_http_t *http;
536
537 http = req->data;
538
539 nxt_unit_req_debug(req, "asgi_http_data_handler");
540
541 if (http->receive_future == NULL) {
542 return;
543 }
544
545 msg = nxt_py_asgi_http_read_msg(http);
546 if (nxt_slow_path(msg == NULL)) {
547 return;
548 }
549
550 if (msg == Py_None) {
551 Py_DECREF(msg);
552 return;
553 }
554
555 future = http->receive_future;
556 http->receive_future = NULL;
557
558 nxt_py_asgi_http_set_result(http, future, msg);
559
560 Py_DECREF(msg);
561 }
562
563
564 int
nxt_py_asgi_http_drain(nxt_queue_link_t * lnk)565 nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
566 {
567 char *body_str;
568 ssize_t sent;
569 PyObject *future, *exc, *res;
570 Py_ssize_t body_len;
571 nxt_py_asgi_http_t *http;
572
573 http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
574
575 body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
576 body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
577
578 nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
579
580 while (body_len > 0) {
581 sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
582 if (nxt_slow_path(sent < 0)) {
583 goto fail;
584 }
585
586 if (nxt_slow_path(sent == 0)) {
587 return NXT_UNIT_AGAIN;
588 }
589
590 body_str += sent;
591 body_len -= sent;
592
593 http->send_body_off += sent;
594 http->bytes_sent += sent;
595 }
596
597 Py_CLEAR(http->send_body);
598
599 future = http->send_future;
600 http->send_future = NULL;
601
602 nxt_py_asgi_http_set_result(http, future, Py_None);
603
604 return NXT_UNIT_OK;
605
606 fail:
607
608 exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
609 nxt_py_failed_to_send_body_str,
610 NULL);
611 if (nxt_slow_path(exc == NULL)) {
612 nxt_unit_req_alert(http->req, "RuntimeError create failed");
613 nxt_python_print_exception();
614
615 exc = Py_None;
616 Py_INCREF(exc);
617 }
618
619 future = http->send_future;
620 http->send_future = NULL;
621
622 res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
623 NULL);
624 if (nxt_slow_path(res == NULL)) {
625 nxt_unit_req_alert(http->req, "'set_exception' call failed");
626 nxt_python_print_exception();
627 }
628
629 Py_XDECREF(res);
630 Py_DECREF(future);
631 Py_DECREF(exc);
632
633 return NXT_UNIT_ERROR;
634 }
635
636
637 void
nxt_py_asgi_http_close_handler(nxt_unit_request_info_t * req)638 nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
639 {
640 nxt_py_asgi_http_t *http;
641
642 http = req->data;
643
644 nxt_unit_req_debug(req, "asgi_http_close_handler");
645
646 if (nxt_fast_path(http != NULL)) {
647 http->closed = 1;
648
649 nxt_py_asgi_http_emit_disconnect(http);
650 }
651 }
652
653
654 static PyObject *
nxt_py_asgi_http_done(PyObject * self,PyObject * future)655 nxt_py_asgi_http_done(PyObject *self, PyObject *future)
656 {
657 int rc;
658 PyObject *res;
659 nxt_py_asgi_http_t *http;
660
661 http = (nxt_py_asgi_http_t *) self;
662
663 nxt_unit_req_debug(http->req, "asgi_http_done");
664
665 /*
666 * Get Future.result() and it raises an exception, if coroutine exited
667 * with exception.
668 */
669 res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
670 if (nxt_slow_path(res == NULL)) {
671 nxt_unit_req_error(http->req,
672 "Python failed to call 'future.result()'");
673 nxt_python_print_exception();
674
675 rc = NXT_UNIT_ERROR;
676
677 } else {
678 Py_DECREF(res);
679
680 rc = NXT_UNIT_OK;
681 }
682
683 nxt_unit_request_done(http->req, rc);
684
685 Py_RETURN_NONE;
686 }
687
688
689 #endif /* NXT_HAVE_ASGI */
690