xref: /unit/src/nxt_application.c (revision 743:e0f0cd7d244a)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_unit.h>
16 #include <nxt_port_memory_int.h>
17 
18 #include <glob.h>
19 
20 
21 typedef struct {
22     nxt_app_type_t  type;
23     nxt_str_t       version;
24     nxt_str_t       file;
25 } nxt_module_t;
26 
27 
28 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
29 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
30     nxt_array_t *modules, const char *name);
31 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
34     void *data);
35 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
36     const char *name);
37 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
38 
39 static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data);
40 
41 
42 static uint32_t  compat[] = {
43     NXT_VERNUM, NXT_DEBUG,
44 };
45 
46 
47 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
48 
49 
50 static nxt_app_module_t  *nxt_app;
51 
52 
53 nxt_int_t
54 nxt_discovery_start(nxt_task_t *task, void *data)
55 {
56     uint32_t       stream;
57     nxt_buf_t      *b;
58     nxt_int_t      ret;
59     nxt_port_t     *main_port, *discovery_port;
60     nxt_runtime_t  *rt;
61 
62     nxt_debug(task, "DISCOVERY");
63 
64     rt = task->thread->runtime;
65 
66     b = nxt_discovery_modules(task, rt->modules);
67     if (nxt_slow_path(b == NULL)) {
68         return NXT_ERROR;
69     }
70 
71     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
72     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
73 
74     stream = nxt_port_rpc_register_handler(task, discovery_port,
75                                            nxt_discovery_quit,
76                                            nxt_discovery_quit,
77                                            main_port->pid, NULL);
78 
79     if (nxt_slow_path(stream == 0)) {
80         return NXT_ERROR;
81     }
82 
83     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
84                                 stream, discovery_port->id, b);
85 
86     if (nxt_slow_path(ret != NXT_OK)) {
87         nxt_port_rpc_cancel(task, discovery_port, stream);
88         return NXT_ERROR;
89     }
90 
91     return NXT_OK;
92 }
93 
94 
95 static nxt_buf_t *
96 nxt_discovery_modules(nxt_task_t *task, const char *path)
97 {
98     char          *name;
99     u_char        *p, *end;
100     size_t        size;
101     glob_t        glb;
102     nxt_mp_t      *mp;
103     nxt_buf_t     *b;
104     nxt_int_t     ret;
105     nxt_uint_t    i, n;
106     nxt_array_t   *modules;
107     nxt_module_t  *module;
108 
109     b = NULL;
110 
111     mp = nxt_mp_create(1024, 128, 256, 32);
112     if (mp == NULL) {
113         return b;
114     }
115 
116     ret = glob(path, 0, NULL, &glb);
117 
118     n = glb.gl_pathc;
119 
120     if (ret != 0) {
121         nxt_log(task, NXT_LOG_NOTICE,
122                 "no modules matching: \"%s\" found", path);
123         n = 0;
124     }
125 
126     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
127     if (modules == NULL) {
128         goto fail;
129     }
130 
131     for (i = 0; i < n; i++) {
132         name = glb.gl_pathv[i];
133 
134         ret = nxt_discovery_module(task, mp, modules, name);
135         if (ret != NXT_OK) {
136             goto fail;
137         }
138     }
139 
140     size = nxt_length("[]");
141     module = modules->elts;
142     n = modules->nelts;
143 
144     for (i = 0; i < n; i++) {
145         nxt_debug(task, "module: %d %V %V",
146                   module[i].type, &module[i].version, &module[i].file);
147 
148         size += nxt_length("{\"type\": ,");
149         size += nxt_length(" \"version\": \"\",");
150         size += nxt_length(" \"file\": \"\"},");
151 
152         size += NXT_INT_T_LEN
153                 + module[i].version.length
154                 + module[i].file.length;
155     }
156 
157     b = nxt_buf_mem_alloc(mp, size, 0);
158     if (b == NULL) {
159         goto fail;
160     }
161 
162     b->completion_handler = nxt_discovery_completion_handler;
163 
164     p = b->mem.free;
165     end = b->mem.end;
166     *p++ = '[';
167 
168     for (i = 0; i < n; i++) {
169         p = nxt_sprintf(p, end,
170                       "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},",
171                       module[i].type, &module[i].version, &module[i].file);
172     }
173 
174     *p++ = ']';
175     b->mem.free = p;
176 
177 fail:
178 
179     globfree(&glb);
180 
181     return b;
182 }
183 
184 
185 static nxt_int_t
186 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
187     const char *name)
188 {
189     void              *dl;
190     nxt_str_t         version;
191     nxt_int_t         ret;
192     nxt_uint_t        i, n;
193     nxt_module_t      *module;
194     nxt_app_type_t    type;
195     nxt_app_module_t  *app;
196 
197     /*
198      * Only memory allocation failure should return NXT_ERROR.
199      * Any module processing errors are ignored.
200      */
201     ret = NXT_ERROR;
202 
203     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
204 
205     if (dl == NULL) {
206         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
207         return NXT_OK;
208     }
209 
210     app = dlsym(dl, "nxt_app_module");
211 
212     if (app != NULL) {
213         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
214                 &app->type, app->version, name);
215 
216         if (app->compat_length != sizeof(compat)
217             || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
218         {
219             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
220 
221             goto done;
222         }
223 
224         type = nxt_app_parse_type(app->type.start, app->type.length);
225 
226         if (type == NXT_APP_UNKNOWN) {
227             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
228 
229             goto done;
230         }
231 
232         module = modules->elts;
233         n = modules->nelts;
234 
235         version.start = (u_char *) app->version;
236         version.length = nxt_strlen(app->version);
237 
238         for (i = 0; i < n; i++) {
239             if (type == module[i].type
240                 && nxt_strstr_eq(&module[i].version, &version))
241             {
242                 nxt_log(task, NXT_LOG_NOTICE,
243                         "ignoring %s module with the same "
244                         "application language version %V %V as in %V",
245                         name, &app->type, &version, &module[i].file);
246 
247                 goto done;
248             }
249         }
250 
251         module = nxt_array_add(modules);
252         if (module == NULL) {
253             goto fail;
254         }
255 
256         module->type = type;
257 
258         nxt_str_dup(mp, &module->version, &version);
259         if (module->version.start == NULL) {
260             goto fail;
261         }
262 
263         module->file.length = nxt_strlen(name);
264 
265         module->file.start = nxt_mp_alloc(mp, module->file.length);
266         if (module->file.start == NULL) {
267             goto fail;
268         }
269 
270         nxt_memcpy(module->file.start, name, module->file.length);
271 
272     } else {
273         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
274     }
275 
276 done:
277 
278     ret = NXT_OK;
279 
280 fail:
281 
282     if (dlclose(dl) != 0) {
283         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
284     }
285 
286     return ret;
287 }
288 
289 
290 static void
291 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
292 {
293     nxt_mp_t   *mp;
294     nxt_buf_t  *b;
295 
296     b = obj;
297     mp = b->data;
298 
299     nxt_mp_destroy(mp);
300 }
301 
302 
303 static void
304 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
305 {
306     nxt_worker_process_quit_handler(task, msg);
307 }
308 
309 
310 nxt_int_t
311 nxt_app_start(nxt_task_t *task, void *data)
312 {
313     nxt_int_t              ret;
314     nxt_app_lang_module_t  *lang;
315     nxt_common_app_conf_t  *app_conf;
316 
317     app_conf = data;
318 
319     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
320     if (nxt_slow_path(lang == NULL)) {
321         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
322         return NXT_ERROR;
323     }
324 
325     nxt_app = lang->module;
326 
327     if (nxt_app == NULL) {
328         nxt_debug(task, "application language module: %s \"%s\"",
329                   lang->version, lang->file);
330 
331         nxt_app = nxt_app_module_load(task, lang->file);
332     }
333 
334     if (app_conf->working_directory != NULL
335         && app_conf->working_directory[0] != 0)
336     {
337         ret = chdir(app_conf->working_directory);
338 
339         if (nxt_slow_path(ret != 0)) {
340             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
341                     app_conf->working_directory, nxt_errno);
342 
343             return NXT_ERROR;
344         }
345     }
346 
347     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
348                       != NXT_OK))
349     {
350         nxt_alert(task, "failed to set environment");
351         return NXT_ERROR;
352     }
353 
354     ret = nxt_app->init(task, data);
355 
356     if (nxt_slow_path(ret != NXT_OK)) {
357         nxt_debug(task, "application init failed");
358 
359     } else {
360         nxt_debug(task, "application init done");
361     }
362 
363     return ret;
364 }
365 
366 
367 static nxt_app_module_t *
368 nxt_app_module_load(nxt_task_t *task, const char *name)
369 {
370     void  *dl;
371 
372     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
373 
374     if (dl != NULL) {
375         return dlsym(dl, "nxt_app_module");
376     }
377 
378     nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
379 
380     return NULL;
381 }
382 
383 
384 static nxt_int_t
385 nxt_app_set_environment(nxt_conf_value_t *environment)
386 {
387     char              *env, *p;
388     uint32_t          next;
389     nxt_str_t         name, value;
390     nxt_conf_value_t  *value_obj;
391 
392     if (environment != NULL) {
393         next = 0;
394 
395         for ( ;; ) {
396             value_obj = nxt_conf_next_object_member(environment, &name, &next);
397             if (value_obj == NULL) {
398                 break;
399             }
400 
401             nxt_conf_get_string(value_obj, &value);
402 
403             env = nxt_malloc(name.length + value.length + 2);
404             if (nxt_slow_path(env == NULL)) {
405                 return NXT_ERROR;
406             }
407 
408             p = nxt_cpymem(env, name.start, name.length);
409             *p++ = '=';
410             p = nxt_cpymem(p, value.start, value.length);
411             *p = '\0';
412 
413             if (nxt_slow_path(putenv(env) != 0)) {
414                 return NXT_ERROR;
415             }
416         }
417     }
418 
419     return NXT_OK;
420 }
421 
422 
423 nxt_int_t
424 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
425 {
426     ar->timer.handler = nxt_app_http_release;
427     nxt_timer_add(task->thread->engine, &ar->timer, 0);
428 
429     return NXT_OK;
430 }
431 
432 
433 static void
434 nxt_app_http_release(nxt_task_t *task, void *obj, void *data)
435 {
436     nxt_timer_t          *timer;
437     nxt_app_parse_ctx_t  *ar;
438 
439     timer = obj;
440 
441     nxt_debug(task, "http app release");
442 
443     ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
444 
445     nxt_mp_release(ar->request->mem_pool);
446 }
447 
448 
449 nxt_app_lang_module_t *
450 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
451 {
452     u_char                 *p, *end, *version;
453     size_t                 version_length;
454     nxt_uint_t             i, n;
455     nxt_app_type_t         type;
456     nxt_app_lang_module_t  *lang;
457 
458     end = name->start + name->length;
459     version = end;
460 
461     for (p = name->start; p < end; p++) {
462         if (*p == ' ') {
463             version = p + 1;
464             break;
465         }
466 
467         if (*p >= '0' && *p <= '9') {
468             version = p;
469             break;
470         }
471     }
472 
473     type = nxt_app_parse_type(name->start, p - name->start);
474 
475     if (type == NXT_APP_UNKNOWN) {
476         return NULL;
477     }
478 
479     version_length = end - version;
480 
481     lang = rt->languages->elts;
482     n = rt->languages->nelts;
483 
484     for (i = 0; i < n; i++) {
485 
486         /*
487          * Versions are sorted in descending order
488          * so first match chooses the highest version.
489          */
490 
491         if (lang[i].type == type
492             && nxt_strvers_match(lang[i].version, version, version_length))
493         {
494             return &lang[i];
495         }
496     }
497 
498     return NULL;
499 }
500 
501 
502 nxt_app_type_t
503 nxt_app_parse_type(u_char *p, size_t length)
504 {
505     nxt_str_t str;
506 
507     str.length = length;
508     str.start = p;
509 
510     if (nxt_str_eq(&str, "python", 6)) {
511         return NXT_APP_PYTHON;
512 
513     } else if (nxt_str_eq(&str, "php", 3)) {
514         return NXT_APP_PHP;
515 
516     } else if (nxt_str_eq(&str, "go", 2)) {
517         return NXT_APP_GO;
518 
519     } else if (nxt_str_eq(&str, "perl", 4)) {
520         return NXT_APP_PERL;
521 
522     } else if (nxt_str_eq(&str, "ruby", 4)) {
523         return NXT_APP_RUBY;
524     }
525 
526     return NXT_APP_UNKNOWN;
527 }
528 
529 
530 nxt_int_t
531 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
532 {
533     nxt_port_t     *my_port, *main_port;
534     nxt_runtime_t  *rt;
535 
536     nxt_memzero(init, sizeof(nxt_unit_init_t));
537 
538     rt = task->thread->runtime;
539 
540     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
541     if (nxt_slow_path(main_port == NULL)) {
542         return NXT_ERROR;
543     }
544 
545     my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
546     if (nxt_slow_path(my_port == NULL)) {
547         return NXT_ERROR;
548     }
549 
550     init->ready_port.id.pid = main_port->pid;
551     init->ready_port.id.id = main_port->id;
552     init->ready_port.out_fd = main_port->pair[1];
553 
554     nxt_fd_blocking(task, main_port->pair[1]);
555 
556     init->ready_stream = my_port->process->init->stream;
557 
558     init->read_port.id.pid = my_port->pid;
559     init->read_port.id.id = my_port->id;
560     init->read_port.in_fd = my_port->pair[0];
561 
562     nxt_fd_blocking(task, my_port->pair[0]);
563 
564     init->log_fd = 2;
565 
566     return NXT_OK;
567 }
568