nxt_python_asgi.c (1624:e46b1b422545) nxt_python_asgi.c (1681:542b5b8c0647)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7#include <python/nxt_python.h>
8
9#if (NXT_HAVE_ASGI)
10
11#include <nxt_main.h>
12#include <nxt_unit.h>
13#include <nxt_unit_request.h>
14#include <nxt_unit_response.h>
15#include <python/nxt_python_asgi.h>
16#include <python/nxt_python_asgi_str.h>
17
18
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6
7#include <python/nxt_python.h>
8
9#if (NXT_HAVE_ASGI)
10
11#include <nxt_main.h>
12#include <nxt_unit.h>
13#include <nxt_unit_request.h>
14#include <nxt_unit_response.h>
15#include <python/nxt_python_asgi.h>
16#include <python/nxt_python_asgi_str.h>
17
18
19static int nxt_python_asgi_ctx_data_alloc(void **pdata);
20static void nxt_python_asgi_ctx_data_free(void *data);
21static int nxt_python_asgi_startup(void *data);
22static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
23
24static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
25 nxt_unit_port_t *port);
19static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
20
21static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
22static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
23 uint16_t port);
24static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
25static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
26
26static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
27
28static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
29static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
30 uint16_t port);
31static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
32static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
33
34static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
35
27static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
28static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
29static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
30static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
31
32static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
36static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
37static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
38static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
39static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
40
41static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
42static void nxt_python_asgi_done(void);
33
34
43
44
35PyObject *nxt_py_loop_run_until_complete;
36PyObject *nxt_py_loop_create_future;
37PyObject *nxt_py_loop_create_task;
45static PyObject *nxt_py_port_read;
46static nxt_unit_port_t *nxt_py_shared_port;
38
47
39nxt_queue_t nxt_py_asgi_drain_queue;
40
41static PyObject *nxt_py_loop_call_soon;
42static PyObject *nxt_py_quit_future;
43static PyObject *nxt_py_quit_future_set_result;
44static PyObject *nxt_py_loop_add_reader;
45static PyObject *nxt_py_loop_remove_reader;
46static PyObject *nxt_py_port_read;
47
48static PyMethodDef nxt_py_port_read_method =
49 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50
48static PyMethodDef nxt_py_port_read_method =
49 {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
50
51static nxt_python_proto_t nxt_py_asgi_proto = {
52 .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
53 .ctx_data_free = nxt_python_asgi_ctx_data_free,
54 .startup = nxt_python_asgi_startup,
55 .run = nxt_python_asgi_run,
56 .ready = nxt_python_asgi_ready,
57 .done = nxt_python_asgi_done,
58};
59
51#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
52
53
54int
55nxt_python_asgi_check(PyObject *obj)
56{
57 int res;
58 PyObject *call;

--- 38 unchanged lines hidden (view full) ---

97 }
98
99 Py_DECREF(call);
100
101 return res;
102}
103
104
60#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
61
62
63int
64nxt_python_asgi_check(PyObject *obj)
65{
66 int res;
67 PyObject *call;

--- 38 unchanged lines hidden (view full) ---

106 }
107
108 Py_DECREF(call);
109
110 return res;
111}
112
113
105nxt_int_t
106nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
114int
115nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
107{
116{
108 PyObject *asyncio, *loop, *get_event_loop;
109 nxt_int_t rc;
117 nxt_unit_debug(NULL, "asgi_init");
110
118
111 nxt_debug(task, "asgi_init");
112
113 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) {
114 nxt_alert(task, "Python failed to init string objects");
115 return NXT_ERROR;
119 if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
120 nxt_unit_alert(NULL, "Python failed to init string objects");
121 return NXT_UNIT_ERROR;
116 }
117
122 }
123
118 asyncio = PyImport_ImportModule("asyncio");
119 if (nxt_slow_path(asyncio == NULL)) {
120 nxt_alert(task, "Python failed to import module 'asyncio'");
121 nxt_python_print_exception();
122 return NXT_ERROR;
124 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
125 if (nxt_slow_path(nxt_py_port_read == NULL)) {
126 nxt_unit_alert(NULL,
127 "Python failed to initialize the 'port_read' function");
128 return NXT_UNIT_ERROR;
123 }
124
129 }
130
125 loop = NULL;
126 get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
127 "get_event_loop");
128 if (nxt_slow_path(get_event_loop == NULL)) {
129 nxt_alert(task,
130 "Python failed to get 'get_event_loop' from module 'asyncio'");
131 goto fail;
131 if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
132 return NXT_UNIT_ERROR;
132 }
133
133 }
134
134 if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) {
135 nxt_alert(task, "'asyncio.get_event_loop' is not a callable object");
136 goto fail;
135 if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
136 return NXT_UNIT_ERROR;
137 }
138
137 }
138
139 loop = PyObject_CallObject(get_event_loop, NULL);
140 if (nxt_slow_path(loop == NULL)) {
141 nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'");
142 goto fail;
143 }
139 init->callbacks.request_handler = nxt_py_asgi_request_handler;
140 init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
141 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
142 init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
143 init->callbacks.quit = nxt_py_asgi_quit;
144 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
145 init->callbacks.add_port = nxt_py_asgi_add_port;
146 init->callbacks.remove_port = nxt_py_asgi_remove_port;
144
147
145 nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task");
146 if (nxt_slow_path(nxt_py_loop_create_task == NULL)) {
147 nxt_alert(task, "Python failed to get 'loop.create_task'");
148 goto fail;
149 }
148 *proto = nxt_py_asgi_proto;
150
149
151 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) {
152 nxt_alert(task, "'loop.create_task' is not a callable object");
153 goto fail;
154 }
150 return NXT_UNIT_OK;
151}
155
152
156 nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader");
157 if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) {
158 nxt_alert(task, "Python failed to get 'loop.add_reader'");
159 goto fail;
160 }
161
153
162 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) {
163 nxt_alert(task, "'loop.add_reader' is not a callable object");
164 goto fail;
165 }
154static int
155nxt_python_asgi_ctx_data_alloc(void **pdata)
156{
157 uint32_t i;
158 PyObject *asyncio, *loop, *new_event_loop, *obj;
159 nxt_py_asgi_ctx_data_t *ctx_data;
166
160
167 nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader");
168 if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) {
169 nxt_alert(task, "Python failed to get 'loop.remove_reader'");
170 goto fail;
161 ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
162 if (nxt_slow_path(ctx_data == NULL)) {
163 nxt_unit_alert(NULL, "Failed to allocate context data");
164 return NXT_UNIT_ERROR;
171 }
172
165 }
166
173 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) {
174 nxt_alert(task, "'loop.remove_reader' is not a callable object");
175 goto fail;
176 }
167 memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
177
168
178 nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon");
179 if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) {
180 nxt_alert(task, "Python failed to get 'loop.call_soon'");
181 goto fail;
182 }
169 nxt_queue_init(&ctx_data->drain_queue);
183
170
184 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) {
185 nxt_alert(task, "'loop.call_soon' is not a callable object");
186 goto fail;
187 }
171 struct {
172 const char *key;
173 PyObject **handler;
188
174
189 nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop,
190 "run_until_complete");
191 if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) {
192 nxt_alert(task, "Python failed to get 'loop.run_until_complete'");
193 goto fail;
194 }
175 } handlers[] = {
176 { "create_task", &ctx_data->loop_create_task },
177 { "add_reader", &ctx_data->loop_add_reader },
178 { "remove_reader", &ctx_data->loop_remove_reader },
179 { "call_soon", &ctx_data->loop_call_soon },
180 { "run_until_complete", &ctx_data->loop_run_until_complete },
181 { "create_future", &ctx_data->loop_create_future },
182 };
195
183
196 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) {
197 nxt_alert(task, "'loop.run_until_complete' is not a callable object");
198 goto fail;
199 }
184 loop = NULL;
200
185
201 nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future");
202 if (nxt_slow_path(nxt_py_loop_create_future == NULL)) {
203 nxt_alert(task, "Python failed to get 'loop.create_future'");
186 asyncio = PyImport_ImportModule("asyncio");
187 if (nxt_slow_path(asyncio == NULL)) {
188 nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
189 nxt_python_print_exception();
204 goto fail;
205 }
206
190 goto fail;
191 }
192
207 if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) {
208 nxt_alert(task, "'loop.create_future' is not a callable object");
193 new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
194 "new_event_loop");
195 if (nxt_slow_path(new_event_loop == NULL)) {
196 nxt_unit_alert(NULL,
197 "Python failed to get 'new_event_loop' from module 'asyncio'");
209 goto fail;
210 }
211
198 goto fail;
199 }
200
212 nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
213 if (nxt_slow_path(nxt_py_quit_future == NULL)) {
214 nxt_alert(task, "Python failed to create Future ");
215 nxt_python_print_exception();
201 if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
202 nxt_unit_alert(NULL,
203 "'asyncio.new_event_loop' is not a callable object");
216 goto fail;
217 }
218
204 goto fail;
205 }
206
219 nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future,
220 "set_result");
221 if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) {
222 nxt_alert(task, "Python failed to get 'future.set_result'");
207 loop = PyObject_CallObject(new_event_loop, NULL);
208 if (nxt_slow_path(loop == NULL)) {
209 nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
223 goto fail;
224 }
225
210 goto fail;
211 }
212
226 if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) {
227 nxt_alert(task, "'future.set_result' is not a callable object");
228 goto fail;
213 for (i = 0; i < nxt_nitems(handlers); i++) {
214 obj = PyObject_GetAttrString(loop, handlers[i].key);
215 if (nxt_slow_path(obj == NULL)) {
216 nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
217 handlers[i].key);
218 goto fail;
219 }
220
221 *handlers[i].handler = obj;
222
223 if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
224 nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
225 handlers[i].key);
226 goto fail;
227 }
229 }
230
228 }
229
231 nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
232 if (nxt_slow_path(nxt_py_port_read == NULL)) {
233 nxt_alert(task, "Python failed to initialize the 'port_read' function");
230 obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
231 if (nxt_slow_path(obj == NULL)) {
232 nxt_unit_alert(NULL, "Python failed to create Future ");
233 nxt_python_print_exception();
234 goto fail;
235 }
236
234 goto fail;
235 }
236
237 nxt_queue_init(&nxt_py_asgi_drain_queue);
237 ctx_data->quit_future = obj;
238
238
239 if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) {
239 obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
240 if (nxt_slow_path(obj == NULL)) {
241 nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
240 goto fail;
241 }
242
242 goto fail;
243 }
244
243 if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) {
244 goto fail;
245 }
245 ctx_data->quit_future_set_result = obj;
246
246
247 rc = nxt_py_asgi_lifespan_startup(task);
248 if (nxt_slow_path(rc == NXT_ERROR)) {
247 if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
248 nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
249 goto fail;
250 }
251
249 goto fail;
250 }
251
252 init->callbacks.request_handler = nxt_py_asgi_request_handler;
253 init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
254 init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
255 init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
256 init->callbacks.quit = nxt_py_asgi_quit;
257 init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
258 init->callbacks.add_port = nxt_py_asgi_add_port;
259 init->callbacks.remove_port = nxt_py_asgi_remove_port;
260
261 Py_DECREF(loop);
262 Py_DECREF(asyncio);
263
252 Py_DECREF(loop);
253 Py_DECREF(asyncio);
254
264 return NXT_OK;
255 *pdata = ctx_data;
265
256
257 return NXT_UNIT_OK;
258
266fail:
267
259fail:
260
261 nxt_python_asgi_ctx_data_free(ctx_data);
262
268 Py_XDECREF(loop);
263 Py_XDECREF(loop);
269 Py_DECREF(asyncio);
264 Py_XDECREF(asyncio);
270
265
271 return NXT_ERROR;
266 return NXT_UNIT_ERROR;
272}
273
274
267}
268
269
275nxt_int_t
270static void
271nxt_python_asgi_ctx_data_free(void *data)
272{
273 nxt_py_asgi_ctx_data_t *ctx_data;
274
275 ctx_data = data;
276
277 Py_XDECREF(ctx_data->loop_run_until_complete);
278 Py_XDECREF(ctx_data->loop_create_future);
279 Py_XDECREF(ctx_data->loop_create_task);
280 Py_XDECREF(ctx_data->loop_call_soon);
281 Py_XDECREF(ctx_data->loop_add_reader);
282 Py_XDECREF(ctx_data->loop_remove_reader);
283 Py_XDECREF(ctx_data->quit_future);
284 Py_XDECREF(ctx_data->quit_future_set_result);
285
286 nxt_unit_free(NULL, ctx_data);
287}
288
289
290static int
291nxt_python_asgi_startup(void *data)
292{
293 return nxt_py_asgi_lifespan_startup(data);
294}
295
296
297static int
276nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
277{
298nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
299{
278 PyObject *res;
300 PyObject *res;
301 nxt_py_asgi_ctx_data_t *ctx_data;
279
302
280 res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
281 nxt_py_quit_future, NULL);
303 ctx_data = ctx->data;
304
305 res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
306 ctx_data->quit_future, NULL);
282 if (nxt_slow_path(res == NULL)) {
283 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
284 nxt_python_print_exception();
285
307 if (nxt_slow_path(res == NULL)) {
308 nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
309 nxt_python_print_exception();
310
286 return NXT_ERROR;
311 return NXT_UNIT_ERROR;
287 }
288
289 Py_DECREF(res);
290
312 }
313
314 Py_DECREF(res);
315
291 nxt_py_asgi_lifespan_shutdown();
316 nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
317 nxt_py_asgi_remove_reader(ctx, ctx_data->port);
292
318
293 return NXT_OK;
319 if (ctx_data->port != NULL) {
320 ctx_data->port->data = NULL;
321 ctx_data->port = NULL;
322 }
323
324 nxt_py_asgi_lifespan_shutdown(ctx);
325
326 return NXT_UNIT_OK;
294}
295
296
297static void
327}
328
329
330static void
331nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
332{
333 PyObject *res;
334 nxt_py_asgi_ctx_data_t *ctx_data;
335
336 if (port == NULL || port->in_fd == -1) {
337 return;
338 }
339
340 ctx_data = ctx->data;
341
342 nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
343
344 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
345 PyLong_FromLong(port->in_fd), NULL);
346 if (nxt_slow_path(res == NULL)) {
347 nxt_unit_alert(ctx, "Python failed to remove_reader");
348 nxt_python_print_exception();
349
350 return;
351 }
352
353 Py_DECREF(res);
354}
355
356
357static void
298nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
299{
358nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
359{
300 PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
360 PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
361 nxt_py_asgi_ctx_data_t *ctx_data;
301
302 if (req->request->websocket_handshake) {
303 asgi = nxt_py_asgi_websocket_create(req);
304
305 } else {
306 asgi = nxt_py_asgi_http_create(req);
307 }
308

--- 51 unchanged lines hidden (view full) ---

360 nxt_unit_req_error(req, "Application result type is not a coroutine");
361 nxt_unit_request_done(req, NXT_UNIT_ERROR);
362
363 Py_DECREF(res);
364
365 goto release_scope;
366 }
367
362
363 if (req->request->websocket_handshake) {
364 asgi = nxt_py_asgi_websocket_create(req);
365
366 } else {
367 asgi = nxt_py_asgi_http_create(req);
368 }
369

--- 51 unchanged lines hidden (view full) ---

421 nxt_unit_req_error(req, "Application result type is not a coroutine");
422 nxt_unit_request_done(req, NXT_UNIT_ERROR);
423
424 Py_DECREF(res);
425
426 goto release_scope;
427 }
428
368 task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
429 ctx_data = req->ctx->data;
430
431 task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
369 if (nxt_slow_path(task == NULL)) {
370 nxt_unit_req_error(req, "Python failed to call the create_task");
371 nxt_python_print_exception();
372 nxt_unit_request_done(req, NXT_UNIT_ERROR);
373
374 Py_DECREF(res);
375
376 goto release_scope;

--- 342 unchanged lines hidden (view full) ---

719
720 Py_DECREF(res);
721
722 return NULL;
723}
724
725
726static int
432 if (nxt_slow_path(task == NULL)) {
433 nxt_unit_req_error(req, "Python failed to call the create_task");
434 nxt_python_print_exception();
435 nxt_unit_request_done(req, NXT_UNIT_ERROR);
436
437 Py_DECREF(res);
438
439 goto release_scope;

--- 342 unchanged lines hidden (view full) ---

782
783 Py_DECREF(res);
784
785 return NULL;
786}
787
788
789static int
790nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
791{
792 PyObject *res;
793 nxt_unit_port_t *port;
794 nxt_py_asgi_ctx_data_t *ctx_data;
795
796 if (nxt_slow_path(nxt_py_shared_port == NULL)) {
797 return NXT_UNIT_ERROR;
798 }
799
800 port = nxt_py_shared_port;
801
802 nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
803
804 ctx_data = ctx->data;
805
806 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
807 PyLong_FromLong(port->in_fd),
808 nxt_py_port_read,
809 PyLong_FromVoidPtr(ctx),
810 PyLong_FromVoidPtr(port), NULL);
811 if (nxt_slow_path(res == NULL)) {
812 nxt_unit_alert(ctx, "Python failed to add_reader");
813 nxt_python_print_exception();
814
815 return NXT_UNIT_ERROR;
816 }
817
818 Py_DECREF(res);
819
820 return NXT_UNIT_OK;
821}
822
823
824static int
727nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
728{
825nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
826{
729 int nb;
730 PyObject *res;
827 int nb;
828 PyObject *res;
829 nxt_py_asgi_ctx_data_t *ctx_data;
731
732 if (port->in_fd == -1) {
733 return NXT_UNIT_OK;
734 }
735
736 nb = 1;
737
738 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
739 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
740 port->in_fd, strerror(errno), errno);
741
742 return NXT_UNIT_ERROR;
743 }
744
745 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
746
830
831 if (port->in_fd == -1) {
832 return NXT_UNIT_OK;
833 }
834
835 nb = 1;
836
837 if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
838 nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
839 port->in_fd, strerror(errno), errno);
840
841 return NXT_UNIT_ERROR;
842 }
843
844 nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
845
747 res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader,
846 if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
847 nxt_py_shared_port = port;
848
849 return NXT_UNIT_OK;
850 }
851
852 ctx_data = ctx->data;
853
854 ctx_data->port = port;
855 port->data = ctx_data;
856
857 res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
748 PyLong_FromLong(port->in_fd),
749 nxt_py_port_read,
750 PyLong_FromVoidPtr(ctx),
751 PyLong_FromVoidPtr(port), NULL);
752 if (nxt_slow_path(res == NULL)) {
753 nxt_unit_alert(ctx, "Python failed to add_reader");
858 PyLong_FromLong(port->in_fd),
859 nxt_py_port_read,
860 PyLong_FromVoidPtr(ctx),
861 PyLong_FromVoidPtr(port), NULL);
862 if (nxt_slow_path(res == NULL)) {
863 nxt_unit_alert(ctx, "Python failed to add_reader");
864 nxt_python_print_exception();
754
755 return NXT_UNIT_ERROR;
756 }
757
758 Py_DECREF(res);
759
760 return NXT_UNIT_OK;
761}
762
763
764static void
765nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
766{
865
866 return NXT_UNIT_ERROR;
867 }
868
869 Py_DECREF(res);
870
871 return NXT_UNIT_OK;
872}
873
874
875static void
876nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
877{
767 PyObject *res;
768
769 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
770
771 if (port->in_fd == -1) {
772 return;
773 }
774
878 if (port->in_fd == -1) {
879 return;
880 }
881
775 res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader,
776 PyLong_FromLong(port->in_fd), NULL);
777 if (nxt_slow_path(res == NULL)) {
778 nxt_unit_alert(NULL, "Python failed to remove_reader");
779 }
882 nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
780
883
781 Py_DECREF(res);
884 if (nxt_py_shared_port == port) {
885 nxt_py_shared_port = NULL;
886 }
782}
783
784
785static void
786nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
787{
887}
888
889
890static void
891nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
892{
788 PyObject *res;
893 PyObject *res;
894 nxt_py_asgi_ctx_data_t *ctx_data;
789
790 nxt_unit_debug(ctx, "asgi_quit %p", ctx);
791
895
896 nxt_unit_debug(ctx, "asgi_quit %p", ctx);
897
792 res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result,
898 ctx_data = ctx->data;
899
900 if (nxt_py_shared_port != NULL) {
901 res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
902 PyLong_FromLong(nxt_py_shared_port->in_fd), NULL);
903 if (nxt_slow_path(res == NULL)) {
904 nxt_unit_alert(NULL, "Python failed to remove_reader");
905 nxt_python_print_exception();
906
907 } else {
908 Py_DECREF(res);
909 }
910 }
911
912 res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
793 PyLong_FromLong(0), NULL);
794 if (nxt_slow_path(res == NULL)) {
795 nxt_unit_alert(ctx, "Python failed to set_result");
913 PyLong_FromLong(0), NULL);
914 if (nxt_slow_path(res == NULL)) {
915 nxt_unit_alert(ctx, "Python failed to set_result");
796 }
916 nxt_python_print_exception();
797
917
798 Py_DECREF(res);
918 } else {
919 Py_DECREF(res);
920 }
799}
800
801
802static void
803nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
804{
921}
922
923
924static void
925nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
926{
805 int rc;
806 nxt_queue_link_t *lnk;
927 int rc;
928 nxt_queue_link_t *lnk;
929 nxt_py_asgi_ctx_data_t *ctx_data;
807
930
808 while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) {
809 lnk = nxt_queue_first(&nxt_py_asgi_drain_queue);
931 ctx_data = ctx->data;
810
932
933 while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
934 lnk = nxt_queue_first(&ctx_data->drain_queue);
935
811 rc = nxt_py_asgi_http_drain(lnk);
812 if (rc == NXT_UNIT_AGAIN) {
936 rc = nxt_py_asgi_http_drain(lnk);
937 if (rc == NXT_UNIT_AGAIN) {
813 break;
938 return;
814 }
815
816 nxt_queue_remove(lnk);
817 }
818}
819
820
821static PyObject *

