xref: /unit/src/nxt_application.c (revision 2231:5b3a69fd47a7)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_unit.h>
16 #include <nxt_port_memory_int.h>
17 #include <nxt_isolation.h>
18 
19 #include <glob.h>
20 
21 #if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
22 #include <sys/prctl.h>
23 #endif
24 
25 
26 #ifdef WCOREDUMP
27 #define NXT_WCOREDUMP(s) WCOREDUMP(s)
28 #else
29 #define NXT_WCOREDUMP(s) 0
30 #endif
31 
32 
33 typedef struct {
34     nxt_app_type_t  type;
35     nxt_str_t       version;
36     nxt_str_t       file;
37     nxt_array_t     *mounts;
38 } nxt_module_t;
39 
40 
41 static nxt_int_t nxt_discovery_start(nxt_task_t *task,
42     nxt_process_data_t *data);
43 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
44 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
45     nxt_array_t *modules, const char *name);
46 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
49     void *data);
50 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
51     const char *name);
52 static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process);
53 static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data);
54 static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
55 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
56 static void nxt_proto_start_process_handler(nxt_task_t *task,
57     nxt_port_recv_msg_t *msg);
58 static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
59 static void nxt_proto_process_created_handler(nxt_task_t *task,
60     nxt_port_recv_msg_t *msg);
61 static void nxt_proto_quit_children(nxt_task_t *task);
62 static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid);
63 static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process);
64 static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid);
65 static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src);
66 static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data);
67 static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data);
68 static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data);
69 
70 
71 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
72 
73 
74 static uint32_t  compat[] = {
75     NXT_VERNUM, NXT_DEBUG,
76 };
77 
78 
79 static nxt_lvlhsh_t           nxt_proto_processes;
80 static nxt_queue_t            nxt_proto_children;
81 static nxt_bool_t             nxt_proto_exiting;
82 
83 static nxt_app_module_t       *nxt_app;
84 static nxt_common_app_conf_t  *nxt_app_conf;
85 
86 
87 static const nxt_port_handlers_t  nxt_discovery_process_port_handlers = {
88     .quit         = nxt_signal_quit_handler,
89     .new_port     = nxt_port_new_port_handler,
90     .change_file  = nxt_port_change_log_file_handler,
91     .mmap         = nxt_port_mmap_handler,
92     .data         = nxt_port_data_handler,
93     .remove_pid   = nxt_port_remove_pid_handler,
94     .rpc_ready    = nxt_port_rpc_handler,
95     .rpc_error    = nxt_port_rpc_handler,
96 };
97 
98 
99 const nxt_sig_event_t  nxt_prototype_signals[] = {
100     nxt_event_signal(SIGHUP,  nxt_proto_signal_handler),
101     nxt_event_signal(SIGINT,  nxt_proto_sigterm_handler),
102     nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler),
103     nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler),
104     nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler),
105     nxt_event_signal_end,
106 };
107 
108 
109 static const nxt_port_handlers_t  nxt_proto_process_port_handlers = {
110     .quit            = nxt_proto_quit_handler,
111     .change_file     = nxt_port_change_log_file_handler,
112     .new_port        = nxt_port_new_port_handler,
113     .process_created = nxt_proto_process_created_handler,
114     .process_ready   = nxt_port_process_ready_handler,
115     .remove_pid      = nxt_port_remove_pid_handler,
116     .start_process   = nxt_proto_start_process_handler,
117     .rpc_ready       = nxt_port_rpc_handler,
118     .rpc_error       = nxt_port_rpc_handler,
119 };
120 
121 
122 static const nxt_port_handlers_t  nxt_app_process_port_handlers = {
123     .quit         = nxt_signal_quit_handler,
124     .rpc_ready    = nxt_port_rpc_handler,
125     .rpc_error    = nxt_port_rpc_handler,
126 };
127 
128 
129 const nxt_process_init_t  nxt_discovery_process = {
130     .name           = "discovery",
131     .type           = NXT_PROCESS_DISCOVERY,
132     .prefork        = NULL,
133     .restart        = 0,
134     .setup          = nxt_process_core_setup,
135     .start          = nxt_discovery_start,
136     .port_handlers  = &nxt_discovery_process_port_handlers,
137     .signals        = nxt_process_signals,
138 };
139 
140 
141 const nxt_process_init_t  nxt_proto_process = {
142     .type           = NXT_PROCESS_PROTOTYPE,
143     .prefork        = nxt_isolation_main_prefork,
144     .restart        = 0,
145     .setup          = nxt_proto_setup,
146     .start          = nxt_proto_start,
147     .port_handlers  = &nxt_proto_process_port_handlers,
148     .signals        = nxt_prototype_signals,
149 };
150 
151 
152 const nxt_process_init_t  nxt_app_process = {
153     .type           = NXT_PROCESS_APP,
154     .setup          = nxt_app_setup,
155     .start          = NULL,
156     .prefork        = NULL,
157     .restart        = 0,
158     .port_handlers  = &nxt_app_process_port_handlers,
159     .signals        = nxt_process_signals,
160 };
161 
162 
163 static nxt_int_t
nxt_discovery_start(nxt_task_t * task,nxt_process_data_t * data)164 nxt_discovery_start(nxt_task_t *task, nxt_process_data_t *data)
165 {
166     uint32_t       stream;
167     nxt_buf_t      *b;
168     nxt_int_t      ret;
169     nxt_port_t     *main_port, *discovery_port;
170     nxt_runtime_t  *rt;
171 
172     nxt_log(task, NXT_LOG_INFO, "discovery started");
173 
174     rt = task->thread->runtime;
175 
176     b = nxt_discovery_modules(task, rt->modules);
177     if (nxt_slow_path(b == NULL)) {
178         return NXT_ERROR;
179     }
180 
181     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
182     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
183 
184     stream = nxt_port_rpc_register_handler(task, discovery_port,
185                                            nxt_discovery_quit,
186                                            nxt_discovery_quit,
187                                            main_port->pid, NULL);
188 
189     if (nxt_slow_path(stream == 0)) {
190         return NXT_ERROR;
191     }
192 
193     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
194                                 stream, discovery_port->id, b);
195 
196     if (nxt_slow_path(ret != NXT_OK)) {
197         nxt_port_rpc_cancel(task, discovery_port, stream);
198         return NXT_ERROR;
199     }
200 
201     return NXT_OK;
202 }
203 
204 
205 static nxt_buf_t *
nxt_discovery_modules(nxt_task_t * task,const char * path)206 nxt_discovery_modules(nxt_task_t *task, const char *path)
207 {
208     char            *name;
209     u_char          *p, *end;
210     size_t          size;
211     glob_t          glb;
212     nxt_mp_t        *mp;
213     nxt_buf_t       *b;
214     nxt_int_t       ret;
215     nxt_uint_t      i, n, j;
216     nxt_array_t     *modules, *mounts;
217     nxt_module_t    *module;
218     nxt_fs_mount_t  *mnt;
219 
220     b = NULL;
221 
222     mp = nxt_mp_create(1024, 128, 256, 32);
223     if (mp == NULL) {
224         return b;
225     }
226 
227     ret = glob(path, 0, NULL, &glb);
228 
229     n = glb.gl_pathc;
230 
231     if (ret != 0) {
232         nxt_log(task, NXT_LOG_NOTICE,
233                 "no modules matching: \"%s\" found", path);
234         n = 0;
235     }
236 
237     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
238     if (modules == NULL) {
239         goto fail;
240     }
241 
242     for (i = 0; i < n; i++) {
243         name = glb.gl_pathv[i];
244 
245         ret = nxt_discovery_module(task, mp, modules, name);
246         if (ret != NXT_OK) {
247             goto fail;
248         }
249     }
250 
251     size = nxt_length("[]");
252     module = modules->elts;
253     n = modules->nelts;
254 
255     for (i = 0; i < n; i++) {
256         nxt_debug(task, "module: %d %V %V",
257                   module[i].type, &module[i].version, &module[i].file);
258 
259         size += nxt_length("{\"type\": ,");
260         size += nxt_length(" \"version\": \"\",");
261         size += nxt_length(" \"file\": \"\",");
262         size += nxt_length(" \"mounts\": []},");
263 
264         size += NXT_INT_T_LEN
265                 + module[i].version.length
266                 + module[i].file.length;
267 
268         mounts = module[i].mounts;
269 
270         size += mounts->nelts * nxt_length("{\"src\": \"\", \"dst\": \"\", "
271                                             "\"type\": , \"name\": \"\", "
272                                             "\"flags\": , \"data\": \"\"},");
273 
274         mnt = mounts->elts;
275 
276         for (j = 0; j < mounts->nelts; j++) {
277             size += nxt_strlen(mnt[j].src) + nxt_strlen(mnt[j].dst)
278                     + nxt_strlen(mnt[j].name) + (2 * NXT_INT_T_LEN)
279                     + (mnt[j].data == NULL ? 0 : nxt_strlen(mnt[j].data));
280         }
281     }
282 
283     b = nxt_buf_mem_alloc(mp, size, 0);
284     if (b == NULL) {
285         goto fail;
286     }
287 
288     b->completion_handler = nxt_discovery_completion_handler;
289 
290     p = b->mem.free;
291     end = b->mem.end;
292     *p++ = '[';
293 
294     for (i = 0; i < n; i++) {
295         mounts = module[i].mounts;
296 
297         p = nxt_sprintf(p, end, "{\"type\": %d, \"version\": \"%V\", "
298                         "\"file\": \"%V\", \"mounts\": [",
299                         module[i].type, &module[i].version, &module[i].file);
300 
301         mnt = mounts->elts;
302         for (j = 0; j < mounts->nelts; j++) {
303             p = nxt_sprintf(p, end,
304                             "{\"src\": \"%s\", \"dst\": \"%s\", "
305                             "\"name\": \"%s\", \"type\": %d, \"flags\": %d, "
306                             "\"data\": \"%s\"},",
307                             mnt[j].src, mnt[j].dst, mnt[j].name, mnt[j].type,
308                             mnt[j].flags,
309                             mnt[j].data == NULL ? (u_char *) "" : mnt[j].data);
310         }
311 
312         *p++ = ']';
313         *p++ = '}';
314         *p++ = ',';
315     }
316 
317     *p++ = ']';
318 
319     if (nxt_slow_path(p > end)) {
320         nxt_alert(task, "discovery write past the buffer");
321         goto fail;
322     }
323 
324     b->mem.free = p;
325 
326 fail:
327 
328     globfree(&glb);
329 
330     return b;
331 }
332 
333 
334 static nxt_int_t
nxt_discovery_module(nxt_task_t * task,nxt_mp_t * mp,nxt_array_t * modules,const char * name)335 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
336     const char *name)
337 {
338     void                  *dl;
339     nxt_str_t             version;
340     nxt_int_t             ret;
341     nxt_uint_t            i, j, n;
342     nxt_array_t           *mounts;
343     nxt_module_t          *module;
344     nxt_app_type_t        type;
345     nxt_fs_mount_t        *to;
346     nxt_app_module_t      *app;
347     const nxt_fs_mount_t  *from;
348 
349     /*
350      * Only memory allocation failure should return NXT_ERROR.
351      * Any module processing errors are ignored.
352      */
353     ret = NXT_ERROR;
354 
355     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
356 
357     if (dl == NULL) {
358         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
359         return NXT_OK;
360     }
361 
362     app = dlsym(dl, "nxt_app_module");
363 
364     if (app != NULL) {
365         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
366                 &app->type, app->version, name);
367 
368         if (app->compat_length != sizeof(compat)
369             || memcmp(app->compat, compat, sizeof(compat)) != 0)
370         {
371             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
372 
373             goto done;
374         }
375 
376         type = nxt_app_parse_type(app->type.start, app->type.length);
377 
378         if (type == NXT_APP_UNKNOWN) {
379             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
380 
381             goto done;
382         }
383 
384         module = modules->elts;
385         n = modules->nelts;
386 
387         version.start = (u_char *) app->version;
388         version.length = nxt_strlen(app->version);
389 
390         for (i = 0; i < n; i++) {
391             if (type == module[i].type
392                 && nxt_strstr_eq(&module[i].version, &version))
393             {
394                 nxt_log(task, NXT_LOG_NOTICE,
395                         "ignoring %s module with the same "
396                         "application language version %V %V as in %V",
397                         name, &app->type, &version, &module[i].file);
398 
399                 goto done;
400             }
401         }
402 
403         module = nxt_array_add(modules);
404         if (module == NULL) {
405             goto fail;
406         }
407 
408         module->type = type;
409 
410         nxt_str_dup(mp, &module->version, &version);
411         if (module->version.start == NULL) {
412             goto fail;
413         }
414 
415         module->file.length = nxt_strlen(name);
416 
417         module->file.start = nxt_mp_alloc(mp, module->file.length);
418         if (module->file.start == NULL) {
419             goto fail;
420         }
421 
422         nxt_memcpy(module->file.start, name, module->file.length);
423 
424         module->mounts = nxt_array_create(mp, app->nmounts,
425                                           sizeof(nxt_fs_mount_t));
426 
427         if (nxt_slow_path(module->mounts == NULL)) {
428             goto fail;
429         }
430 
431         mounts = module->mounts;
432 
433         for (j = 0; j < app->nmounts; j++) {
434             from = &app->mounts[j];
435             to = nxt_array_zero_add(mounts);
436             if (nxt_slow_path(to == NULL)) {
437                 goto fail;
438             }
439 
440             to->src = nxt_cstr_dup(mp, to->src, from->src);
441             if (nxt_slow_path(to->src == NULL)) {
442                 goto fail;
443             }
444 
445             to->dst = nxt_cstr_dup(mp, to->dst, from->dst);
446             if (nxt_slow_path(to->dst == NULL)) {
447                 goto fail;
448             }
449 
450             to->name = nxt_cstr_dup(mp, to->name, from->name);
451             if (nxt_slow_path(to->name == NULL)) {
452                 goto fail;
453             }
454 
455             to->type = from->type;
456 
457             if (from->data != NULL) {
458                 to->data = nxt_cstr_dup(mp, to->data, from->data);
459                 if (nxt_slow_path(to->data == NULL)) {
460                     goto fail;
461                 }
462             }
463 
464             to->flags = from->flags;
465         }
466 
467     } else {
468         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
469     }
470 
471 done:
472 
473     ret = NXT_OK;
474 
475 fail:
476 
477     if (dlclose(dl) != 0) {
478         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
479     }
480 
481     return ret;
482 }
483 
484 
485 static void
nxt_discovery_completion_handler(nxt_task_t * task,void * obj,void * data)486 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
487 {
488     nxt_mp_t   *mp;
489     nxt_buf_t  *b;
490 
491     b = obj;
492     mp = b->data;
493 
494     nxt_mp_destroy(mp);
495 }
496 
497 
498 static void
nxt_discovery_quit(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)499 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
500 {
501     nxt_signal_quit_handler(task, msg);
502 }
503 
504 
505 static nxt_int_t
nxt_proto_setup(nxt_task_t * task,nxt_process_t * process)506 nxt_proto_setup(nxt_task_t *task, nxt_process_t *process)
507 {
508     nxt_int_t              ret;
509     nxt_app_lang_module_t  *lang;
510     nxt_common_app_conf_t  *app_conf;
511 
512     app_conf = process->data.app;
513 
514     nxt_queue_init(&nxt_proto_children);
515 
516     nxt_app_conf = app_conf;
517 
518     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
519     if (nxt_slow_path(lang == NULL)) {
520         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
521         return NXT_ERROR;
522     }
523 
524     nxt_app = lang->module;
525 
526     if (nxt_app == NULL) {
527         nxt_debug(task, "application language module: %s \"%s\"",
528                   lang->version, lang->file);
529 
530         nxt_app = nxt_app_module_load(task, lang->file);
531         if (nxt_slow_path(nxt_app == NULL)) {
532             return NXT_ERROR;
533         }
534     }
535 
536     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
537                       != NXT_OK))
538     {
539         nxt_alert(task, "failed to set environment");
540         return NXT_ERROR;
541     }
542 
543     if (nxt_app->setup != NULL) {
544         ret = nxt_app->setup(task, process, app_conf);
545         if (nxt_slow_path(ret != NXT_OK)) {
546             return ret;
547         }
548     }
549 
550 #if (NXT_HAVE_ISOLATION_ROOTFS)
551     if (process->isolation.rootfs != NULL) {
552         if (process->isolation.mounts != NULL) {
553             ret = nxt_isolation_prepare_rootfs(task, process);
554             if (nxt_slow_path(ret != NXT_OK)) {
555                 return ret;
556             }
557         }
558 
559         ret = nxt_isolation_change_root(task, process);
560         if (nxt_slow_path(ret != NXT_OK)) {
561             return NXT_ERROR;
562         }
563     }
564 #endif
565 
566     if (app_conf->working_directory != NULL
567         && app_conf->working_directory[0] != 0)
568     {
569         ret = chdir(app_conf->working_directory);
570 
571         if (nxt_slow_path(ret != 0)) {
572             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
573                     app_conf->working_directory, nxt_errno);
574 
575             return NXT_ERROR;
576         }
577     }
578 
579     process->state = NXT_PROCESS_STATE_CREATED;
580 
581     return NXT_OK;
582 }
583 
584 
585 static nxt_int_t
nxt_proto_start(nxt_task_t * task,nxt_process_data_t * data)586 nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data)
587 {
588     nxt_debug(task, "prototype waiting for clone messages");
589 
590     return NXT_OK;
591 }
592 
593 
594 static void
nxt_proto_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)595 nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
596 {
597     u_char              *p;
598     nxt_int_t           ret;
599     nxt_port_t          *port;
600     nxt_runtime_t       *rt;
601     nxt_process_t       *process;
602     nxt_process_init_t  *init;
603 
604     rt = task->thread->runtime;
605 
606     process = nxt_process_new(rt);
607     if (nxt_slow_path(process == NULL)) {
608         goto failed;
609     }
610 
611     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
612     if (nxt_slow_path(process->mem_pool == NULL)) {
613         nxt_process_use(task, process, -1);
614         goto failed;
615     }
616 
617     process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
618 
619     init = nxt_process_init(process);
620     *init = nxt_app_process;
621 
622     process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length
623                                  + sizeof("\"\" application") + 1);
624 
625     if (nxt_slow_path(process->name == NULL)) {
626         nxt_process_use(task, process, -1);
627 
628         goto failed;
629     }
630 
631     init->start = nxt_app->start;
632 
633     init->name = (const char *) nxt_app_conf->name.start;
634 
635     p = (u_char *) process->name;
636     *p++ = '"';
637     p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length);
638     p = nxt_cpymem(p, "\" application", 13);
639     *p = '\0';
640 
641     process->user_cred = &rt->user_cred;
642 
643     process->data.app = nxt_app_conf;
644     process->stream = msg->port_msg.stream;
645 
646     init->siblings = &nxt_proto_children;
647 
648     ret = nxt_process_start(task, process);
649     if (nxt_slow_path(ret == NXT_ERROR)) {
650         nxt_process_use(task, process, -1);
651 
652         goto failed;
653     }
654 
655     nxt_proto_process_add(task, process);
656 
657     return;
658 
659 failed:
660 
661     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
662                                  msg->port_msg.reply_port);
663 
664     if (nxt_fast_path(port != NULL)) {
665         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
666                               -1, msg->port_msg.stream, 0, NULL);
667     }
668 }
669 
670 
671 static void
nxt_proto_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674     nxt_debug(task, "prototype quit handler");
675 
676     nxt_proto_quit_children(task);
677 
678     nxt_proto_exiting = 1;
679 
680     if (nxt_queue_is_empty(&nxt_proto_children)) {
681         nxt_process_quit(task, 0);
682     }
683 }
684 
685 
686 static void
nxt_proto_quit_children(nxt_task_t * task)687 nxt_proto_quit_children(nxt_task_t *task)
688 {
689     nxt_port_t     *port;
690     nxt_process_t  *process;
691 
692     nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) {
693         port = nxt_process_port_first(process);
694 
695         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
696                                      -1, 0, 0, NULL);
697     }
698     nxt_queue_loop;
699 }
700 
701 
702 static void
nxt_proto_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)703 nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
704 {
705     nxt_pid_t      isolated_pid, pid;
706     nxt_process_t  *process;
707 
708     isolated_pid = nxt_recv_msg_cmsg_pid(msg);
709 
710     process = nxt_proto_process_find(task, isolated_pid);
711     if (nxt_slow_path(process == NULL)) {
712         return;
713     }
714 
715     process->state = NXT_PROCESS_STATE_CREATED;
716 
717     pid = msg->port_msg.pid;
718 
719     if (process->pid != pid) {
720         nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid,
721                   pid);
722 
723         process->pid = pid;
724 
725     } else {
726         nxt_debug(task, "app process %PI is created", isolated_pid);
727     }
728 
729     if (!process->registered) {
730         nxt_assert(!nxt_queue_is_empty(&process->ports));
731 
732         nxt_runtime_process_add(task, process);
733 
734         nxt_port_use(task, nxt_process_port_first(process), -1);
735     }
736 }
737 
738 
739 static void
nxt_proto_signal_handler(nxt_task_t * task,void * obj,void * data)740 nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data)
741 {
742     nxt_trace(task, "signal signo:%d (%s) received, ignored",
743               (int) (uintptr_t) obj, data);
744 }
745 
746 
747 static void
nxt_proto_sigterm_handler(nxt_task_t * task,void * obj,void * data)748 nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data)
749 {
750     nxt_trace(task, "signal signo:%d (%s) received",
751               (int) (uintptr_t) obj, data);
752 
753     nxt_proto_quit_children(task);
754 
755     nxt_proto_exiting = 1;
756 
757     if (nxt_queue_is_empty(&nxt_proto_children)) {
758         nxt_process_quit(task, 0);
759     }
760 }
761 
762 
763 static void
nxt_proto_sigchld_handler(nxt_task_t * task,void * obj,void * data)764 nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data)
765 {
766     int            status;
767     nxt_err_t      err;
768     nxt_pid_t      pid;
769     nxt_port_t     *port;
770     nxt_process_t  *process;
771     nxt_runtime_t  *rt;
772 
773     rt = task->thread->runtime;
774 
775     nxt_debug(task, "proto sigchld handler signo:%d (%s)",
776               (int) (uintptr_t) obj, data);
777 
778     for ( ;; ) {
779         pid = waitpid(-1, &status, WNOHANG);
780 
781         if (pid == -1) {
782 
783             switch (err = nxt_errno) {
784 
785             case NXT_ECHILD:
786                 return;
787 
788             case NXT_EINTR:
789                 continue;
790 
791             default:
792                 nxt_alert(task, "waitpid() failed: %E", err);
793                 return;
794             }
795         }
796 
797         nxt_debug(task, "waitpid(): %PI", pid);
798 
799         if (pid == 0) {
800             return;
801         }
802 
803         process = nxt_proto_process_remove(task, pid);
804 
805         if (WTERMSIG(status)) {
806             if (rt->is_pid_isolated) {
807                 nxt_alert(task, "app process %PI (isolated %PI) "
808                                 "exited on signal %d%s",
809                           process != NULL ? process->pid : 0,
810                           pid, WTERMSIG(status),
811                           NXT_WCOREDUMP(status) ? " (core dumped)" : "");
812 
813             } else {
814                 nxt_alert(task, "app process %PI exited on signal %d%s",
815                           pid, WTERMSIG(status),
816                           NXT_WCOREDUMP(status) ? " (core dumped)" : "");
817             }
818 
819         } else {
820             if (rt->is_pid_isolated) {
821                 nxt_trace(task, "app process %PI (isolated %PI) "
822                                 "exited with code %d",
823                           process != NULL ? process->pid : 0,
824                           pid, WEXITSTATUS(status));
825 
826             } else {
827                 nxt_trace(task, "app process %PI exited with code %d",
828                           pid, WEXITSTATUS(status));
829             }
830         }
831 
832         if (process == NULL) {
833             continue;
834         }
835 
836         if (process->registered) {
837             port = NULL;
838 
839         } else {
840             nxt_assert(!nxt_queue_is_empty(&process->ports));
841 
842             port = nxt_process_port_first(process);
843         }
844 
845         if (process->state != NXT_PROCESS_STATE_CREATING) {
846             nxt_port_remove_notify_others(task, process);
847         }
848 
849         nxt_process_close_ports(task, process);
850 
851         if (port != NULL) {
852             nxt_port_use(task, port, -1);
853         }
854 
855         if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) {
856             nxt_process_quit(task, 0);
857             return;
858         }
859     }
860 }
861 
862 
863 static nxt_app_module_t *
nxt_app_module_load(nxt_task_t * task,const char * name)864 nxt_app_module_load(nxt_task_t *task, const char *name)
865 {
866     char              *err;
867     void              *dl;
868     nxt_app_module_t  *app;
869 
870     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
871 
872     if (nxt_slow_path(dl == NULL)) {
873         err = dlerror();
874         nxt_alert(task, "dlopen(\"%s\") failed: \"%s\"",
875                   name, err != NULL ? err : "(null)");
876         return NULL;
877     }
878 
879     app = dlsym(dl, "nxt_app_module");
880 
881     if (nxt_slow_path(app == NULL)) {
882         err = dlerror();
883         nxt_alert(task, "dlsym(\"%s\", \"nxt_app_module\") failed: \"%s\"",
884                   name, err != NULL ? err : "(null)");
885 
886         if (dlclose(dl) != 0) {
887             err = dlerror();
888             nxt_alert(task, "dlclose(\"%s\") failed: \"%s\"",
889                       name, err != NULL ? err : "(null)");
890         }
891     }
892 
893     return app;
894 }
895 
896 
897 static nxt_int_t
nxt_app_set_environment(nxt_conf_value_t * environment)898 nxt_app_set_environment(nxt_conf_value_t *environment)
899 {
900     char              *env, *p;
901     uint32_t          next;
902     nxt_str_t         name, value;
903     nxt_conf_value_t  *value_obj;
904 
905     if (environment != NULL) {
906         next = 0;
907 
908         for ( ;; ) {
909             value_obj = nxt_conf_next_object_member(environment, &name, &next);
910             if (value_obj == NULL) {
911                 break;
912             }
913 
914             nxt_conf_get_string(value_obj, &value);
915 
916             env = nxt_malloc(name.length + value.length + 2);
917             if (nxt_slow_path(env == NULL)) {
918                 return NXT_ERROR;
919             }
920 
921             p = nxt_cpymem(env, name.start, name.length);
922             *p++ = '=';
923             p = nxt_cpymem(p, value.start, value.length);
924             *p = '\0';
925 
926             if (nxt_slow_path(putenv(env) != 0)) {
927                 return NXT_ERROR;
928             }
929         }
930     }
931 
932     return NXT_OK;
933 }
934 
935 
936 static u_char *
nxt_cstr_dup(nxt_mp_t * mp,u_char * dst,u_char * src)937 nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src)
938 {
939     u_char  *p;
940     size_t  len;
941 
942     len = nxt_strlen(src);
943 
944     if (dst == NULL) {
945         dst = nxt_mp_alloc(mp, len + 1);
946         if (nxt_slow_path(dst == NULL)) {
947             return NULL;
948         }
949     }
950 
951     p = nxt_cpymem(dst, src, len);
952     *p = '\0';
953 
954     return dst;
955 }
956 
957 
958 static nxt_int_t
nxt_app_setup(nxt_task_t * task,nxt_process_t * process)959 nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
960 {
961     nxt_process_init_t  *init;
962 
963     process->state = NXT_PROCESS_STATE_CREATED;
964 
965     init = nxt_process_init(process);
966 
967     return init->start(task, &process->data);
968 }
969 
970 
971 nxt_app_lang_module_t *
nxt_app_lang_module(nxt_runtime_t * rt,nxt_str_t * name)972 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
973 {
974     u_char                 *p, *end, *version;
975     size_t                 version_length;
976     nxt_uint_t             i, n;
977     nxt_app_type_t         type;
978     nxt_app_lang_module_t  *lang;
979 
980     end = name->start + name->length;
981     version = end;
982 
983     for (p = name->start; p < end; p++) {
984         if (*p == ' ') {
985             version = p + 1;
986             break;
987         }
988 
989         if (*p >= '0' && *p <= '9') {
990             version = p;
991             break;
992         }
993     }
994 
995     type = nxt_app_parse_type(name->start, p - name->start);
996 
997     if (type == NXT_APP_UNKNOWN) {
998         return NULL;
999     }
1000 
1001     version_length = end - version;
1002 
1003     lang = rt->languages->elts;
1004     n = rt->languages->nelts;
1005 
1006     for (i = 0; i < n; i++) {
1007 
1008         /*
1009          * Versions are sorted in descending order
1010          * so first match chooses the highest version.
1011          */
1012 
1013         if (lang[i].type == type
1014             && nxt_strvers_match(lang[i].version, version, version_length))
1015         {
1016             return &lang[i];
1017         }
1018     }
1019 
1020     return NULL;
1021 }
1022 
1023 
1024 nxt_app_type_t
nxt_app_parse_type(u_char * p,size_t length)1025 nxt_app_parse_type(u_char *p, size_t length)
1026 {
1027     nxt_str_t str;
1028 
1029     str.length = length;
1030     str.start = p;
1031 
1032     if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) {
1033         return NXT_APP_EXTERNAL;
1034 
1035     } else if (nxt_str_eq(&str, "python", 6)) {
1036         return NXT_APP_PYTHON;
1037 
1038     } else if (nxt_str_eq(&str, "php", 3)) {
1039         return NXT_APP_PHP;
1040 
1041     } else if (nxt_str_eq(&str, "perl", 4)) {
1042         return NXT_APP_PERL;
1043 
1044     } else if (nxt_str_eq(&str, "ruby", 4)) {
1045         return NXT_APP_RUBY;
1046 
1047     } else if (nxt_str_eq(&str, "java", 4)) {
1048         return NXT_APP_JAVA;
1049     }
1050 
1051     return NXT_APP_UNKNOWN;
1052 }
1053 
1054 
1055 nxt_int_t
nxt_unit_default_init(nxt_task_t * task,nxt_unit_init_t * init,nxt_common_app_conf_t * conf)1056 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
1057     nxt_common_app_conf_t *conf)
1058 {
1059     nxt_port_t     *my_port, *proto_port, *router_port;
1060     nxt_runtime_t  *rt;
1061 
1062     nxt_memzero(init, sizeof(nxt_unit_init_t));
1063 
1064     rt = task->thread->runtime;
1065 
1066     proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
1067     if (nxt_slow_path(proto_port == NULL)) {
1068         return NXT_ERROR;
1069     }
1070 
1071     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1072     if (nxt_slow_path(router_port == NULL)) {
1073         return NXT_ERROR;
1074     }
1075 
1076     my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
1077     if (nxt_slow_path(my_port == NULL)) {
1078         return NXT_ERROR;
1079     }
1080 
1081     init->ready_port.id.pid = proto_port->pid;
1082     init->ready_port.id.id = proto_port->id;
1083     init->ready_port.in_fd = -1;
1084     init->ready_port.out_fd = proto_port->pair[1];
1085 
1086     init->ready_stream = my_port->process->stream;
1087 
1088     init->router_port.id.pid = router_port->pid;
1089     init->router_port.id.id = router_port->id;
1090     init->router_port.in_fd = -1;
1091     init->router_port.out_fd = router_port->pair[1];
1092 
1093     init->read_port.id.pid = my_port->pid;
1094     init->read_port.id.id = my_port->id;
1095     init->read_port.in_fd = my_port->pair[0];
1096     init->read_port.out_fd = my_port->pair[1];
1097 
1098     init->shared_port_fd = conf->shared_port_fd;
1099     init->shared_queue_fd = conf->shared_queue_fd;
1100 
1101     init->log_fd = 2;
1102 
1103     init->shm_limit = conf->shm_limit;
1104     init->request_limit = conf->request_limit;
1105 
1106     return NXT_OK;
1107 }
1108 
1109 
1110 static nxt_int_t
nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t * lhq,void * data)1111 nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1112 {
1113     nxt_pid_t      *qpid;
1114     nxt_process_t  *process;
1115 
1116     process = data;
1117     qpid = (nxt_pid_t *) lhq->key.start;
1118 
1119     if (*qpid == process->isolated_pid) {
1120         return NXT_OK;
1121     }
1122 
1123     return NXT_DECLINED;
1124 }
1125 
1126 
/*
 * Level-hash prototype installed by nxt_proto_process_lhq_pid() into every
 * query made against nxt_proto_processes.  Entries are compared with
 * nxt_proto_lvlhsh_isolated_pid_test(), i.e. by isolated pid.  The remaining
 * members are the default lvlhsh parameters and, presumably, the allocation
 * and release callbacks in that order (field names are not visible here).
 * The object is aligned to 64 bytes.
 */
