xref: /unit/src/nxt_application.c (revision 2686:87259ed41698)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_unit.h>
16 #include <nxt_port_memory_int.h>
17 #include <nxt_isolation.h>
18 
19 #include <glob.h>
20 
21 #if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
22 #include <sys/prctl.h>
23 #endif
24 
25 
26 #ifdef WCOREDUMP
27 #define NXT_WCOREDUMP(s) WCOREDUMP(s)
28 #else
29 #define NXT_WCOREDUMP(s) 0
30 #endif
31 
32 
33 typedef struct {
34     nxt_app_type_t  type;
35     nxt_str_t       version;
36     nxt_str_t       file;
37     nxt_array_t     *mounts;
38 } nxt_module_t;
39 
40 
41 static nxt_int_t nxt_discovery_start(nxt_task_t *task,
42     nxt_process_data_t *data);
43 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
44 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
45     nxt_array_t *modules, const char *name);
46 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
49     void *data);
50 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
51     const char *name);
52 static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process);
53 static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data);
54 static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
55 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
56 static void nxt_proto_start_process_handler(nxt_task_t *task,
57     nxt_port_recv_msg_t *msg);
58 static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
59 static void nxt_proto_process_created_handler(nxt_task_t *task,
60     nxt_port_recv_msg_t *msg);
61 static void nxt_proto_quit_children(nxt_task_t *task);
62 static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid);
63 static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process);
64 static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid);
65 static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src);
66 static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data);
67 static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data);
68 static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data);
69 
70 
71 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
72 
73 
74 static uint32_t  compat[] = {
75     NXT_VERNUM, NXT_DEBUG,
76 };
77 
78 
79 static nxt_lvlhsh_t           nxt_proto_processes;
80 static nxt_queue_t            nxt_proto_children;
81 static nxt_bool_t             nxt_proto_exiting;
82 
83 static nxt_app_module_t       *nxt_app;
84 static nxt_common_app_conf_t  *nxt_app_conf;
85 
86 
87 static const nxt_port_handlers_t  nxt_discovery_process_port_handlers = {
88     .quit         = nxt_signal_quit_handler,
89     .new_port     = nxt_port_new_port_handler,
90     .change_file  = nxt_port_change_log_file_handler,
91     .mmap         = nxt_port_mmap_handler,
92     .data         = nxt_port_data_handler,
93     .remove_pid   = nxt_port_remove_pid_handler,
94     .rpc_ready    = nxt_port_rpc_handler,
95     .rpc_error    = nxt_port_rpc_handler,
96 };
97 
98 
99 const nxt_sig_event_t  nxt_prototype_signals[] = {
100     nxt_event_signal(SIGHUP,  nxt_proto_signal_handler),
101     nxt_event_signal(SIGINT,  nxt_proto_sigterm_handler),
102     nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler),
103     nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler),
104     nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler),
105     nxt_event_signal_end,
106 };
107 
108 
109 static const nxt_port_handlers_t  nxt_proto_process_port_handlers = {
110     .quit            = nxt_proto_quit_handler,
111     .change_file     = nxt_port_change_log_file_handler,
112     .new_port        = nxt_port_new_port_handler,
113     .process_created = nxt_proto_process_created_handler,
114     .process_ready   = nxt_port_process_ready_handler,
115     .remove_pid      = nxt_port_remove_pid_handler,
116     .start_process   = nxt_proto_start_process_handler,
117     .rpc_ready       = nxt_port_rpc_handler,
118     .rpc_error       = nxt_port_rpc_handler,
119 };
120 
121 
122 static const nxt_port_handlers_t  nxt_app_process_port_handlers = {
123     .quit         = nxt_signal_quit_handler,
124     .rpc_ready    = nxt_port_rpc_handler,
125     .rpc_error    = nxt_port_rpc_handler,
126 };
127 
128 
129 const nxt_process_init_t  nxt_discovery_process = {
130     .name           = "discovery",
131     .type           = NXT_PROCESS_DISCOVERY,
132     .prefork        = NULL,
133     .restart        = 0,
134     .setup          = nxt_process_core_setup,
135     .start          = nxt_discovery_start,
136     .port_handlers  = &nxt_discovery_process_port_handlers,
137     .signals        = nxt_process_signals,
138 };
139 
140 
141 const nxt_process_init_t  nxt_proto_process = {
142     .type           = NXT_PROCESS_PROTOTYPE,
143     .prefork        = nxt_isolation_main_prefork,
144     .restart        = 0,
145     .setup          = nxt_proto_setup,
146     .start          = nxt_proto_start,
147     .port_handlers  = &nxt_proto_process_port_handlers,
148     .signals        = nxt_prototype_signals,
149 };
150 
151 
152 const nxt_process_init_t  nxt_app_process = {
153     .type           = NXT_PROCESS_APP,
154     .setup          = nxt_app_setup,
155     .start          = NULL,
156     .prefork        = NULL,
157     .restart        = 0,
158     .port_handlers  = &nxt_app_process_port_handlers,
159     .signals        = nxt_process_signals,
160 };
161 
162 
163 static nxt_int_t
nxt_discovery_start(nxt_task_t * task,nxt_process_data_t * data)164 nxt_discovery_start(nxt_task_t *task, nxt_process_data_t *data)
165 {
166     uint32_t       stream;
167     nxt_buf_t      *b;
168     nxt_int_t      ret;
169     nxt_port_t     *main_port, *discovery_port;
170     nxt_runtime_t  *rt;
171 
172     nxt_log(task, NXT_LOG_INFO, "discovery started");
173 
174     rt = task->thread->runtime;
175 
176     b = nxt_discovery_modules(task, rt->modules);
177     if (nxt_slow_path(b == NULL)) {
178         return NXT_ERROR;
179     }
180 
181     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
182     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
183 
184     stream = nxt_port_rpc_register_handler(task, discovery_port,
185                                            nxt_discovery_quit,
186                                            nxt_discovery_quit,
187                                            main_port->pid, NULL);
188 
189     if (nxt_slow_path(stream == 0)) {
190         return NXT_ERROR;
191     }
192 
193     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
194                                 stream, discovery_port->id, b);
195 
196     if (nxt_slow_path(ret != NXT_OK)) {
197         nxt_port_rpc_cancel(task, discovery_port, stream);
198         return NXT_ERROR;
199     }
200 
201     return NXT_OK;
202 }
203 
204 
205 static nxt_buf_t *
nxt_discovery_modules(nxt_task_t * task,const char * path)206 nxt_discovery_modules(nxt_task_t *task, const char *path)
207 {
208     char            *name;
209     u_char          *p, *end;
210     size_t          size;
211     glob_t          glb;
212     nxt_mp_t        *mp;
213     nxt_buf_t       *b;
214     nxt_int_t       ret;
215     nxt_uint_t      i, n, j;
216     nxt_array_t     *modules, *mounts;
217     nxt_module_t    *module;
218     nxt_fs_mount_t  *mnt;
219 
220     b = NULL;
221 
222     mp = nxt_mp_create(1024, 128, 256, 32);
223     if (mp == NULL) {
224         return b;
225     }
226 
227     ret = glob(path, 0, NULL, &glb);
228 
229     n = glb.gl_pathc;
230 
231     if (ret != 0) {
232         nxt_log(task, NXT_LOG_NOTICE,
233                 "no modules matching: \"%s\" found", path);
234         n = 0;
235     }
236 
237     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
238     if (modules == NULL) {
239         goto fail;
240     }
241 
242     for (i = 0; i < n; i++) {
243         name = glb.gl_pathv[i];
244 
245         ret = nxt_discovery_module(task, mp, modules, name);
246         if (ret != NXT_OK) {
247             goto fail;
248         }
249     }
250 
251     size = nxt_length("[]");
252     module = modules->elts;
253     n = modules->nelts;
254 
255     for (i = 0; i < n; i++) {
256         nxt_debug(task, "module: %d %V %V",
257                   module[i].type, &module[i].version, &module[i].file);
258 
259         size += nxt_length("{\"type\": ,");
260         size += nxt_length(" \"version\": \"\",");
261         size += nxt_length(" \"file\": \"\",");
262         size += nxt_length(" \"mounts\": []},");
263 
264         size += NXT_INT_T_LEN
265                 + module[i].version.length
266                 + module[i].file.length;
267 
268         mounts = module[i].mounts;
269 
270         size += mounts->nelts * nxt_length("{\"src\": \"\", \"dst\": \"\", "
271                                             "\"type\": , \"name\": \"\", "
272                                             "\"flags\": , \"data\": \"\"},");
273 
274         mnt = mounts->elts;
275 
276         for (j = 0; j < mounts->nelts; j++) {
277             size += nxt_strlen(mnt[j].src) + nxt_strlen(mnt[j].dst)
278                     + nxt_strlen(mnt[j].name) + (2 * NXT_INT_T_LEN)
279                     + (mnt[j].data == NULL ? 0 : nxt_strlen(mnt[j].data));
280         }
281     }
282 
283     b = nxt_buf_mem_alloc(mp, size, 0);
284     if (b == NULL) {
285         goto fail;
286     }
287 
288     b->completion_handler = nxt_discovery_completion_handler;
289 
290     p = b->mem.free;
291     end = b->mem.end;
292     *p++ = '[';
293 
294     for (i = 0; i < n; i++) {
295         mounts = module[i].mounts;
296 
297         p = nxt_sprintf(p, end, "{\"type\": %d, \"version\": \"%V\", "
298                         "\"file\": \"%V\", \"mounts\": [",
299                         module[i].type, &module[i].version, &module[i].file);
300 
301         mnt = mounts->elts;
302         for (j = 0; j < mounts->nelts; j++) {
303             p = nxt_sprintf(p, end,
304                             "{\"src\": \"%s\", \"dst\": \"%s\", "
305                             "\"name\": \"%s\", \"type\": %d, \"flags\": %d, "
306                             "\"data\": \"%s\"},",
307                             mnt[j].src, mnt[j].dst, mnt[j].name, mnt[j].type,
308                             mnt[j].flags,
309                             mnt[j].data == NULL ? (u_char *) "" : mnt[j].data);
310         }
311 
312         *p++ = ']';
313         *p++ = '}';
314         *p++ = ',';
315     }
316 
317     *p++ = ']';
318 
319     if (nxt_slow_path(p > end)) {
320         nxt_alert(task, "discovery write past the buffer");
321         goto fail;
322     }
323 
324     b->mem.free = p;
325 
326 fail:
327 
328     globfree(&glb);
329 
330     return b;
331 }
332 
333 
334 static nxt_int_t
nxt_discovery_module(nxt_task_t * task,nxt_mp_t * mp,nxt_array_t * modules,const char * name)335 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
336     const char *name)
337 {
338     void                  *dl;
339     nxt_str_t             version;
340     nxt_int_t             ret;
341     nxt_uint_t            i, j, n;
342     nxt_array_t           *mounts;
343     nxt_module_t          *module;
344     nxt_app_type_t        type;
345     nxt_fs_mount_t        *to;
346     nxt_app_module_t      *app;
347     const nxt_fs_mount_t  *from;
348 
349     /*
350      * Only memory allocation failure should return NXT_ERROR.
351      * Any module processing errors are ignored.
352      */
353     ret = NXT_ERROR;
354 
355     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
356 
357     if (dl == NULL) {
358         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
359         return NXT_OK;
360     }
361 
362     app = dlsym(dl, "nxt_app_module");
363 
364     if (app != NULL) {
365         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
366                 &app->type, app->version, name);
367 
368         if (app->compat_length != sizeof(compat)
369             || memcmp(app->compat, compat, sizeof(compat)) != 0)
370         {
371             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
372 
373             goto done;
374         }
375 
376         type = nxt_app_parse_type(app->type.start, app->type.length);
377 
378         if (type == NXT_APP_UNKNOWN) {
379             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
380 
381             goto done;
382         }
383 
384         module = modules->elts;
385         n = modules->nelts;
386 
387         version.start = (u_char *) app->version;
388         version.length = nxt_strlen(app->version);
389 
390         for (i = 0; i < n; i++) {
391             if (type == module[i].type
392                 && nxt_strstr_eq(&module[i].version, &version))
393             {
394                 nxt_log(task, NXT_LOG_NOTICE,
395                         "ignoring %s module with the same "
396                         "application language version %V %V as in %V",
397                         name, &app->type, &version, &module[i].file);
398 
399                 goto done;
400             }
401         }
402 
403         module = nxt_array_add(modules);
404         if (module == NULL) {
405             goto fail;
406         }
407 
408         module->type = type;
409 
410         nxt_str_dup(mp, &module->version, &version);
411         if (module->version.start == NULL) {
412             goto fail;
413         }
414 
415         module->file.length = nxt_strlen(name);
416 
417         module->file.start = nxt_mp_alloc(mp, module->file.length);
418         if (module->file.start == NULL) {
419             goto fail;
420         }
421 
422         nxt_memcpy(module->file.start, name, module->file.length);
423 
424         module->mounts = nxt_array_create(mp, app->nmounts,
425                                           sizeof(nxt_fs_mount_t));
426 
427         if (nxt_slow_path(module->mounts == NULL)) {
428             goto fail;
429         }
430 
431         mounts = module->mounts;
432 
433         for (j = 0; j < app->nmounts; j++) {
434             from = &app->mounts[j];
435             to = nxt_array_zero_add(mounts);
436             if (nxt_slow_path(to == NULL)) {
437                 goto fail;
438             }
439 
440             to->src = nxt_cstr_dup(mp, to->src, from->src);
441             if (nxt_slow_path(to->src == NULL)) {
442                 goto fail;
443             }
444 
445             to->dst = nxt_cstr_dup(mp, to->dst, from->dst);
446             if (nxt_slow_path(to->dst == NULL)) {
447                 goto fail;
448             }
449 
450             to->name = nxt_cstr_dup(mp, to->name, from->name);
451             if (nxt_slow_path(to->name == NULL)) {
452                 goto fail;
453             }
454 
455             to->type = from->type;
456 
457             if (from->data != NULL) {
458                 to->data = nxt_cstr_dup(mp, to->data, from->data);
459                 if (nxt_slow_path(to->data == NULL)) {
460                     goto fail;
461                 }
462             }
463 
464             to->flags = from->flags;
465         }
466 
467     } else {
468         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
469     }
470 
471 done:
472 
473     ret = NXT_OK;
474 
475 fail:
476 
477     if (dlclose(dl) != 0) {
478         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
479     }
480 
481     return ret;
482 }
483 
484 
485 static void
nxt_discovery_completion_handler(nxt_task_t * task,void * obj,void * data)486 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
487 {
488     nxt_mp_t   *mp;
489     nxt_buf_t  *b;
490 
491     b = obj;
492     mp = b->data;
493 
494     nxt_mp_destroy(mp);
495 }
496 
497 
498 static void
nxt_discovery_quit(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)499 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
500 {
501     nxt_signal_quit_handler(task, msg);
502 }
503 
504 
505 static nxt_int_t
nxt_proto_setup(nxt_task_t * task,nxt_process_t * process)506 nxt_proto_setup(nxt_task_t *task, nxt_process_t *process)
507 {
508     nxt_int_t              ret;
509     nxt_app_lang_module_t  *lang;
510     nxt_common_app_conf_t  *app_conf;
511 
512     app_conf = process->data.app;
513 
514     nxt_queue_init(&nxt_proto_children);
515 
516     nxt_app_conf = app_conf;
517 
518     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
519     if (nxt_slow_path(lang == NULL)) {
520         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
521         return NXT_ERROR;
522     }
523 
524     nxt_app = lang->module;
525 
526     if (nxt_app == NULL) {
527         nxt_debug(task, "application language module: %s \"%s\"",
528                   lang->version, lang->file);
529 
530         nxt_app = nxt_app_module_load(task, lang->file);
531         if (nxt_slow_path(nxt_app == NULL)) {
532             return NXT_ERROR;
533         }
534     }
535 
536     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
537                       != NXT_OK))
538     {
539         nxt_alert(task, "failed to set environment");
540         return NXT_ERROR;
541     }
542 
543     if (nxt_app->setup != NULL) {
544         ret = nxt_app->setup(task, process, app_conf);
545         if (nxt_slow_path(ret != NXT_OK)) {
546             return ret;
547         }
548     }
549 
550 #if (NXT_HAVE_ISOLATION_ROOTFS)
551     if (process->isolation.rootfs != NULL) {
552         if (process->isolation.mounts != NULL) {
553             ret = nxt_isolation_prepare_rootfs(task, process);
554             if (nxt_slow_path(ret != NXT_OK)) {
555                 return ret;
556             }
557         }
558 
559         ret = nxt_isolation_change_root(task, process);
560         if (nxt_slow_path(ret != NXT_OK)) {
561             return NXT_ERROR;
562         }
563     }
564 #endif
565 
566     if (app_conf->working_directory != NULL
567         && app_conf->working_directory[0] != 0)
568     {
569         ret = chdir(app_conf->working_directory);
570 
571         if (nxt_slow_path(ret != 0)) {
572             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
573                     app_conf->working_directory, nxt_errno);
574 
575             return NXT_ERROR;
576         }
577     }
578 
579     process->state = NXT_PROCESS_STATE_CREATED;
580 
581     return NXT_OK;
582 }
583 
584 
585 static nxt_int_t
nxt_proto_start(nxt_task_t * task,nxt_process_data_t * data)586 nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data)
587 {
588     nxt_debug(task, "prototype waiting for clone messages");
589 
590     return NXT_OK;
591 }
592 
593 
594 static void
nxt_proto_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)595 nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
596 {
597     u_char              *p;
598     nxt_int_t           ret;
599     nxt_port_t          *port;
600     nxt_runtime_t       *rt;
601     nxt_process_t       *process;
602     nxt_process_init_t  *init;
603 
604     rt = task->thread->runtime;
605 
606     process = nxt_process_new(rt);
607     if (nxt_slow_path(process == NULL)) {
608         goto failed;
609     }
610 
611     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
612     if (nxt_slow_path(process->mem_pool == NULL)) {
613         nxt_process_use(task, process, -1);
614         goto failed;
615     }
616 
617     process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
618 
619     init = nxt_process_init(process);
620     *init = nxt_app_process;
621 
622     process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length
623                                  + sizeof("\"\" application") + 1);
624 
625     if (nxt_slow_path(process->name == NULL)) {
626         nxt_process_use(task, process, -1);
627 
628         goto failed;
629     }
630 
631     init->start = nxt_app->start;
632 
633     init->name = (const char *) nxt_app_conf->name.start;
634 
635     p = (u_char *) process->name;
636     *p++ = '"';
637     p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length);
638     p = nxt_cpymem(p, "\" application", 13);
639     *p = '\0';
640 
641     process->user_cred = &rt->user_cred;
642 
643     process->data.app = nxt_app_conf;
644     process->stream = msg->port_msg.stream;
645 
646     init->siblings = &nxt_proto_children;
647 
648     ret = nxt_process_start(task, process);
649     if (nxt_slow_path(ret == NXT_ERROR)) {
650         nxt_process_use(task, process, -1);
651 
652         goto failed;
653     }
654 
655     nxt_proto_process_add(task, process);
656 
657     return;
658 
659 failed:
660 
661     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
662                                  msg->port_msg.reply_port);
663 
664     if (nxt_fast_path(port != NULL)) {
665         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
666                               -1, msg->port_msg.stream, 0, NULL);
667     }
668 }
669 
670 
671 static void
nxt_proto_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674     nxt_debug(task, "prototype quit handler");
675 
676     nxt_proto_quit_children(task);
677 
678     nxt_proto_exiting = 1;
679 
680     if (nxt_queue_is_empty(&nxt_proto_children)) {
681         nxt_process_quit(task, 0);
682     }
683 }
684 
685 
686 static void
nxt_proto_quit_children(nxt_task_t * task)687 nxt_proto_quit_children(nxt_task_t *task)
688 {
689     nxt_port_t     *port;
690     nxt_process_t  *process;
691 
692     nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) {
693         port = nxt_process_port_first(process);
694 
695         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
696                                      -1, 0, 0, NULL);
697     }
698     nxt_queue_loop;
699 }
700 
701 
702 static void
nxt_proto_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)703 nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
704 {
705     nxt_pid_t      isolated_pid, pid;
706     nxt_process_t  *process;
707 
708     isolated_pid = nxt_recv_msg_cmsg_pid(msg);
709 
710     process = nxt_proto_process_find(task, isolated_pid);
711     if (nxt_slow_path(process == NULL)) {
712         return;
713     }
714 
715     process->state = NXT_PROCESS_STATE_CREATED;
716 
717     pid = msg->port_msg.pid;
718 
719     if (process->pid != pid) {
720         nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid,
721                   pid);
722 
723         process->pid = pid;
724 
725     } else {
726         nxt_debug(task, "app process %PI is created", isolated_pid);
727     }
728 
729     if (!process->registered) {
730         nxt_assert(!nxt_queue_is_empty(&process->ports));
731 
732         nxt_runtime_process_add(task, process);
733 
734         nxt_port_use(task, nxt_process_port_first(process), -1);
735     }
736 }
737 
738 
739 static void
nxt_proto_signal_handler(nxt_task_t * task,void * obj,void * data)740 nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data)
741 {
742     nxt_trace(task, "signal signo:%d (%s) received, ignored",
743               (int) (uintptr_t) obj, data);
744 }
745 
746 
747 static void
nxt_proto_sigterm_handler(nxt_task_t * task,void * obj,void * data)748 nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data)
749 {
750     nxt_trace(task, "signal signo:%d (%s) received",
751               (int) (uintptr_t) obj, data);
752 
753     nxt_proto_quit_children(task);
754 
755     nxt_proto_exiting = 1;
756 
757     if (nxt_queue_is_empty(&nxt_proto_children)) {
758         nxt_process_quit(task, 0);
759     }
760 }
761 
762 
763 static void
nxt_proto_sigchld_handler(nxt_task_t * task,void * obj,void * data)764 nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data)
765 {
766     int            status;
767     nxt_err_t      err;
768     nxt_pid_t      pid;
769     nxt_port_t     *port;
770     nxt_process_t  *process;
771     nxt_runtime_t  *rt;
772 
773     rt = task->thread->runtime;
774 
775     nxt_debug(task, "proto sigchld handler signo:%d (%s)",
776               (int) (uintptr_t) obj, data);
777 
778     for ( ;; ) {
779         pid = waitpid(-1, &status, WNOHANG);
780 
781         if (pid == -1) {
782 
783             switch (err = nxt_errno) {
784 
785             case NXT_ECHILD:
786                 return;
787 
788             case NXT_EINTR:
789                 continue;
790 
791             default:
792                 nxt_alert(task, "waitpid() failed: %E", err);
793                 return;
794             }
795         }
796 
797         nxt_debug(task, "waitpid(): %PI", pid);
798 
799         if (pid == 0) {
800             return;
801         }
802 
803         process = nxt_proto_process_remove(task, pid);
804 
805         if (WTERMSIG(status)) {
806             if (rt->is_pid_isolated) {
807                 nxt_alert(task, "app process %PI (isolated %PI) "
808                                 "exited on signal %d%s",
809                           process != NULL ? process->pid : 0,
810                           pid, WTERMSIG(status),
811                           NXT_WCOREDUMP(status) ? " (core dumped)" : "");
812 
813             } else {
814                 nxt_alert(task, "app process %PI exited on signal %d%s",
815                           pid, WTERMSIG(status),
816                           NXT_WCOREDUMP(status) ? " (core dumped)" : "");
817             }
818 
819         } else {
820             if (rt->is_pid_isolated) {
821                 nxt_trace(task, "app process %PI (isolated %PI) "
822                                 "exited with code %d",
823                           process != NULL ? process->pid : 0,
824                           pid, WEXITSTATUS(status));
825 
826             } else {
827                 nxt_trace(task, "app process %PI exited with code %d",
828                           pid, WEXITSTATUS(status));
829             }
830         }
831 
832         if (process == NULL) {
833             continue;
834         }
835 
836         if (process->registered) {
837             port = NULL;
838 
839         } else {
840             nxt_assert(!nxt_queue_is_empty(&process->ports));
841 
842             port = nxt_process_port_first(process);
843         }
844 
845         if (process->state != NXT_PROCESS_STATE_CREATING) {
846             nxt_port_remove_notify_others(task, process);
847         }
848 
849         nxt_process_close_ports(task, process);
850 
851         if (port != NULL) {
852             nxt_port_use(task, port, -1);
853         }
854 
855         if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) {
856             nxt_process_quit(task, 0);
857             return;
858         }
859     }
860 }
861 
862 
863 static nxt_app_module_t *
nxt_app_module_load(nxt_task_t * task,const char * name)864 nxt_app_module_load(nxt_task_t *task, const char *name)
865 {
866     char              *err;
867     void              *dl;
868     nxt_app_module_t  *app;
869 
870     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
871 
872     if (nxt_slow_path(dl == NULL)) {
873         err = dlerror();
874         nxt_alert(task, "dlopen(\"%s\") failed: \"%s\"",
875                   name, err != NULL ? err : "(null)");
876         return NULL;
877     }
878 
879     app = dlsym(dl, "nxt_app_module");
880 
881     if (nxt_slow_path(app == NULL)) {
882         err = dlerror();
883         nxt_alert(task, "dlsym(\"%s\", \"nxt_app_module\") failed: \"%s\"",
884                   name, err != NULL ? err : "(null)");
885 
886         if (dlclose(dl) != 0) {
887             err = dlerror();
888             nxt_alert(task, "dlclose(\"%s\") failed: \"%s\"",
889                       name, err != NULL ? err : "(null)");
890         }
891     }
892 
893     return app;
894 }
895 
896 
897 static nxt_int_t
nxt_app_set_environment(nxt_conf_value_t * environment)898 nxt_app_set_environment(nxt_conf_value_t *environment)
899 {
900     char              *env, *p;
901     uint32_t          next;
902     nxt_str_t         name, value;
903     nxt_conf_value_t  *value_obj;
904 
905     if (environment != NULL) {
906         next = 0;
907 
908         for ( ;; ) {
909             value_obj = nxt_conf_next_object_member(environment, &name, &next);
910             if (value_obj == NULL) {
911                 break;
912             }
913 
914             nxt_conf_get_string(value_obj, &value);
915 
916             env = nxt_malloc(name.length + value.length + 2);
917             if (nxt_slow_path(env == NULL)) {
918                 return NXT_ERROR;
919             }
920 
921             p = nxt_cpymem(env, name.start, name.length);
922             *p++ = '=';
923             p = nxt_cpymem(p, value.start, value.length);
924             *p = '\0';
925 
926             if (nxt_slow_path(putenv(env) != 0)) {
927                 return NXT_ERROR;
928             }
929         }
930     }
931 
932     return NXT_OK;
933 }
934 
935 
936 nxt_int_t
nxt_app_set_logs(void)937 nxt_app_set_logs(void)
938 {
939     nxt_int_t              ret;
940     nxt_file_t             file;
941     nxt_task_t             *task;
942     nxt_thread_t           *thr;
943     nxt_process_t          *process;
944     nxt_runtime_t          *rt;
945     nxt_common_app_conf_t  *app_conf;
946 
947     thr = nxt_thread();
948 
949     task = thr->task;
950 
951     rt = task->thread->runtime;
952     if (!rt->daemon) {
953         return NXT_OK;
954     }
955 
956     process = rt->port_by_type[NXT_PROCESS_PROTOTYPE]->process;
957     app_conf = process->data.app;
958 
959     if (app_conf->stdout_log != NULL) {
960         nxt_memzero(&file, sizeof(nxt_file_t));
961         file.log_level = 1;
962         file.name = (u_char *) app_conf->stdout_log;
963         ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 0666);
964         if (ret == NXT_ERROR) {
965             return NXT_ERROR;
966         }
967 
968         nxt_file_stdout(&file);
969         nxt_file_close(task, &file);
970     }
971 
972     if (app_conf->stderr_log != NULL) {
973         nxt_memzero(&file, sizeof(nxt_file_t));
974         file.log_level = 1;
975         file.name = (u_char *) app_conf->stderr_log;
976         ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 0666);
977         if (ret == NXT_ERROR) {
978             return NXT_ERROR;
979         }
980 
981         nxt_file_stderr(&file);
982         nxt_file_close(task, &file);
983     }
984 
985     return NXT_OK;
986 }
987 
988 
989 static u_char *
nxt_cstr_dup(nxt_mp_t * mp,u_char * dst,u_char * src)990 nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src)
991 {
992     u_char  *p;
993     size_t  len;
994 
995     len = nxt_strlen(src);
996 
997     if (dst == NULL) {
998         dst = nxt_mp_alloc(mp, len + 1);
999         if (nxt_slow_path(dst == NULL)) {
1000             return NULL;
1001         }
1002     }
1003 
1004     p = nxt_cpymem(dst, src, len);
1005     *p = '\0';
1006 
1007     return dst;
1008 }
1009 
1010 
1011 static nxt_int_t
nxt_app_setup(nxt_task_t * task,nxt_process_t * process)1012 nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
1013 {
1014     nxt_process_init_t  *init;
1015 
1016     process->state = NXT_PROCESS_STATE_CREATED;
1017 
1018     init = nxt_process_init(process);
1019 
1020     return init->start(task, &process->data);
1021 }
1022 
1023 
1024 nxt_app_lang_module_t *
nxt_app_lang_module(nxt_runtime_t * rt,nxt_str_t * name)1025 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
1026 {
1027     u_char                 *p, *end, *version;
1028     size_t                 version_length;
1029     nxt_uint_t             i, n;
1030     nxt_app_type_t         type;
1031     nxt_app_lang_module_t  *lang;
1032 
1033     end = name->start + name->length;
1034     version = end;
1035 
1036     for (p = name->start; p < end; p++) {
1037         if (*p == ' ') {
1038             version = p + 1;
1039             break;
1040         }
1041 
1042         if (*p >= '0' && *p <= '9') {
1043             version = p;
1044             break;
1045         }
1046     }
1047 
1048     type = nxt_app_parse_type(name->start, p - name->start);
1049 
1050     if (type == NXT_APP_UNKNOWN) {
1051         return NULL;
1052     }
1053 
1054     version_length = end - version;
1055 
1056     lang = rt->languages->elts;
1057     n = rt->languages->nelts;
1058 
1059     for (i = 0; i < n; i++) {
1060 
1061         /*
1062          * Versions are sorted in descending order
1063          * so first match chooses the highest version.
1064          */
1065 
1066         if (lang[i].type == type
1067             && nxt_strvers_match(lang[i].version, version, version_length))
1068         {
1069             return &lang[i];
1070         }
1071     }
1072 
1073     return NULL;
1074 }
1075 
1076 
1077 nxt_app_type_t
nxt_app_parse_type(u_char * p,size_t length)1078 nxt_app_parse_type(u_char *p, size_t length)
1079 {
1080     nxt_str_t str;
1081 
1082     str.length = length;
1083     str.start = p;
1084 
1085     if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) {
1086         return NXT_APP_EXTERNAL;
1087 
1088     } else if (nxt_str_eq(&str, "python", 6)) {
1089         return NXT_APP_PYTHON;
1090 
1091     } else if (nxt_str_eq(&str, "php", 3)) {
1092         return NXT_APP_PHP;
1093 
1094     } else if (nxt_str_eq(&str, "perl", 4)) {
1095         return NXT_APP_PERL;
1096 
1097     } else if (nxt_str_eq(&str, "ruby", 4)) {
1098         return NXT_APP_RUBY;
1099 
1100     } else if (nxt_str_eq(&str, "java", 4)) {
1101         return NXT_APP_JAVA;
1102 
1103     } else if (nxt_str_eq(&str, "wasm-wasi-component", 19)) {
1104         return NXT_APP_WASM_WC;
1105 
1106     } else if (nxt_str_eq(&str, "wasm", 4)) {
1107         return NXT_APP_WASM;
1108     }
1109 
1110     return NXT_APP_UNKNOWN;
1111 }
1112 
1113 
1114 nxt_int_t
nxt_unit_default_init(nxt_task_t * task,nxt_unit_init_t * init,nxt_common_app_conf_t * conf)1115 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
1116     nxt_common_app_conf_t *conf)
1117 {
1118     nxt_port_t     *my_port, *proto_port, *router_port;
1119     nxt_runtime_t  *rt;
1120 
1121     nxt_memzero(init, sizeof(nxt_unit_init_t));
1122 
1123     rt = task->thread->runtime;
1124 
1125     proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
1126     if (nxt_slow_path(proto_port == NULL)) {
1127         return NXT_ERROR;
1128     }
1129 
1130     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1131     if (nxt_slow_path(router_port == NULL)) {
1132         return NXT_ERROR;
1133     }
1134 
1135     my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
1136     if (nxt_slow_path(my_port == NULL)) {
1137         return NXT_ERROR;
1138     }
1139 
1140     init->ready_port.id.pid = proto_port->pid;
1141     init->ready_port.id.id = proto_port->id;
1142     init->ready_port.in_fd = -1;
1143     init->ready_port.out_fd = proto_port->pair[1];
1144 
1145     init->ready_stream = my_port->process->stream;
1146 
1147     init->router_port.id.pid = router_port->pid;
1148     init->router_port.id.id = router_port->id;
1149     init->router_port.in_fd = -1;
1150     init->router_port.out_fd = router_port->pair[1];
1151 
1152     init->read_port.id.pid = my_port->pid;
1153     init->read_port.id.id = my_port->id;
1154     init->read_port.in_fd = my_port->pair[0];
1155     init->read_port.out_fd = my_port->pair[1];
1156 
1157     init->shared_port_fd = conf->shared_port_fd;
1158     init->shared_queue_fd = conf->shared_queue_fd;
1159 
1160     init->log_fd = 2;
1161 
1162     init->shm_limit = conf->shm_limit;
1163     init->request_limit = conf->request_limit;
1164 
1165     return NXT_OK;
1166 }
1167 
1168 
1169 static nxt_int_t
nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t * lhq,void * data)1170 nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1171 {
1172     nxt_pid_t      *qpid;
1173     nxt_process_t  *process;
1174 
1175     process = data;
1176     qpid = (nxt_pid_t *) lhq->key.start;
1177 
1178     if (*qpid == process->isolated_pid) {
1179         return NXT_OK;
1180     }
1181 
1182     return NXT_DECLINED;
1183 }
1184 
1185 
1186 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1187     NXT_LVLHSH_DEFAULT,
1188     nxt_proto_lvlhsh_isolated_pid_test,
1189     nxt_lvlhsh_alloc,
1190     nxt_lvlhsh_free,
1191 };
1192 
1193 
1194 nxt_inline void
nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t * lhq,nxt_pid_t * pid)1195 nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1196 {
1197     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t));
1198     lhq->key.length = sizeof(nxt_pid_t);
1199     lhq->key.start = (u_char *) pid;
1200     lhq->proto = &lvlhsh_processes_proto;
1201 }
1202 
1203 
1204 static void
nxt_proto_process_add(nxt_task_t * task,nxt_process_t * process)1205 nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process)
1206 {
1207     nxt_runtime_t       *rt;
1208     nxt_lvlhsh_query_t  lhq;
1209 
1210     rt = task->thread->runtime;
1211 
1212     nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid);
1213 
1214     lhq.replace = 0;
1215     lhq.value = process;
1216     lhq.pool = rt->mem_pool;
1217 
1218     switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) {
1219 
1220     case NXT_OK:
1221         nxt_debug(task, "process (isolated %PI) added", process->isolated_pid);
1222 
1223         nxt_queue_insert_tail(&nxt_proto_children, &process->link);
1224         break;
1225 
1226     default:
1227         nxt_alert(task, "process (isolated %PI) failed to add",
1228                   process->isolated_pid);
1229         break;
1230     }
1231 }
1232 
1233 
1234 static nxt_process_t *
nxt_proto_process_remove(nxt_task_t * task,nxt_pid_t pid)1235 nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid)
1236 {
1237     nxt_runtime_t       *rt;
1238     nxt_process_t       *process;
1239     nxt_lvlhsh_query_t  lhq;
1240 
1241     nxt_proto_process_lhq_pid(&lhq, &pid);
1242 
1243     rt = task->thread->runtime;
1244 
1245     lhq.pool = rt->mem_pool;
1246 
1247     switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) {
1248 
1249     case NXT_OK:
1250         nxt_debug(task, "process (isolated %PI) removed", pid);
1251 
1252         process = lhq.value;
1253 
1254         nxt_queue_remove(&process->link);
1255         process->link.next = NULL;
1256 
1257         break;
1258 
1259     default:
1260         nxt_debug(task, "process (isolated %PI) remove failed", pid);
1261         process = NULL;
1262         break;
1263     }
1264 
1265     return process;
1266 }
1267 
1268 
1269 static nxt_process_t *
nxt_proto_process_find(nxt_task_t * task,nxt_pid_t pid)1270 nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid)
1271 {
1272     nxt_process_t       *process;
1273     nxt_lvlhsh_query_t  lhq;
1274 
1275     nxt_proto_process_lhq_pid(&lhq, &pid);
1276 
1277     if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) {
1278         process = lhq.value;
1279 
1280     } else {
1281         nxt_debug(task, "process (isolated %PI) not found", pid);
1282 
1283         process = NULL;
1284     }
1285 
1286     return process;
1287 }
1288