--- 32 unchanged lines hidden (view full) ---

854 port = PyLong_AsVoidPtr(arg);
855
856 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
857
858 rc = nxt_unit_process_port_msg(ctx, port);
859
860 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
861 return PyErr_Format(PyExc_RuntimeError,
939 }
940
941 nxt_queue_remove(lnk);
942 }
943}
944
945
946static PyObject *

--- 32 unchanged lines hidden (view full) ---

979 port = PyLong_AsVoidPtr(arg);
980
981 nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
982
983 rc = nxt_unit_process_port_msg(ctx, port);
984
985 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
986 return PyErr_Format(PyExc_RuntimeError,
862 "error processing port message");
987 "error processing port %d message", port->id.id);
863 }
864
865 Py_RETURN_NONE;
866}
867
868
869PyObject *
870nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,

--- 120 unchanged lines hidden (view full) ---

991 ctx->content_length = content_length;
992 }
993
994 Py_RETURN_NONE;
995}
996
997
998PyObject *
988 }
989
990 Py_RETURN_NONE;
991}
992
993
994PyObject *
995nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,

--- 120 unchanged lines hidden (view full) ---

1116 ctx->content_length = content_length;
1117 }
1118
1119 Py_RETURN_NONE;
1120}
1121
1122
1123PyObject *
999nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
1000 PyObject *result)
1124nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
1125 nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
1001{
1002 PyObject *set_result, *res;
1003
1004 if (nxt_slow_path(result == NULL)) {
1005 Py_DECREF(future);
1006
1007 return NULL;
1008 }
1009
1010 set_result = PyObject_GetAttrString(future, "set_result");
1011 if (nxt_slow_path(set_result == NULL)) {
1012 nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1013
1014 Py_CLEAR(future);
1015
1126{
1127 PyObject *set_result, *res;
1128
1129 if (nxt_slow_path(result == NULL)) {
1130 Py_DECREF(future);
1131
1132 return NULL;
1133 }
1134
1135 set_result = PyObject_GetAttrString(future, "set_result");
1136 if (nxt_slow_path(set_result == NULL)) {
1137 nxt_unit_req_alert(req, "failed to get 'set_result' for future");
1138
1139 Py_CLEAR(future);
1140
1016 goto cleanup;
1141 goto cleanup_result;
1017 }
1018
1019 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1020 nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1021
1022 Py_CLEAR(future);
1023
1024 goto cleanup;
1025 }
1026
1142 }
1143
1144 if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
1145 nxt_unit_req_alert(req, "'future.set_result' is not a callable");
1146
1147 Py_CLEAR(future);
1148
1149 goto cleanup;
1150 }
1151
1027 res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result,
1152 res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
1028 result, NULL);
1029 if (nxt_slow_path(res == NULL)) {
1030 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1031 nxt_python_print_exception();
1032
1033 Py_CLEAR(future);
1034 }
1035
1036 Py_XDECREF(res);
1037
1038cleanup:
1039
1040 Py_DECREF(set_result);
1153 result, NULL);
1154 if (nxt_slow_path(res == NULL)) {
1155 nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
1156 nxt_python_print_exception();
1157
1158 Py_CLEAR(future);
1159 }
1160
1161 Py_XDECREF(res);
1162
1163cleanup:
1164
1165 Py_DECREF(set_result);
1166
1167cleanup_result:
1168
1041 Py_DECREF(result);
1042
1043 return future;
1044}
1045
1046
1047PyObject *
1048nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)

