xref: /unit/src/nxt_application.c (revision 977:4f9268f27b57)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_unit.h>
16 #include <nxt_port_memory_int.h>
17 
18 #include <glob.h>
19 
20 
21 typedef struct {
22     nxt_app_type_t  type;
23     nxt_str_t       version;
24     nxt_str_t       file;
25 } nxt_module_t;
26 
27 
28 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
29 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
30     nxt_array_t *modules, const char *name);
31 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
34     void *data);
35 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
36     const char *name);
37 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
38 
39 static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data);
40 
41 
42 static uint32_t  compat[] = {
43     NXT_VERNUM, NXT_DEBUG,
44 };
45 
46 
47 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
48 
49 
50 static nxt_app_module_t  *nxt_app;
51 
52 
53 nxt_int_t
54 nxt_discovery_start(nxt_task_t *task, void *data)
55 {
56     uint32_t       stream;
57     nxt_buf_t      *b;
58     nxt_int_t      ret;
59     nxt_port_t     *main_port, *discovery_port;
60     nxt_runtime_t  *rt;
61 
62     nxt_debug(task, "DISCOVERY");
63 
64     rt = task->thread->runtime;
65 
66     b = nxt_discovery_modules(task, rt->modules);
67     if (nxt_slow_path(b == NULL)) {
68         return NXT_ERROR;
69     }
70 
71     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
72     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
73 
74     stream = nxt_port_rpc_register_handler(task, discovery_port,
75                                            nxt_discovery_quit,
76                                            nxt_discovery_quit,
77                                            main_port->pid, NULL);
78 
79     if (nxt_slow_path(stream == 0)) {
80         return NXT_ERROR;
81     }
82 
83     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
84                                 stream, discovery_port->id, b);
85 
86     if (nxt_slow_path(ret != NXT_OK)) {
87         nxt_port_rpc_cancel(task, discovery_port, stream);
88         return NXT_ERROR;
89     }
90 
91     return NXT_OK;
92 }
93 
94 
95 static nxt_buf_t *
96 nxt_discovery_modules(nxt_task_t *task, const char *path)
97 {
98     char          *name;
99     u_char        *p, *end;
100     size_t        size;
101     glob_t        glb;
102     nxt_mp_t      *mp;
103     nxt_buf_t     *b;
104     nxt_int_t     ret;
105     nxt_uint_t    i, n;
106     nxt_array_t   *modules;
107     nxt_module_t  *module;
108 
109     b = NULL;
110 
111     mp = nxt_mp_create(1024, 128, 256, 32);
112     if (mp == NULL) {
113         return b;
114     }
115 
116     ret = glob(path, 0, NULL, &glb);
117 
118     n = glb.gl_pathc;
119 
120     if (ret != 0) {
121         nxt_log(task, NXT_LOG_NOTICE,
122                 "no modules matching: \"%s\" found", path);
123         n = 0;
124     }
125 
126     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
127     if (modules == NULL) {
128         goto fail;
129     }
130 
131     for (i = 0; i < n; i++) {
132         name = glb.gl_pathv[i];
133 
134         ret = nxt_discovery_module(task, mp, modules, name);
135         if (ret != NXT_OK) {
136             goto fail;
137         }
138     }
139 
140     size = nxt_length("[]");
141     module = modules->elts;
142     n = modules->nelts;
143 
144     for (i = 0; i < n; i++) {
145         nxt_debug(task, "module: %d %V %V",
146                   module[i].type, &module[i].version, &module[i].file);
147 
148         size += nxt_length("{\"type\": ,");
149         size += nxt_length(" \"version\": \"\",");
150         size += nxt_length(" \"file\": \"\"},");
151 
152         size += NXT_INT_T_LEN
153                 + module[i].version.length
154                 + module[i].file.length;
155     }
156 
157     b = nxt_buf_mem_alloc(mp, size, 0);
158     if (b == NULL) {
159         goto fail;
160     }
161 
162     b->completion_handler = nxt_discovery_completion_handler;
163 
164     p = b->mem.free;
165     end = b->mem.end;
166     *p++ = '[';
167 
168     for (i = 0; i < n; i++) {
169         p = nxt_sprintf(p, end,
170                       "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},",
171                       module[i].type, &module[i].version, &module[i].file);
172     }
173 
174     *p++ = ']';
175     b->mem.free = p;
176 
177 fail:
178 
179     globfree(&glb);
180 
181     return b;
182 }
183 
184 
185 static nxt_int_t
186 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
187     const char *name)
188 {
189     void              *dl;
190     nxt_str_t         version;
191     nxt_int_t         ret;
192     nxt_uint_t        i, n;
193     nxt_module_t      *module;
194     nxt_app_type_t    type;
195     nxt_app_module_t  *app;
196 
197     /*
198      * Only memory allocation failure should return NXT_ERROR.
199      * Any module processing errors are ignored.
200      */
201     ret = NXT_ERROR;
202 
203     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
204 
205     if (dl == NULL) {
206         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
207         return NXT_OK;
208     }
209 
210     app = dlsym(dl, "nxt_app_module");
211 
212     if (app != NULL) {
213         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
214                 &app->type, app->version, name);
215 
216         if (app->compat_length != sizeof(compat)
217             || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
218         {
219             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
220 
221             goto done;
222         }
223 
224         type = nxt_app_parse_type(app->type.start, app->type.length);
225 
226         if (type == NXT_APP_UNKNOWN) {
227             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
228 
229             goto done;
230         }
231 
232         module = modules->elts;
233         n = modules->nelts;
234 
235         version.start = (u_char *) app->version;
236         version.length = nxt_strlen(app->version);
237 
238         for (i = 0; i < n; i++) {
239             if (type == module[i].type
240                 && nxt_strstr_eq(&module[i].version, &version))
241             {
242                 nxt_log(task, NXT_LOG_NOTICE,
243                         "ignoring %s module with the same "
244                         "application language version %V %V as in %V",
245                         name, &app->type, &version, &module[i].file);
246 
247                 goto done;
248             }
249         }
250 
251         module = nxt_array_add(modules);
252         if (module == NULL) {
253             goto fail;
254         }
255 
256         module->type = type;
257 
258         nxt_str_dup(mp, &module->version, &version);
259         if (module->version.start == NULL) {
260             goto fail;
261         }
262 
263         module->file.length = nxt_strlen(name);
264 
265         module->file.start = nxt_mp_alloc(mp, module->file.length);
266         if (module->file.start == NULL) {
267             goto fail;
268         }
269 
270         nxt_memcpy(module->file.start, name, module->file.length);
271 
272     } else {
273         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
274     }
275 
276 done:
277 
278     ret = NXT_OK;
279 
280 fail:
281 
282     if (dlclose(dl) != 0) {
283         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
284     }
285 
286     return ret;
287 }
288 
289 
290 static void
291 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
292 {
293     nxt_mp_t   *mp;
294     nxt_buf_t  *b;
295 
296     b = obj;
297     mp = b->data;
298 
299     nxt_mp_destroy(mp);
300 }
301 
302 
303 static void
304 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
305 {
306     nxt_worker_process_quit_handler(task, msg);
307 }
308 
309 
310 nxt_int_t
311 nxt_app_start(nxt_task_t *task, void *data)
312 {
313     nxt_int_t              ret;
314     nxt_app_lang_module_t  *lang;
315     nxt_common_app_conf_t  *app_conf;
316 
317     app_conf = data;
318 
319     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
320     if (nxt_slow_path(lang == NULL)) {
321         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
322         return NXT_ERROR;
323     }
324 
325     nxt_app = lang->module;
326 
327     if (nxt_app == NULL) {
328         nxt_debug(task, "application language module: %s \"%s\"",
329                   lang->version, lang->file);
330 
331         nxt_app = nxt_app_module_load(task, lang->file);
332     }
333 
334     if (nxt_app->pre_init != NULL) {
335         ret = nxt_app->pre_init(task, data);
336 
337         if (nxt_slow_path(ret != NXT_OK)) {
338             nxt_debug(task, "application pre_init failed");
339 
340         } else {
341             nxt_debug(task, "application pre_init done");
342         }
343     }
344 
345     if (app_conf->working_directory != NULL
346         && app_conf->working_directory[0] != 0)
347     {
348         ret = chdir(app_conf->working_directory);
349 
350         if (nxt_slow_path(ret != 0)) {
351             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
352                     app_conf->working_directory, nxt_errno);
353 
354             return NXT_ERROR;
355         }
356     }
357 
358     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
359                       != NXT_OK))
360     {
361         nxt_alert(task, "failed to set environment");
362         return NXT_ERROR;
363     }
364 
365     ret = nxt_app->init(task, data);
366 
367     if (nxt_slow_path(ret != NXT_OK)) {
368         nxt_debug(task, "application init failed");
369 
370     } else {
371         nxt_debug(task, "application init done");
372     }
373 
374     return ret;
375 }
376 
377 
378 static nxt_app_module_t *
379 nxt_app_module_load(nxt_task_t *task, const char *name)
380 {
381     void  *dl;
382 
383     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
384 
385     if (dl != NULL) {
386         return dlsym(dl, "nxt_app_module");
387     }
388 
389     nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
390 
391     return NULL;
392 }
393 
394 
395 static nxt_int_t
396 nxt_app_set_environment(nxt_conf_value_t *environment)
397 {
398     char              *env, *p;
399     uint32_t          next;
400     nxt_str_t         name, value;
401     nxt_conf_value_t  *value_obj;
402 
403     if (environment != NULL) {
404         next = 0;
405 
406         for ( ;; ) {
407             value_obj = nxt_conf_next_object_member(environment, &name, &next);
408             if (value_obj == NULL) {
409                 break;
410             }
411 
412             nxt_conf_get_string(value_obj, &value);
413 
414             env = nxt_malloc(name.length + value.length + 2);
415             if (nxt_slow_path(env == NULL)) {
416                 return NXT_ERROR;
417             }
418 
419             p = nxt_cpymem(env, name.start, name.length);
420             *p++ = '=';
421             p = nxt_cpymem(p, value.start, value.length);
422             *p = '\0';
423 
424             if (nxt_slow_path(putenv(env) != 0)) {
425                 return NXT_ERROR;
426             }
427         }
428     }
429 
430     return NXT_OK;
431 }
432 
433 
434 nxt_int_t
435 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
436 {
437     ar->timer.handler = nxt_app_http_release;
438     nxt_timer_add(task->thread->engine, &ar->timer, 0);
439 
440     return NXT_OK;
441 }
442 
443 
444 static void
445 nxt_app_http_release(nxt_task_t *task, void *obj, void *data)
446 {
447     nxt_timer_t          *timer;
448     nxt_app_parse_ctx_t  *ar;
449 
450     timer = obj;
451 
452     nxt_debug(task, "http app release");
453 
454     ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
455 
456     nxt_mp_release(ar->request->mem_pool);
457 }
458 
459 
460 nxt_app_lang_module_t *
461 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
462 {
463     u_char                 *p, *end, *version;
464     size_t                 version_length;
465     nxt_uint_t             i, n;
466     nxt_app_type_t         type;
467     nxt_app_lang_module_t  *lang;
468 
469     end = name->start + name->length;
470     version = end;
471 
472     for (p = name->start; p < end; p++) {
473         if (*p == ' ') {
474             version = p + 1;
475             break;
476         }
477 
478         if (*p >= '0' && *p <= '9') {
479             version = p;
480             break;
481         }
482     }
483 
484     type = nxt_app_parse_type(name->start, p - name->start);
485 
486     if (type == NXT_APP_UNKNOWN) {
487         return NULL;
488     }
489 
490     version_length = end - version;
491 
492     lang = rt->languages->elts;
493     n = rt->languages->nelts;
494 
495     for (i = 0; i < n; i++) {
496 
497         /*
498          * Versions are sorted in descending order
499          * so first match chooses the highest version.
500          */
501 
502         if (lang[i].type == type
503             && nxt_strvers_match(lang[i].version, version, version_length))
504         {
505             return &lang[i];
506         }
507     }
508 
509     return NULL;
510 }
511 
512 
513 nxt_app_type_t
514 nxt_app_parse_type(u_char *p, size_t length)
515 {
516     nxt_str_t str;
517 
518     str.length = length;
519     str.start = p;
520 
521     if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) {
522         return NXT_APP_EXTERNAL;
523 
524     } else if (nxt_str_eq(&str, "python", 6)) {
525         return NXT_APP_PYTHON;
526 
527     } else if (nxt_str_eq(&str, "php", 3)) {
528         return NXT_APP_PHP;
529 
530     } else if (nxt_str_eq(&str, "perl", 4)) {
531         return NXT_APP_PERL;
532 
533     } else if (nxt_str_eq(&str, "ruby", 4)) {
534         return NXT_APP_RUBY;
535 
536     } else if (nxt_str_eq(&str, "java", 4)) {
537         return NXT_APP_JAVA;
538     }
539 
540     return NXT_APP_UNKNOWN;
541 }
542 
543 
544 nxt_int_t
545 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
546 {
547     nxt_port_t     *my_port, *main_port;
548     nxt_runtime_t  *rt;
549 
550     nxt_memzero(init, sizeof(nxt_unit_init_t));
551 
552     rt = task->thread->runtime;
553 
554     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
555     if (nxt_slow_path(main_port == NULL)) {
556         return NXT_ERROR;
557     }
558 
559     my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
560     if (nxt_slow_path(my_port == NULL)) {
561         return NXT_ERROR;
562     }
563 
564     init->ready_port.id.pid = main_port->pid;
565     init->ready_port.id.id = main_port->id;
566     init->ready_port.out_fd = main_port->pair[1];
567 
568     nxt_fd_blocking(task, main_port->pair[1]);
569 
570     init->ready_stream = my_port->process->init->stream;
571 
572     init->read_port.id.pid = my_port->pid;
573     init->read_port.id.id = my_port->id;
574     init->read_port.in_fd = my_port->pair[0];
575 
576     nxt_fd_blocking(task, my_port->pair[0]);
577 
578     init->log_fd = 2;
579 
580     return NXT_OK;
581 }
582