1127 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1128     NXT_LVLHSH_DEFAULT,
1129     nxt_proto_lvlhsh_isolated_pid_test,
1130     nxt_lvlhsh_alloc,
1131     nxt_lvlhsh_free,
1132 };
1133 
1134 
1135 nxt_inline void
nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t * lhq,nxt_pid_t * pid)1136 nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1137 {
1138     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t));
1139     lhq->key.length = sizeof(nxt_pid_t);
1140     lhq->key.start = (u_char *) pid;
1141     lhq->proto = &lvlhsh_processes_proto;
1142 }
1143 
1144 
1145 static void
nxt_proto_process_add(nxt_task_t * task,nxt_process_t * process)1146 nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process)
1147 {
1148     nxt_runtime_t       *rt;
1149     nxt_lvlhsh_query_t  lhq;
1150 
1151     rt = task->thread->runtime;
1152 
1153     nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid);
1154 
1155     lhq.replace = 0;
1156     lhq.value = process;
1157     lhq.pool = rt->mem_pool;
1158 
1159     switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) {
1160 
1161     case NXT_OK:
1162         nxt_debug(task, "process (isolated %PI) added", process->isolated_pid);
1163 
1164         nxt_queue_insert_tail(&nxt_proto_children, &process->link);
1165         break;
1166 
1167     default:
1168         nxt_alert(task, "process (isolated %PI) failed to add",
1169                   process->isolated_pid);
1170         break;
1171     }
1172 }
1173 
1174 
1175 static nxt_process_t *
nxt_proto_process_remove(nxt_task_t * task,nxt_pid_t pid)1176 nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid)
1177 {
1178     nxt_runtime_t       *rt;
1179     nxt_process_t       *process;
1180     nxt_lvlhsh_query_t  lhq;
1181 
1182     nxt_proto_process_lhq_pid(&lhq, &pid);
1183 
1184     rt = task->thread->runtime;
1185 
1186     lhq.pool = rt->mem_pool;
1187 
1188     switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) {
1189 
1190     case NXT_OK:
1191         nxt_debug(task, "process (isolated %PI) removed", pid);
1192 
1193         process = lhq.value;
1194 
1195         nxt_queue_remove(&process->link);
1196         process->link.next = NULL;
1197 
1198         break;
1199 
1200     default:
1201         nxt_debug(task, "process (isolated %PI) remove failed", pid);
1202         process = NULL;
1203         break;
1204     }
1205 
1206     return process;
1207 }
1208 
1209 
1210 static nxt_process_t *
nxt_proto_process_find(nxt_task_t * task,nxt_pid_t pid)1211 nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid)
1212 {
1213     nxt_process_t       *process;
1214     nxt_lvlhsh_query_t  lhq;
1215 
1216     nxt_proto_process_lhq_pid(&lhq, &pid);
1217 
1218     if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) {
1219         process = lhq.value;
1220 
1221     } else {
1222         nxt_debug(task, "process (isolated %PI) not found", pid);
1223 
1224         process = NULL;
1225     }
1226 
1227     return process;
1228 }
1229