--- 94 unchanged lines hidden (view full) ---

1143
1144 Py_DECREF(asgi);
1145
1146 return scope;
1147}
1148
1149
1150void
1169 Py_DECREF(result);
1170
1171 return future;
1172}
1173
1174
1175PyObject *
1176nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)

--- 94 unchanged lines hidden (view full) ---

1271
1272 Py_DECREF(asgi);
1273
1274 return scope;
1275}
1276
1277
1278void
1279nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
1280{
1281 nxt_py_asgi_ctx_data_t *ctx_data;
1282
1283 ctx_data = req->ctx->data;
1284
1285 nxt_queue_insert_tail(&ctx_data->drain_queue, link);
1286}
1287
1288
1289void
1151nxt_py_asgi_dealloc(PyObject *self)
1152{
1153 PyObject_Del(self);
1154}
1155
1156
1157PyObject *
1158nxt_py_asgi_await(PyObject *self)

--- 13 unchanged lines hidden (view full) ---

1172
1173PyObject *
1174nxt_py_asgi_next(PyObject *self)
1175{
1176 return NULL;
1177}
1178
1179
1290nxt_py_asgi_dealloc(PyObject *self)
1291{
1292 PyObject_Del(self);
1293}
1294
1295
1296PyObject *
1297nxt_py_asgi_await(PyObject *self)

