xref: /unit/src/nxt_application.c (revision 2050:d1298cc3f385)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_unit.h>
16 #include <nxt_port_memory_int.h>
17 #include <nxt_isolation.h>
18 
19 #include <glob.h>
20 
21 #if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
22 #include <sys/prctl.h>
23 #endif
24 
25 
26 typedef struct {
27     nxt_app_type_t  type;
28     nxt_str_t       version;
29     nxt_str_t       file;
30     nxt_array_t     *mounts;
31 } nxt_module_t;
32 
33 
34 static nxt_int_t nxt_discovery_start(nxt_task_t *task,
35     nxt_process_data_t *data);
36 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
37 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
38     nxt_array_t *modules, const char *name);
39 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
40     void *data);
41 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
42     void *data);
43 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
44     const char *name);
45 static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process);
46 static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data);
47 static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
48 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
49 static void nxt_proto_start_process_handler(nxt_task_t *task,
50     nxt_port_recv_msg_t *msg);
51 static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
52 static void nxt_proto_process_created_handler(nxt_task_t *task,
53     nxt_port_recv_msg_t *msg);
54 static void nxt_proto_quit_children(nxt_task_t *task);
55 static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid);
56 static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process);
57 static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid);
58 static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src);
59 static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data);
60 static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data);
61 static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data);
62 
63 
64 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
65 
66 
67 static uint32_t  compat[] = {
68     NXT_VERNUM, NXT_DEBUG,
69 };
70 
71 
72 static nxt_lvlhsh_t           nxt_proto_processes;
73 static nxt_queue_t            nxt_proto_children;
74 static nxt_bool_t             nxt_proto_exiting;
75 
76 static nxt_app_module_t       *nxt_app;
77 static nxt_common_app_conf_t  *nxt_app_conf;
78 
79 
80 static const nxt_port_handlers_t  nxt_discovery_process_port_handlers = {
81     .quit         = nxt_signal_quit_handler,
82     .new_port     = nxt_port_new_port_handler,
83     .change_file  = nxt_port_change_log_file_handler,
84     .mmap         = nxt_port_mmap_handler,
85     .data         = nxt_port_data_handler,
86     .remove_pid   = nxt_port_remove_pid_handler,
87     .rpc_ready    = nxt_port_rpc_handler,
88     .rpc_error    = nxt_port_rpc_handler,
89 };
90 
91 
92 const nxt_sig_event_t  nxt_prototype_signals[] = {
93     nxt_event_signal(SIGHUP,  nxt_proto_signal_handler),
94     nxt_event_signal(SIGINT,  nxt_proto_sigterm_handler),
95     nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler),
96     nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler),
97     nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler),
98     nxt_event_signal_end,
99 };
100 
101 
102 static const nxt_port_handlers_t  nxt_proto_process_port_handlers = {
103     .quit            = nxt_proto_quit_handler,
104     .change_file     = nxt_port_change_log_file_handler,
105     .new_port        = nxt_port_new_port_handler,
106     .process_created = nxt_proto_process_created_handler,
107     .process_ready   = nxt_port_process_ready_handler,
108     .remove_pid      = nxt_port_remove_pid_handler,
109     .start_process   = nxt_proto_start_process_handler,
110     .rpc_ready       = nxt_port_rpc_handler,
111     .rpc_error       = nxt_port_rpc_handler,
112 };
113 
114 
115 static const nxt_port_handlers_t  nxt_app_process_port_handlers = {
116     .quit         = nxt_signal_quit_handler,
117     .rpc_ready    = nxt_port_rpc_handler,
118     .rpc_error    = nxt_port_rpc_handler,
119 };
120 
121 
122 const nxt_process_init_t  nxt_discovery_process = {
123     .name           = "discovery",
124     .type           = NXT_PROCESS_DISCOVERY,
125     .prefork        = NULL,
126     .restart        = 0,
127     .setup          = nxt_process_core_setup,
128     .start          = nxt_discovery_start,
129     .port_handlers  = &nxt_discovery_process_port_handlers,
130     .signals        = nxt_process_signals,
131 };
132 
133 
134 const nxt_process_init_t  nxt_proto_process = {
135     .type           = NXT_PROCESS_PROTOTYPE,
136     .prefork        = nxt_isolation_main_prefork,
137     .restart        = 0,
138     .setup          = nxt_proto_setup,
139     .start          = nxt_proto_start,
140     .port_handlers  = &nxt_proto_process_port_handlers,
141     .signals        = nxt_prototype_signals,
142 };
143 
144 
145 const nxt_process_init_t  nxt_app_process = {
146     .type           = NXT_PROCESS_APP,
147     .setup          = nxt_app_setup,
148     .start          = NULL,
149     .prefork        = NULL,
150     .restart        = 0,
151     .port_handlers  = &nxt_app_process_port_handlers,
152     .signals        = nxt_process_signals,
153 };
154 
155 
156 static nxt_int_t
nxt_discovery_start(nxt_task_t * task,nxt_process_data_t * data)157 nxt_discovery_start(nxt_task_t *task, nxt_process_data_t *data)
158 {
159     uint32_t       stream;
160     nxt_buf_t      *b;
161     nxt_int_t      ret;
162     nxt_port_t     *main_port, *discovery_port;
163     nxt_runtime_t  *rt;
164 
165     nxt_log(task, NXT_LOG_INFO, "discovery started");
166 
167     rt = task->thread->runtime;
168 
169     b = nxt_discovery_modules(task, rt->modules);
170     if (nxt_slow_path(b == NULL)) {
171         return NXT_ERROR;
172     }
173 
174     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
175     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
176 
177     stream = nxt_port_rpc_register_handler(task, discovery_port,
178                                            nxt_discovery_quit,
179                                            nxt_discovery_quit,
180                                            main_port->pid, NULL);
181 
182     if (nxt_slow_path(stream == 0)) {
183         return NXT_ERROR;
184     }
185 
186     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
187                                 stream, discovery_port->id, b);
188 
189     if (nxt_slow_path(ret != NXT_OK)) {
190         nxt_port_rpc_cancel(task, discovery_port, stream);
191         return NXT_ERROR;
192     }
193 
194     return NXT_OK;
195 }
196 
197 
198 static nxt_buf_t *
nxt_discovery_modules(nxt_task_t * task,const char * path)199 nxt_discovery_modules(nxt_task_t *task, const char *path)
200 {
201     char            *name;
202     u_char          *p, *end;
203     size_t          size;
204     glob_t          glb;
205     nxt_mp_t        *mp;
206     nxt_buf_t       *b;
207     nxt_int_t       ret;
208     nxt_uint_t      i, n, j;
209     nxt_array_t     *modules, *mounts;
210     nxt_module_t    *module;
211     nxt_fs_mount_t  *mnt;
212 
213     b = NULL;
214 
215     mp = nxt_mp_create(1024, 128, 256, 32);
216     if (mp == NULL) {
217         return b;
218     }
219 
220     ret = glob(path, 0, NULL, &glb);
221 
222     n = glb.gl_pathc;
223 
224     if (ret != 0) {
225         nxt_log(task, NXT_LOG_NOTICE,
226                 "no modules matching: \"%s\" found", path);
227         n = 0;
228     }
229 
230     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
231     if (modules == NULL) {
232         goto fail;
233     }
234 
235     for (i = 0; i < n; i++) {
236         name = glb.gl_pathv[i];
237 
238         ret = nxt_discovery_module(task, mp, modules, name);
239         if (ret != NXT_OK) {
240             goto fail;
241         }
242     }
243 
244     size = nxt_length("[]");
245     module = modules->elts;
246     n = modules->nelts;
247 
248     for (i = 0; i < n; i++) {
249         nxt_debug(task, "module: %d %V %V",
250                   module[i].type, &module[i].version, &module[i].file);
251 
252         size += nxt_length("{\"type\": ,");
253         size += nxt_length(" \"version\": \"\",");
254         size += nxt_length(" \"file\": \"\",");
255         size += nxt_length(" \"mounts\": []},");
256 
257         size += NXT_INT_T_LEN
258                 + module[i].version.length
259                 + module[i].file.length;
260 
261         mounts = module[i].mounts;
262 
263         size += mounts->nelts * nxt_length("{\"src\": \"\", \"dst\": \"\", "
264                                             "\"type\": , \"name\": \"\", "
265                                             "\"flags\": , \"data\": \"\"},");
266 
267         mnt = mounts->elts;
268 
269         for (j = 0; j < mounts->nelts; j++) {
270             size += nxt_strlen(mnt[j].src) + nxt_strlen(mnt[j].dst)
271                     + nxt_strlen(mnt[j].name) + (2 * NXT_INT_T_LEN)
272                     + (mnt[j].data == NULL ? 0 : nxt_strlen(mnt[j].data));
273         }
274     }
275 
276     b = nxt_buf_mem_alloc(mp, size, 0);
277     if (b == NULL) {
278         goto fail;
279     }
280 
281     b->completion_handler = nxt_discovery_completion_handler;
282 
283     p = b->mem.free;
284     end = b->mem.end;
285     *p++ = '[';
286 
287     for (i = 0; i < n; i++) {
288         mounts = module[i].mounts;
289 
290         p = nxt_sprintf(p, end, "{\"type\": %d, \"version\": \"%V\", "
291                         "\"file\": \"%V\", \"mounts\": [",
292                         module[i].type, &module[i].version, &module[i].file);
293 
294         mnt = mounts->elts;
295         for (j = 0; j < mounts->nelts; j++) {
296             p = nxt_sprintf(p, end,
297                             "{\"src\": \"%s\", \"dst\": \"%s\", "
298                             "\"name\": \"%s\", \"type\": %d, \"flags\": %d, "
299                             "\"data\": \"%s\"},",
300                             mnt[j].src, mnt[j].dst, mnt[j].name, mnt[j].type,
301                             mnt[j].flags,
302                             mnt[j].data == NULL ? (u_char *) "" : mnt[j].data);
303         }
304 
305         *p++ = ']';
306         *p++ = '}';
307         *p++ = ',';
308     }
309 
310     *p++ = ']';
311 
312     if (nxt_slow_path(p > end)) {
313         nxt_alert(task, "discovery write past the buffer");
314         goto fail;
315     }
316 
317     b->mem.free = p;
318 
319 fail:
320 
321     globfree(&glb);
322 
323     return b;
324 }
325 
326 
327 static nxt_int_t
nxt_discovery_module(nxt_task_t * task,nxt_mp_t * mp,nxt_array_t * modules,const char * name)328 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
329     const char *name)
330 {
331     void                  *dl;
332     nxt_str_t             version;
333     nxt_int_t             ret;
334     nxt_uint_t            i, j, n;
335     nxt_array_t           *mounts;
336     nxt_module_t          *module;
337     nxt_app_type_t        type;
338     nxt_fs_mount_t        *to;
339     nxt_app_module_t      *app;
340     const nxt_fs_mount_t  *from;
341 
342     /*
343      * Only memory allocation failure should return NXT_ERROR.
344      * Any module processing errors are ignored.
345      */
346     ret = NXT_ERROR;
347 
348     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
349 
350     if (dl == NULL) {
351         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
352         return NXT_OK;
353     }
354 
355     app = dlsym(dl, "nxt_app_module");
356 
357     if (app != NULL) {
358         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
359                 &app->type, app->version, name);
360 
361         if (app->compat_length != sizeof(compat)
362             || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
363         {
364             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
365 
366             goto done;
367         }
368 
369         type = nxt_app_parse_type(app->type.start, app->type.length);
370 
371         if (type == NXT_APP_UNKNOWN) {
372             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
373 
374             goto done;
375         }
376 
377         module = modules->elts;
378         n = modules->nelts;
379 
380         version.start = (u_char *) app->version;
381         version.length = nxt_strlen(app->version);
382 
383         for (i = 0; i < n; i++) {
384             if (type == module[i].type
385                 && nxt_strstr_eq(&module[i].version, &version))
386             {
387                 nxt_log(task, NXT_LOG_NOTICE,
388                         "ignoring %s module with the same "
389                         "application language version %V %V as in %V",
390                         name, &app->type, &version, &module[i].file);
391 
392                 goto done;
393             }
394         }
395 
396         module = nxt_array_add(modules);
397         if (module == NULL) {
398             goto fail;
399         }
400 
401         module->type = type;
402 
403         nxt_str_dup(mp, &module->version, &version);
404         if (module->version.start == NULL) {
405             goto fail;
406         }
407 
408         module->file.length = nxt_strlen(name);
409 
410         module->file.start = nxt_mp_alloc(mp, module->file.length);
411         if (module->file.start == NULL) {
412             goto fail;
413         }
414 
415         nxt_memcpy(module->file.start, name, module->file.length);
416 
417         module->mounts = nxt_array_create(mp, app->nmounts,
418                                           sizeof(nxt_fs_mount_t));
419 
420         if (nxt_slow_path(module->mounts == NULL)) {
421             goto fail;
422         }
423 
424         mounts = module->mounts;
425 
426         for (j = 0; j < app->nmounts; j++) {
427             from = &app->mounts[j];
428             to = nxt_array_zero_add(mounts);
429             if (nxt_slow_path(to == NULL)) {
430                 goto fail;
431             }
432 
433             to->src = nxt_cstr_dup(mp, to->src, from->src);
434             if (nxt_slow_path(to->src == NULL)) {
435                 goto fail;
436             }
437 
438             to->dst = nxt_cstr_dup(mp, to->dst, from->dst);
439             if (nxt_slow_path(to->dst == NULL)) {
440                 goto fail;
441             }
442 
443             to->name = nxt_cstr_dup(mp, to->name, from->name);
444             if (nxt_slow_path(to->name == NULL)) {
445                 goto fail;
446             }
447 
448             to->type = from->type;
449 
450             if (from->data != NULL) {
451                 to->data = nxt_cstr_dup(mp, to->data, from->data);
452                 if (nxt_slow_path(to->data == NULL)) {
453                     goto fail;
454                 }
455             }
456 
457             to->flags = from->flags;
458         }
459 
460     } else {
461         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
462     }
463 
464 done:
465 
466     ret = NXT_OK;
467 
468 fail:
469 
470     if (dlclose(dl) != 0) {
471         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
472     }
473 
474     return ret;
475 }
476 
477 
478 static void
nxt_discovery_completion_handler(nxt_task_t * task,void * obj,void * data)479 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
480 {
481     nxt_mp_t   *mp;
482     nxt_buf_t  *b;
483 
484     b = obj;
485     mp = b->data;
486 
487     nxt_mp_destroy(mp);
488 }
489 
490 
491 static void
nxt_discovery_quit(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)492 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
493 {
494     nxt_signal_quit_handler(task, msg);
495 }
496 
497 
498 static nxt_int_t
nxt_proto_setup(nxt_task_t * task,nxt_process_t * process)499 nxt_proto_setup(nxt_task_t *task, nxt_process_t *process)
500 {
501     nxt_int_t              ret;
502     nxt_app_lang_module_t  *lang;
503     nxt_common_app_conf_t  *app_conf;
504 
505     app_conf = process->data.app;
506 
507     nxt_queue_init(&nxt_proto_children);
508 
509     nxt_app_conf = app_conf;
510 
511     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
512     if (nxt_slow_path(lang == NULL)) {
513         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
514         return NXT_ERROR;
515     }
516 
517     nxt_app = lang->module;
518 
519     if (nxt_app == NULL) {
520         nxt_debug(task, "application language module: %s \"%s\"",
521                   lang->version, lang->file);
522 
523         nxt_app = nxt_app_module_load(task, lang->file);
524         if (nxt_slow_path(nxt_app == NULL)) {
525             return NXT_ERROR;
526         }
527     }
528 
529     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
530                       != NXT_OK))
531     {
532         nxt_alert(task, "failed to set environment");
533         return NXT_ERROR;
534     }
535 
536     if (nxt_app->setup != NULL) {
537         ret = nxt_app->setup(task, process, app_conf);
538         if (nxt_slow_path(ret != NXT_OK)) {
539             return ret;
540         }
541     }
542 
543 #if (NXT_HAVE_ISOLATION_ROOTFS)
544     if (process->isolation.rootfs != NULL) {
545         if (process->isolation.mounts != NULL) {
546             ret = nxt_isolation_prepare_rootfs(task, process);
547             if (nxt_slow_path(ret != NXT_OK)) {
548                 return ret;
549             }
550         }
551 
552         ret = nxt_isolation_change_root(task, process);
553         if (nxt_slow_path(ret != NXT_OK)) {
554             return NXT_ERROR;
555         }
556     }
557 #endif
558 
559     if (app_conf->working_directory != NULL
560         && app_conf->working_directory[0] != 0)
561     {
562         ret = chdir(app_conf->working_directory);
563 
564         if (nxt_slow_path(ret != 0)) {
565             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
566                     app_conf->working_directory, nxt_errno);
567 
568             return NXT_ERROR;
569         }
570     }
571 
572     process->state = NXT_PROCESS_STATE_CREATED;
573 
574     return NXT_OK;
575 }
576 
577 
578 static nxt_int_t
nxt_proto_start(nxt_task_t * task,nxt_process_data_t * data)579 nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data)
580 {
581     nxt_debug(task, "prototype waiting for clone messages");
582 
583     return NXT_OK;
584 }
585 
586 
587 static void
nxt_proto_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)588 nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
589 {
590     u_char              *p;
591     nxt_int_t           ret;
592     nxt_port_t          *port;
593     nxt_runtime_t       *rt;
594     nxt_process_t       *process;
595     nxt_process_init_t  *init;
596 
597     rt = task->thread->runtime;
598 
599     process = nxt_process_new(rt);
600     if (nxt_slow_path(process == NULL)) {
601         goto failed;
602     }
603 
604     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
605     if (nxt_slow_path(process->mem_pool == NULL)) {
606         nxt_process_use(task, process, -1);
607         goto failed;
608     }
609 
610     process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
611 
612     init = nxt_process_init(process);
613     *init = nxt_app_process;
614 
615     process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length
616                                  + sizeof("\"\" application") + 1);
617 
618     if (nxt_slow_path(process->name == NULL)) {
619         nxt_process_use(task, process, -1);
620 
621         goto failed;
622     }
623 
624     init->start = nxt_app->start;
625 
626     init->name = (const char *) nxt_app_conf->name.start;
627 
628     p = (u_char *) process->name;
629     *p++ = '"';
630     p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length);
631     p = nxt_cpymem(p, "\" application", 13);
632     *p = '\0';
633 
634     process->user_cred = &rt->user_cred;
635 
636     process->data.app = nxt_app_conf;
637     process->stream = msg->port_msg.stream;
638 
639     ret = nxt_process_start(task, process);
640     if (nxt_slow_path(ret == NXT_ERROR)) {
641         nxt_process_use(task, process, -1);
642 
643         goto failed;
644     }
645 
646     nxt_proto_process_add(task, process);
647 
648     return;
649 
650 failed:
651 
652     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
653                                  msg->port_msg.reply_port);
654 
655     if (nxt_fast_path(port != NULL)) {
656         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
657                               -1, msg->port_msg.stream, 0, NULL);
658     }
659 }
660 
661 
662 static void
nxt_proto_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)663 nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
664 {
665     nxt_debug(task, "prototype quit handler");
666 
667     nxt_proto_quit_children(task);
668 
669     nxt_proto_exiting = 1;
670 
671     if (nxt_queue_is_empty(&nxt_proto_children)) {
672         nxt_process_quit(task, 0);
673     }
674 }
675 
676 
677 static void
nxt_proto_quit_children(nxt_task_t * task)678 nxt_proto_quit_children(nxt_task_t *task)
679 {
680     nxt_port_t     *port;
681     nxt_process_t  *process;
682 
683     nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) {
684         port = nxt_process_port_first(process);
685 
686         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
687                                      -1, 0, 0, NULL);
688     }
689     nxt_queue_loop;
690 }
691 
692 
693 static void
nxt_proto_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)694 nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
695 {
696     nxt_pid_t      isolated_pid, pid;
697     nxt_process_t  *process;
698 
699     isolated_pid = nxt_recv_msg_cmsg_pid(msg);
700 
701     process = nxt_proto_process_find(task, isolated_pid);
702     if (nxt_slow_path(process == NULL)) {
703         return;
704     }
705 
706     process->state = NXT_PROCESS_STATE_CREATED;
707 
708     pid = msg->port_msg.pid;
709 
710     if (process->pid != pid) {
711         nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid,
712                   pid);
713 
714         nxt_runtime_process_remove(task->thread->runtime, process);
715 
716         process->pid = pid;
717 
718         nxt_runtime_process_add(task, process);
719 
720     } else {
721         nxt_debug(task, "app process %PI is created", isolated_pid);
722     }
723 }
724 
725 
726 static void
nxt_proto_signal_handler(nxt_task_t * task,void * obj,void * data)727 nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data)
728 {
729     nxt_trace(task, "signal signo:%d (%s) received, ignored",
730               (int) (uintptr_t) obj, data);
731 }
732 
733 
734 static void
nxt_proto_sigterm_handler(nxt_task_t * task,void * obj,void * data)735 nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data)
736 {
737     nxt_trace(task, "signal signo:%d (%s) received",
738               (int) (uintptr_t) obj, data);
739 
740     nxt_proto_quit_children(task);
741 
742     nxt_proto_exiting = 1;
743 
744     if (nxt_queue_is_empty(&nxt_proto_children)) {
745         nxt_process_quit(task, 0);
746     }
747 }
748 
749 
750 static void
nxt_proto_sigchld_handler(nxt_task_t * task,void * obj,void * data)751 nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data)
752 {
753     int            status;
754     nxt_err_t      err;
755     nxt_pid_t      pid;
756     nxt_process_t  *process;
757 
758     nxt_debug(task, "proto sigchld handler signo:%d (%s)",
759               (int) (uintptr_t) obj, data);
760 
761     for ( ;; ) {
762         pid = waitpid(-1, &status, WNOHANG);
763 
764         if (pid == -1) {
765 
766             switch (err = nxt_errno) {
767 
768             case NXT_ECHILD:
769                 return;
770 
771             case NXT_EINTR:
772                 continue;
773 
774             default:
775                 nxt_alert(task, "waitpid() failed: %E", err);
776                 return;
777             }
778         }
779 
780         nxt_debug(task, "waitpid(): %PI", pid);
781 
782         if (pid == 0) {
783             return;
784         }
785 
786         if (WTERMSIG(status)) {
787 #ifdef WCOREDUMP
788             nxt_alert(task, "app process (isolated %PI) exited on signal %d%s",
789                       pid, WTERMSIG(status),
790                       WCOREDUMP(status) ? " (core dumped)" : "");
791 #else
792             nxt_alert(task, "app process (isolated %PI) exited on signal %d",
793                       pid, WTERMSIG(status));
794 #endif
795 
796         } else {
797             nxt_trace(task, "app process (isolated %PI) exited with code %d",
798                       pid, WEXITSTATUS(status));
799         }
800 
801         process = nxt_proto_process_remove(task, pid);
802         if (process == NULL) {
803             continue;
804         }
805 
806         if (process->state != NXT_PROCESS_STATE_CREATING) {
807             nxt_port_remove_notify_others(task, process);
808         }
809 
810         nxt_process_close_ports(task, process);
811 
812         if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) {
813             nxt_process_quit(task, 0);
814             return;
815         }
816     }
817 }
818 
819 
820 static nxt_app_module_t *
nxt_app_module_load(nxt_task_t * task,const char * name)821 nxt_app_module_load(nxt_task_t *task, const char *name)
822 {
823     char              *err;
824     void              *dl;
825     nxt_app_module_t  *app;
826 
827     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
828 
829     if (nxt_slow_path(dl == NULL)) {
830         err = dlerror();
831         nxt_alert(task, "dlopen(\"%s\") failed: \"%s\"",
832                   name, err != NULL ? err : "(null)");
833         return NULL;
834     }
835 
836     app = dlsym(dl, "nxt_app_module");
837 
838     if (nxt_slow_path(app == NULL)) {
839         err = dlerror();
840         nxt_alert(task, "dlsym(\"%s\", \"nxt_app_module\") failed: \"%s\"",
841                   name, err != NULL ? err : "(null)");
842 
843         if (dlclose(dl) != 0) {
844             err = dlerror();
845             nxt_alert(task, "dlclose(\"%s\") failed: \"%s\"",
846                       name, err != NULL ? err : "(null)");
847         }
848     }
849 
850     return app;
851 }
852 
853 
854 static nxt_int_t
nxt_app_set_environment(nxt_conf_value_t * environment)855 nxt_app_set_environment(nxt_conf_value_t *environment)
856 {
857     char              *env, *p;
858     uint32_t          next;
859     nxt_str_t         name, value;
860     nxt_conf_value_t  *value_obj;
861 
862     if (environment != NULL) {
863         next = 0;
864 
865         for ( ;; ) {
866             value_obj = nxt_conf_next_object_member(environment, &name, &next);
867             if (value_obj == NULL) {
868                 break;
869             }
870 
871             nxt_conf_get_string(value_obj, &value);
872 
873             env = nxt_malloc(name.length + value.length + 2);
874             if (nxt_slow_path(env == NULL)) {
875                 return NXT_ERROR;
876             }
877 
878             p = nxt_cpymem(env, name.start, name.length);
879             *p++ = '=';
880             p = nxt_cpymem(p, value.start, value.length);
881             *p = '\0';
882 
883             if (nxt_slow_path(putenv(env) != 0)) {
884                 return NXT_ERROR;
885             }
886         }
887     }
888 
889     return NXT_OK;
890 }
891 
892 
893 static u_char *
nxt_cstr_dup(nxt_mp_t * mp,u_char * dst,u_char * src)894 nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src)
895 {
896     u_char  *p;
897     size_t  len;
898 
899     len = nxt_strlen(src);
900 
901     if (dst == NULL) {
902         dst = nxt_mp_alloc(mp, len + 1);
903         if (nxt_slow_path(dst == NULL)) {
904             return NULL;
905         }
906     }
907 
908     p = nxt_cpymem(dst, src, len);
909     *p = '\0';
910 
911     return dst;
912 }
913 
914 
915 static nxt_int_t
nxt_app_setup(nxt_task_t * task,nxt_process_t * process)916 nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
917 {
918     nxt_process_init_t  *init;
919 
920     process->state = NXT_PROCESS_STATE_CREATED;
921 
922     init = nxt_process_init(process);
923 
924     return init->start(task, &process->data);
925 }
926 
927 
928 nxt_app_lang_module_t *
nxt_app_lang_module(nxt_runtime_t * rt,nxt_str_t * name)929 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
930 {
931     u_char                 *p, *end, *version;
932     size_t                 version_length;
933     nxt_uint_t             i, n;
934     nxt_app_type_t         type;
935     nxt_app_lang_module_t  *lang;
936 
937     end = name->start + name->length;
938     version = end;
939 
940     for (p = name->start; p < end; p++) {
941         if (*p == ' ') {
942             version = p + 1;
943             break;
944         }
945 
946         if (*p >= '0' && *p <= '9') {
947             version = p;
948             break;
949         }
950     }
951 
952     type = nxt_app_parse_type(name->start, p - name->start);
953 
954     if (type == NXT_APP_UNKNOWN) {
955         return NULL;
956     }
957 
958     version_length = end - version;
959 
960     lang = rt->languages->elts;
961     n = rt->languages->nelts;
962 
963     for (i = 0; i < n; i++) {
964 
965         /*
966          * Versions are sorted in descending order
967          * so first match chooses the highest version.
968          */
969 
970         if (lang[i].type == type
971             && nxt_strvers_match(lang[i].version, version, version_length))
972         {
973             return &lang[i];
974         }
975     }
976 
977     return NULL;
978 }
979 
980 
981 nxt_app_type_t
nxt_app_parse_type(u_char * p,size_t length)982 nxt_app_parse_type(u_char *p, size_t length)
983 {
984     nxt_str_t str;
985 
986     str.length = length;
987     str.start = p;
988 
989     if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) {
990         return NXT_APP_EXTERNAL;
991 
992     } else if (nxt_str_eq(&str, "python", 6)) {
993         return NXT_APP_PYTHON;
994 
995     } else if (nxt_str_eq(&str, "php", 3)) {
996         return NXT_APP_PHP;
997 
998     } else if (nxt_str_eq(&str, "perl", 4)) {
999         return NXT_APP_PERL;
1000 
1001     } else if (nxt_str_eq(&str, "ruby", 4)) {
1002         return NXT_APP_RUBY;
1003 
1004     } else if (nxt_str_eq(&str, "java", 4)) {
1005         return NXT_APP_JAVA;
1006     }
1007 
1008     return NXT_APP_UNKNOWN;
1009 }
1010 
1011 
1012 nxt_int_t
nxt_unit_default_init(nxt_task_t * task,nxt_unit_init_t * init,nxt_common_app_conf_t * conf)1013 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
1014     nxt_common_app_conf_t *conf)
1015 {
1016     nxt_port_t     *my_port, *proto_port, *router_port;
1017     nxt_runtime_t  *rt;
1018 
1019     nxt_memzero(init, sizeof(nxt_unit_init_t));
1020 
1021     rt = task->thread->runtime;
1022 
1023     proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
1024     if (nxt_slow_path(proto_port == NULL)) {
1025         return NXT_ERROR;
1026     }
1027 
1028     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1029     if (nxt_slow_path(router_port == NULL)) {
1030         return NXT_ERROR;
1031     }
1032 
1033     my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
1034     if (nxt_slow_path(my_port == NULL)) {
1035         return NXT_ERROR;
1036     }
1037 
1038     init->ready_port.id.pid = proto_port->pid;
1039     init->ready_port.id.id = proto_port->id;
1040     init->ready_port.in_fd = -1;
1041     init->ready_port.out_fd = proto_port->pair[1];
1042 
1043     init->ready_stream = my_port->process->stream;
1044 
1045     init->router_port.id.pid = router_port->pid;
1046     init->router_port.id.id = router_port->id;
1047     init->router_port.in_fd = -1;
1048     init->router_port.out_fd = router_port->pair[1];
1049 
1050     init->read_port.id.pid = my_port->pid;
1051     init->read_port.id.id = my_port->id;
1052     init->read_port.in_fd = my_port->pair[0];
1053     init->read_port.out_fd = my_port->pair[1];
1054 
1055     init->shared_port_fd = conf->shared_port_fd;
1056     init->shared_queue_fd = conf->shared_queue_fd;
1057 
1058     init->log_fd = 2;
1059 
1060     init->shm_limit = conf->shm_limit;
1061     init->request_limit = conf->request_limit;
1062 
1063     return NXT_OK;
1064 }
1065 
1066 
1067 static nxt_int_t
nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t * lhq,void * data)1068 nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1069 {
1070     nxt_pid_t      *qpid;
1071     nxt_process_t  *process;
1072 
1073     process = data;
1074     qpid = (nxt_pid_t *) lhq->key.start;
1075 
1076     if (*qpid == process->isolated_pid) {
1077         return NXT_OK;
1078     }
1079 
1080     return NXT_DECLINED;
1081 }
1082 
1083 
1084 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1085     NXT_LVLHSH_DEFAULT,
1086     nxt_proto_lvlhsh_isolated_pid_test,
1087     nxt_lvlhsh_alloc,
1088     nxt_lvlhsh_free,
1089 };
1090 
1091 
1092 nxt_inline void
nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t * lhq,nxt_pid_t * pid)1093 nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1094 {
1095     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t));
1096     lhq->key.length = sizeof(nxt_pid_t);
1097     lhq->key.start = (u_char *) pid;
1098     lhq->proto = &lvlhsh_processes_proto;
1099 }
1100 
1101 
1102 static void
nxt_proto_process_add(nxt_task_t * task,nxt_process_t * process)1103 nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process)
1104 {
1105     nxt_runtime_t       *rt;
1106     nxt_lvlhsh_query_t  lhq;
1107 
1108     rt = task->thread->runtime;
1109 
1110     nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid);
1111 
1112     lhq.replace = 0;
1113     lhq.value = process;
1114     lhq.pool = rt->mem_pool;
1115 
1116     switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) {
1117 
1118     case NXT_OK:
1119         nxt_debug(task, "process (isolated %PI) added", process->isolated_pid);
1120 
1121         nxt_queue_insert_tail(&nxt_proto_children, &process->link);
1122         break;
1123 
1124     default:
1125         nxt_debug(task, "process (isolated %PI) failed to add",
1126                   process->isolated_pid);
1127         break;
1128     }
1129 }
1130 
1131 
1132 static nxt_process_t *
nxt_proto_process_remove(nxt_task_t * task,nxt_pid_t pid)1133 nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid)
1134 {
1135     nxt_runtime_t       *rt;
1136     nxt_process_t       *process;
1137     nxt_lvlhsh_query_t  lhq;
1138 
1139     nxt_proto_process_lhq_pid(&lhq, &pid);
1140 
1141     rt = task->thread->runtime;
1142 
1143     lhq.pool = rt->mem_pool;
1144 
1145     switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) {
1146 
1147     case NXT_OK:
1148         nxt_debug(task, "process (isolated %PI) removed", pid);
1149 
1150         process = lhq.value;
1151 
1152         nxt_queue_remove(&process->link);
1153         process->link.next = NULL;
1154 
1155         break;
1156 
1157     default:
1158         nxt_debug(task, "process (isolated %PI) remove failed", pid);
1159         process = NULL;
1160         break;
1161     }
1162 
1163     return process;
1164 }
1165 
1166 
1167 static nxt_process_t *
nxt_proto_process_find(nxt_task_t * task,nxt_pid_t pid)1168 nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid)
1169 {
1170     nxt_process_t       *process;
1171     nxt_lvlhsh_query_t  lhq;
1172 
1173     nxt_proto_process_lhq_pid(&lhq, &pid);
1174 
1175     if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) {
1176         process = lhq.value;
1177 
1178     } else {
1179         nxt_debug(task, "process (isolated %PI) not found", pid);
1180 
1181         process = NULL;
1182     }
1183 
1184     return process;
1185 }
1186