xref: /unit/src/python/nxt_python.c (revision 2077:624e51cfe97a)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 
7 #include <Python.h>
8 
9 #include <nxt_main.h>
10 #include <nxt_router.h>
11 #include <nxt_unit.h>
12 
13 #include <python/nxt_python.h>
14 
15 #include NXT_PYTHON_MOUNTS_H
16 
17 
18 typedef struct {                /* State kept for one additional worker thread. */
19     pthread_t       thread;     /* Written by pthread_create(), used by pthread_join(). */
20     nxt_unit_ctx_t  *ctx;       /* Context passed to the ready handler; parent for nxt_unit_ctx_alloc(). */
21     void            *ctx_data;  /* Protocol data from ctx_data_alloc(); released with ctx_data_free(). */
22 } nxt_py_thread_info_t;
23 
24 
25 static nxt_int_t nxt_python_start(nxt_task_t *task,
26     nxt_process_data_t *data);                       /* Entry point referenced by nxt_app_module. */
27 static nxt_int_t nxt_python_set_target(nxt_task_t *task,
28     nxt_python_target_t *target, nxt_conf_value_t *conf);
29 static nxt_int_t nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value);
30 static int nxt_python_init_threads(nxt_python_app_conf_t *c);
31 static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx);
32 static void *nxt_python_thread_func(void *main_ctx);  /* pthread start routine; the argument actually passed is a nxt_py_thread_info_t pointer. */
33 static void nxt_python_join_threads(nxt_unit_ctx_t *ctx,
34     nxt_python_app_conf_t *c);
35 static void nxt_python_atexit(void);                  /* Called explicitly by nxt_python_start(), not registered with atexit(). */
36 
37 static uint32_t  compat[] = {   /* Exported through nxt_app_module; presumably compared by the loader against its own build -- confirm. */
38     NXT_VERNUM, NXT_DEBUG,
39 };
40 
41 
42 NXT_EXPORT nxt_app_module_t  nxt_app_module = {   /* Exported module descriptor; positional initializer, field order is fixed by nxt_app_module_t. */
43     sizeof(compat),                 /* Size in bytes of the compat array. */
44     compat,
45     nxt_string("python"),           /* Module type name. */
46     PY_VERSION,                     /* Version string of the Python headers used for the build. */
47     nxt_python_mounts,              /* Mount table from the header named by NXT_PYTHON_MOUNTS_H. */
48     nxt_nitems(nxt_python_mounts),
49     NULL,                           /* NOTE(review): presumably an optional setup hook left unset -- confirm against nxt_app_module_t. */
50     nxt_python_start,               /* Start function defined in this file. */
51 };
52 
53 static PyObject           *nxt_py_stderr_flush;   /* "flush" attribute of sys.stderr; called by nxt_python_print_exception(). */
54 nxt_python_targets_t      *nxt_py_targets;         /* Non-static; allocated in nxt_python_start(), released in nxt_python_atexit(). */
55 
56 #if PY_MAJOR_VERSION == 3
57 static wchar_t            *nxt_py_home;            /* Wide string given to Py_SetProgramName() or Py_SetPythonHome(). */
58 #else
59 static char               *nxt_py_home;            /* Copy of the configured home given to Py_SetPythonHome(). */
60 #endif
61 
62 static pthread_attr_t        *nxt_py_thread_attr;  /* Stays NULL unless thread_stack_size > 0. */
63 static nxt_py_thread_info_t  *nxt_py_threads;      /* Array of (threads - 1) entries; NULL when threads <= 1. */
64 static nxt_python_proto_t    nxt_py_proto;         /* Filled by nxt_python_asgi_init() or nxt_python_wsgi_init(). */
65 
66 
67 static nxt_int_t
68 nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
69 {
70     int                    rc;
71     size_t                 len, size;
72     uint32_t               next;
73     PyObject               *obj, *module;
74     nxt_str_t              proto, probe_proto, name;
75     nxt_int_t              ret, n, i;
76     nxt_unit_ctx_t         *unit_ctx;
77     nxt_unit_init_t        python_init;
78     nxt_conf_value_t       *cv;
79     nxt_python_targets_t   *targets;
80     nxt_common_app_conf_t  *app_conf;
81     nxt_python_app_conf_t  *c;
82 #if PY_MAJOR_VERSION == 3
83     char                   *path;
84     nxt_int_t              pep405;
85 
86     static const char pyvenv[] = "/pyvenv.cfg";
87     static const char bin_python[] = "/bin/python";
88 #endif
89 
90     static const nxt_str_t  wsgi = nxt_string("wsgi");
91     static const nxt_str_t  asgi = nxt_string("asgi");
92 
93     app_conf = data->app;
94     c = &app_conf->u.python;
95 
96     if (c->home != NULL) {
97         len = nxt_strlen(c->home);
98 
99 #if PY_MAJOR_VERSION == 3
100 
101         path = nxt_malloc(len + sizeof(pyvenv));
102         if (nxt_slow_path(path == NULL)) {
103             nxt_alert(task, "Failed to allocate memory");
104             return NXT_ERROR;
105         }
106 
107         nxt_memcpy(path, c->home, len);
108         nxt_memcpy(path + len, pyvenv, sizeof(pyvenv));
109 
110         pep405 = (access(path, R_OK) == 0);
111 
112         nxt_free(path);
113 
114         if (pep405) {
115             size = (len + sizeof(bin_python)) * sizeof(wchar_t);
116 
117         } else {
118             size = (len + 1) * sizeof(wchar_t);
119         }
120 
121         nxt_py_home = nxt_malloc(size);
122         if (nxt_slow_path(nxt_py_home == NULL)) {
123             nxt_alert(task, "Failed to allocate memory");
124             return NXT_ERROR;
125         }
126 
127         if (pep405) {
128             mbstowcs(nxt_py_home, c->home, len);
129             mbstowcs(nxt_py_home + len, bin_python, sizeof(bin_python));
130             Py_SetProgramName(nxt_py_home);
131 
132         } else {
133             mbstowcs(nxt_py_home, c->home, len + 1);
134             Py_SetPythonHome(nxt_py_home);
135         }
136 
137 #else
138         nxt_py_home = nxt_malloc(len + 1);
139         if (nxt_slow_path(nxt_py_home == NULL)) {
140             nxt_alert(task, "Failed to allocate memory");
141             return NXT_ERROR;
142         }
143 
144         nxt_memcpy(nxt_py_home, c->home, len + 1);
145         Py_SetPythonHome(nxt_py_home);
146 #endif
147     }
148 
149     Py_InitializeEx(0);
150 
151 #if PY_VERSION_HEX < NXT_PYTHON_VER(3, 7)
152     if (c->threads > 1) {
153         PyEval_InitThreads();
154     }
155 #endif
156 
157     module = NULL;
158     obj = NULL;
159 
160     python_init.ctx_data = NULL;
161 
162     obj = PySys_GetObject((char *) "stderr");
163     if (nxt_slow_path(obj == NULL)) {
164         nxt_alert(task, "Python failed to get \"sys.stderr\" object");
165         goto fail;
166     }
167 
168     nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush");
169 
170     /* obj is a Borrowed reference. */
171     obj = NULL;
172 
173     if (nxt_slow_path(nxt_py_stderr_flush == NULL)) {
174         nxt_alert(task, "Python failed to get \"flush\" attribute of "
175                         "\"sys.stderr\" object");
176         goto fail;
177     }
178 
179     if (nxt_slow_path(nxt_python_set_path(task, c->path) != NXT_OK)) {
180         goto fail;
181     }
182 
183     obj = Py_BuildValue("[s]", "unit");
184     if (nxt_slow_path(obj == NULL)) {
185         nxt_alert(task, "Python failed to create the \"sys.argv\" list");
186         goto fail;
187     }
188 
189     if (nxt_slow_path(PySys_SetObject((char *) "argv", obj) != 0)) {
190         nxt_alert(task, "Python failed to set the \"sys.argv\" list");
191         goto fail;
192     }
193 
194     Py_CLEAR(obj);
195 
196     n = (c->targets != NULL ? nxt_conf_object_members_count(c->targets) : 1);
197 
198     size = sizeof(nxt_python_targets_t) + n * sizeof(nxt_python_target_t);
199 
200     targets = nxt_unit_malloc(NULL, size);
201     if (nxt_slow_path(targets == NULL)) {
202         nxt_alert(task, "Could not allocate targets");
203         goto fail;
204     }
205 
206     memset(targets, 0, size);
207 
208     targets->count = n;
209     nxt_py_targets = targets;
210 
211     if (c->targets != NULL) {
212         next = 0;
213 
214         for (i = 0; /* void */; i++) {
215             cv = nxt_conf_next_object_member(c->targets, &name, &next);
216             if (cv == NULL) {
217                 break;
218             }
219 
220             ret = nxt_python_set_target(task, &targets->target[i], cv);
221             if (nxt_slow_path(ret != NXT_OK)) {
222                 goto fail;
223             }
224         }
225 
226     } else {
227         ret = nxt_python_set_target(task, &targets->target[0], app_conf->self);
228         if (nxt_slow_path(ret != NXT_OK)) {
229             goto fail;
230         }
231     }
232 
233     nxt_unit_default_init(task, &python_init, data->app);
234 
235     python_init.data = c;
236     python_init.callbacks.ready_handler = nxt_python_ready_handler;
237 
238     proto = c->protocol;
239 
240     if (proto.length == 0) {
241         proto = nxt_python_asgi_check(targets->target[0].application)
242                 ? asgi : wsgi;
243 
244         for (i = 1; i < targets->count; i++) {
245             probe_proto = nxt_python_asgi_check(targets->target[i].application)
246                           ? asgi : wsgi;
247             if (probe_proto.start != proto.start) {
248                 nxt_alert(task, "A mix of ASGI & WSGI targets is forbidden, "
249                                 "specify protocol in config if incorrect");
250                 goto fail;
251             }
252         }
253     }
254 
255     if (nxt_strstr_eq(&proto, &asgi)) {
256         rc = nxt_python_asgi_init(&python_init, &nxt_py_proto);
257 
258     } else {
259         rc = nxt_python_wsgi_init(&python_init, &nxt_py_proto);
260     }
261 
262     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
263         goto fail;
264     }
265 
266     rc = nxt_py_proto.ctx_data_alloc(&python_init.ctx_data, 1);
267     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
268         goto fail;
269     }
270 
271     rc = nxt_python_init_threads(c);
272     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
273         goto fail;
274     }
275 
276     if (nxt_py_proto.startup != NULL) {
277         if (nxt_py_proto.startup(python_init.ctx_data) != NXT_UNIT_OK) {
278             goto fail;
279         }
280     }
281 
282     unit_ctx = nxt_unit_init(&python_init);
283     if (nxt_slow_path(unit_ctx == NULL)) {
284         goto fail;
285     }
286 
287     rc = nxt_py_proto.run(unit_ctx);
288 
289     nxt_python_join_threads(unit_ctx, c);
290 
291     nxt_unit_done(unit_ctx);
292 
293     nxt_py_proto.ctx_data_free(python_init.ctx_data);
294 
295     nxt_python_atexit();
296 
297     exit(rc);
298 
299     return NXT_OK;
300 
301 fail:
302 
303     nxt_python_join_threads(NULL, c);
304 
305     if (python_init.ctx_data != NULL) {
306         nxt_py_proto.ctx_data_free(python_init.ctx_data);
307     }
308 
309     Py_XDECREF(obj);
310     Py_XDECREF(module);
311 
312     nxt_python_atexit();
313 
314     return NXT_ERROR;
315 }
316 
317 
318 static nxt_int_t
319 nxt_python_set_target(nxt_task_t *task, nxt_python_target_t *target,
320     nxt_conf_value_t *conf)
321 {
322     char              *callable, *module_name;
323     PyObject          *module, *obj;
324     nxt_str_t         str;
325     nxt_conf_value_t  *value;
326 
327     static nxt_str_t  module_str = nxt_string("module");
328     static nxt_str_t  callable_str = nxt_string("callable");
329 
330     module = obj = NULL;
331 
332     value = nxt_conf_get_object_member(conf, &module_str, NULL);
333     if (nxt_slow_path(value == NULL)) {
334         goto fail;
335     }
336 
337     nxt_conf_get_string(value, &str);
338 
339     module_name = nxt_alloca(str.length + 1);
340     nxt_memcpy(module_name, str.start, str.length);
341     module_name[str.length] = '\0';
342 
343     module = PyImport_ImportModule(module_name);
344     if (nxt_slow_path(module == NULL)) {
345         nxt_alert(task, "Python failed to import module \"%s\"", module_name);
346         nxt_python_print_exception();
347         goto fail;
348     }
349 
350     value = nxt_conf_get_object_member(conf, &callable_str, NULL);
351     if (value == NULL) {
352         callable = nxt_alloca(12);
353         nxt_memcpy(callable, "application", 12);
354 
355     } else {
356         nxt_conf_get_string(value, &str);
357 
358         callable = nxt_alloca(str.length + 1);
359         nxt_memcpy(callable, str.start, str.length);
360         callable[str.length] = '\0';
361     }
362 
363     obj = PyDict_GetItemString(PyModule_GetDict(module), callable);
364     if (nxt_slow_path(obj == NULL)) {
365         nxt_alert(task, "Python failed to get \"%s\" from module \"%s\"",
366                   callable, module_name);
367         goto fail;
368     }
369 
370     if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
371         nxt_alert(task, "\"%s\" in module \"%s\" is not a callable object",
372                   callable, module_name);
373         goto fail;
374     }
375 
376     target->application = obj;
377     obj = NULL;
378 
379     Py_INCREF(target->application);
380     Py_CLEAR(module);
381 
382     return NXT_OK;
383 
384 fail:
385 
386     Py_XDECREF(obj);
387     Py_XDECREF(module);
388 
389     return NXT_ERROR;
390 }
391 
392 
393 static nxt_int_t
394 nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value)
395 {
396     int               ret;
397     PyObject          *path, *sys;
398     nxt_str_t         str;
399     nxt_uint_t        n;
400     nxt_conf_value_t  *array;
401 
402     if (value == NULL) {
403         return NXT_OK;
404     }
405 
406     sys = PySys_GetObject((char *) "path");
407     if (nxt_slow_path(sys == NULL)) {
408         nxt_alert(task, "Python failed to get \"sys.path\" list");
409         return NXT_ERROR;
410     }
411 
412     /* sys is a Borrowed reference. */
413 
414     array = value;
415     n = nxt_conf_array_elements_count_or_1(array);
416 
417     while (n != 0) {
418         n--;
419 
420         /*
421          * Insertion in front of existing paths starting from the last element
422          * to preserve original order while giving priority to the values
423          * specified in the "path" option.
424          */
425 
426         value = nxt_conf_get_array_element_or_itself(array, n);
427 
428         nxt_conf_get_string(value, &str);
429 
430         path = PyString_FromStringAndSize((char *) str.start, str.length);
431         if (nxt_slow_path(path == NULL)) {
432             nxt_alert(task, "Python failed to create string object \"%V\"",
433                       &str);
434             return NXT_ERROR;
435         }
436 
437         ret = PyList_Insert(sys, 0, path);
438 
439         Py_DECREF(path);
440 
441         if (nxt_slow_path(ret != 0)) {
442             nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"",
443                       &str);
444             return NXT_ERROR;
445         }
446     }
447 
448     return NXT_OK;
449 }
450 
451 
452 static int
453 nxt_python_init_threads(nxt_python_app_conf_t *c)
454 {
455     int                    res;
456     uint32_t               i;
457     nxt_py_thread_info_t   *ti;
458     static pthread_attr_t  attr;
459 
460     if (c->threads <= 1) {
461         return NXT_UNIT_OK;
462     }
463 
464     if (c->thread_stack_size > 0) {
465         res = pthread_attr_init(&attr);
466         if (nxt_slow_path(res != 0)) {
467             nxt_unit_alert(NULL, "thread attr init failed: %s (%d)",
468                            strerror(res), res);
469 
470             return NXT_UNIT_ERROR;
471         }
472 
473         res = pthread_attr_setstacksize(&attr, c->thread_stack_size);
474         if (nxt_slow_path(res != 0)) {
475             nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)",
476                            strerror(res), res);
477 
478             return NXT_UNIT_ERROR;
479         }
480 
481         nxt_py_thread_attr = &attr;
482     }
483 
484     nxt_py_threads = nxt_unit_malloc(NULL, sizeof(nxt_py_thread_info_t)
485                                            * (c->threads - 1));
486     if (nxt_slow_path(nxt_py_threads == NULL)) {
487         nxt_unit_alert(NULL, "Failed to allocate thread info array");
488 
489         return NXT_UNIT_ERROR;
490     }
491 
492     memset(nxt_py_threads, 0, sizeof(nxt_py_thread_info_t) * (c->threads - 1));
493 
494     for (i = 0; i < c->threads - 1; i++) {
495         ti = &nxt_py_threads[i];
496 
497         res = nxt_py_proto.ctx_data_alloc(&ti->ctx_data, 0);
498         if (nxt_slow_path(res != NXT_UNIT_OK)) {
499             return NXT_UNIT_ERROR;
500         }
501     }
502 
503     return NXT_UNIT_OK;
504 }
505 
506 
507 static int
508 nxt_python_ready_handler(nxt_unit_ctx_t *ctx)
509 {
510     int                    res;
511     uint32_t               i;
512     nxt_py_thread_info_t   *ti;
513     nxt_python_app_conf_t  *c;
514 
515     c = ctx->unit->data;
516 
517     if (c->threads <= 1) {
518         return NXT_UNIT_OK;
519     }
520 
521     for (i = 0; i < c->threads - 1; i++) {
522         ti = &nxt_py_threads[i];
523 
524         ti->ctx = ctx;
525 
526         res = pthread_create(&ti->thread, nxt_py_thread_attr,
527                              nxt_python_thread_func, ti);
528 
529         if (nxt_fast_path(res == 0)) {
530             nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
531 
532         } else {
533             nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)",
534                            (int) (i + 1), strerror(res), res);
535         }
536     }
537 
538     return NXT_UNIT_OK;
539 }
540 
541 
542 static void *
543 nxt_python_thread_func(void *data)
544 {
545     nxt_unit_ctx_t        *ctx;
546     PyGILState_STATE      gstate;
547     nxt_py_thread_info_t  *ti;
548 
549     ti = data;
550 
551     nxt_unit_debug(ti->ctx, "worker thread #%d start",
552                    (int) (ti - nxt_py_threads + 1));
553 
554     gstate = PyGILState_Ensure();
555 
556     if (nxt_py_proto.startup != NULL) {
557         if (nxt_py_proto.startup(ti->ctx_data) != NXT_UNIT_OK) {
558             goto fail;
559         }
560     }
561 
562     ctx = nxt_unit_ctx_alloc(ti->ctx, ti->ctx_data);
563     if (nxt_slow_path(ctx == NULL)) {
564         goto fail;
565     }
566 
567     (void) nxt_py_proto.run(ctx);
568 
569     nxt_unit_done(ctx);
570 
571 fail:
572 
573     PyGILState_Release(gstate);
574 
575     nxt_unit_debug(NULL, "worker thread #%d end",
576                    (int) (ti - nxt_py_threads + 1));
577 
578     return NULL;
579 }
580 
581 
582 static void
583 nxt_python_join_threads(nxt_unit_ctx_t *ctx, nxt_python_app_conf_t *c)
584 {
585     int                   res;
586     uint32_t              i;
587     PyThreadState         *thread_state;
588     nxt_py_thread_info_t  *ti;
589 
590     if (nxt_py_threads == NULL) {
591         return;
592     }
593 
594     thread_state = PyEval_SaveThread();
595 
596     for (i = 0; i < c->threads - 1; i++) {
597         ti = &nxt_py_threads[i];
598 
599         if ((uintptr_t) ti->thread == 0) {
600             continue;
601         }
602 
603         res = pthread_join(ti->thread, NULL);
604 
605         if (nxt_fast_path(res == 0)) {
606             nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
607 
608         } else {
609             nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)",
610                            (int) (i + 1), strerror(res), res);
611         }
612     }
613 
614     PyEval_RestoreThread(thread_state);
615 
616     for (i = 0; i < c->threads - 1; i++) {
617         ti = &nxt_py_threads[i];
618 
619         if (ti->ctx_data != NULL) {
620             nxt_py_proto.ctx_data_free(ti->ctx_data);
621         }
622     }
623 
624     nxt_unit_free(NULL, nxt_py_threads);
625 }
626 
627 
628 int
629 nxt_python_init_strings(nxt_python_string_t *pstr)
630 {
631     PyObject  *obj;
632 
633     while (pstr->string.start != NULL) {
634         obj = PyString_FromStringAndSize((char *) pstr->string.start,
635                                          pstr->string.length);
636         if (nxt_slow_path(obj == NULL)) {
637             return NXT_UNIT_ERROR;
638         }
639 
640         PyUnicode_InternInPlace(&obj);
641 
642         *pstr->object_p = obj;
643 
644         pstr++;
645     }
646 
647     return NXT_UNIT_OK;
648 }
649 
650 
651 void
652 nxt_python_done_strings(nxt_python_string_t *pstr)
653 {
654     PyObject  *obj;
655 
656     while (pstr->string.start != NULL) {
657         obj = *pstr->object_p;
658 
659         Py_XDECREF(obj);
660         *pstr->object_p = NULL;
661 
662         pstr++;
663     }
664 }
665 
666 
667 static void
668 nxt_python_atexit(void)
669 {
670     nxt_int_t  i;
671 
672     if (nxt_py_proto.done != NULL) {
673         nxt_py_proto.done();
674     }
675 
676     Py_XDECREF(nxt_py_stderr_flush);
677 
678     if (nxt_py_targets != NULL) {
679         for (i = 0; i < nxt_py_targets->count; i++) {
680             Py_XDECREF(nxt_py_targets->target[i].application);
681         }
682 
683         nxt_unit_free(NULL, nxt_py_targets);
684     }
685 
686     Py_Finalize();
687 
688     if (nxt_py_home != NULL) {
689         nxt_free(nxt_py_home);
690     }
691 }
692 
693 
/*
 * Prints the pending Python exception with PyErr_Print().  On Python 3 it
 * additionally calls the saved sys.stderr "flush" method; an error raised
 * by that call is cleared and otherwise ignored.
 */
void
nxt_python_print_exception(void)
{
#if PY_MAJOR_VERSION == 3
    PyObject  *flushed;
#endif

    PyErr_Print();

#if PY_MAJOR_VERSION == 3
    /* The traceback may still sit in the buffer of the sys.stderr object. */
    flushed = PyObject_CallFunction(nxt_py_stderr_flush, NULL);

    if (nxt_fast_path(flushed != NULL)) {
        Py_DECREF(flushed);

    } else {
        PyErr_Clear();
    }
#endif
}
714