--- 13 unchanged lines hidden (view full) ---

1311
1312PyObject *
1313nxt_py_asgi_next(PyObject *self)
1314{
1315 return NULL;
1316}
1317
1318
1180void
1319static void
1181nxt_python_asgi_done(void)
1182{
1183 nxt_py_asgi_str_done();
1184
1320nxt_python_asgi_done(void)
1321{
1322 nxt_py_asgi_str_done();
1323
1185 Py_XDECREF(nxt_py_quit_future);
1186 Py_XDECREF(nxt_py_quit_future_set_result);
1187 Py_XDECREF(nxt_py_loop_run_until_complete);
1188 Py_XDECREF(nxt_py_loop_create_future);
1189 Py_XDECREF(nxt_py_loop_create_task);
1190 Py_XDECREF(nxt_py_loop_call_soon);
1191 Py_XDECREF(nxt_py_loop_add_reader);
1192 Py_XDECREF(nxt_py_loop_remove_reader);
1193 Py_XDECREF(nxt_py_port_read);
1194}
1195
1196#else /* !(NXT_HAVE_ASGI) */
1197
1198
1199int
1200nxt_python_asgi_check(PyObject *obj)
1201{
1202 return 0;
1203}
1204
1205
1324 Py_XDECREF(nxt_py_port_read);
1325}
1326
1327#else /* !(NXT_HAVE_ASGI) */
1328
1329
1330int
1331nxt_python_asgi_check(PyObject *obj)
1332{
1333 return 0;
1334}
1335
1336
1206nxt_int_t
1207nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
1337int
1338nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
1208{
1339{
1209 nxt_alert(task, "ASGI not implemented");
1210 return NXT_ERROR;
1340 nxt_unit_alert(NULL, "ASGI not implemented");
1341 return NXT_UNIT_ERROR;
1211}
1212
1213
1342}
1343
1344
1214nxt_int_t
1215nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
1216{
1217 nxt_unit_alert(ctx, "ASGI not implemented");
1218 return NXT_ERROR;
1219}
1220
1221
1222void
1223nxt_python_asgi_done(void)
1224{
1225}
1226
1227#endif /* NXT_HAVE_ASGI */
1345#endif /* NXT_HAVE_ASGI */