xref: /unit/src/nxt_main_process.c (revision 1673:883f2f79c2f6)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 #include <sys/mount.h>
18 
19 
/*
 * In/out state for nxt_main_listening_socket(): the created socket on
 * success, or an error class and a formatted message on failure.
 */
typedef struct {
    /* Descriptor of the created socket; stored only on success. */
    nxt_socket_t        socket;
    /* Error class; sent back to the requester as the first reply byte. */
    nxt_socket_error_t  error;
    /* Start of the caller-supplied buffer for the failure description. */
    u_char              *start;
    /*
     * End of that buffer on input; on failure it is moved to the end
     * of the formatted description.
     */
    u_char              *end;
} nxt_listening_socket_t;
26 
27 
/* A configuration map table together with its number of entries. */
typedef struct {
    /* Number of elements in "map". */
    nxt_uint_t          size;
    /* The table handed to nxt_conf_map_object(). */
    nxt_conf_map_t      *map;
} nxt_conf_app_map_t;
32 
33 
/* Port handler tables declared here and defined outside this file. */
extern nxt_port_handlers_t  nxt_controller_process_port_handlers;
extern nxt_port_handlers_t  nxt_router_process_port_handlers;
36 
37 
/*
 * Forward declarations of the file-local helpers, signal handlers and
 * port message handlers used below.
 */
static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
    nxt_runtime_t *rt);
static void nxt_main_process_title(nxt_task_t *task);
static nxt_int_t nxt_main_process_create(nxt_task_t *task,
    const nxt_process_init_t init);
static nxt_int_t nxt_main_start_process(nxt_task_t *task,
    nxt_process_t *process);
static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
static void nxt_main_port_socket_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
    nxt_listening_socket_t *ls);
static void nxt_main_port_modules_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
static void nxt_main_port_conf_store_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_main_port_access_log_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
68 
/*
 * Signal dispatch table of the main process.  SIGINT and SIGTERM share
 * one handler; SIGHUP goes to the generic handler, which only logs the
 * signal.
 */
const nxt_sig_event_t  nxt_main_process_signals[] = {
    nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
    nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
    nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
    nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
    nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
    nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
    nxt_event_signal_end,
};
78 
79 
/*
 * Set by the SIGINT/SIGTERM and SIGQUIT handlers.  While it is set,
 * nxt_main_cleanup_process() neither notifies other processes nor
 * restarts the exited one.
 */
static nxt_bool_t  nxt_exiting;
81 
82 
83 nxt_int_t
84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
85     nxt_runtime_t *rt)
86 {
87     rt->type = NXT_PROCESS_MAIN;
88 
89     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
90         return NXT_ERROR;
91     }
92 
93     nxt_main_process_title(task);
94 
95     /*
96      * The discovery process will send a message processed by
97      * nxt_main_port_modules_handler() which starts the controller
98      * and router processes.
99      */
100     return nxt_main_process_create(task, nxt_discovery_process);
101 }
102 
103 
/*
 * Configuration members shared by every application type.
 * nxt_port_main_start_process_handler() maps them onto
 * nxt_common_app_conf_t with nxt_conf_map_object().
 */
static nxt_conf_map_t  nxt_common_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, type),
    },

    {
        nxt_string("user"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, user),
    },

    {
        nxt_string("group"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, group),
    },

    {
        nxt_string("working_directory"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, working_directory),
    },

    {
        nxt_string("environment"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, environment),
    },

    {
        nxt_string("isolation"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, isolation),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, limits),
    },

};
148 
149 
/*
 * Members of the "limits" object.  They are mapped onto
 * nxt_common_app_conf_t only when the application configuration
 * contains "limits".
 */
static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
    {
        nxt_string("shm"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_common_app_conf_t, shm_limit),
    },

};
158 
159 
/* Members stored in the u.external part of nxt_common_app_conf_t. */
static nxt_conf_map_t  nxt_external_app_conf[] = {
    {
        nxt_string("executable"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.external.executable),
    },

    {
        nxt_string("arguments"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.external.arguments),
    },

};
174 
175 
/* Members stored in the u.python part of nxt_common_app_conf_t. */
static nxt_conf_map_t  nxt_python_app_conf[] = {
    {
        nxt_string("home"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.python.home),
    },

    {
        nxt_string("path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.python.path),
    },

    {
        nxt_string("module"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.python.module),
    },

    {
        nxt_string("callable"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.python.callable),
    },
};
201 
202 
/* Members stored in the u.php part of nxt_common_app_conf_t. */
static nxt_conf_map_t  nxt_php_app_conf[] = {
    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.php.targets),
    },

    {
        nxt_string("options"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.php.options),
    },
};
216 
217 
/* Members stored in the u.perl part of nxt_common_app_conf_t. */
static nxt_conf_map_t  nxt_perl_app_conf[] = {
    {
        nxt_string("script"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.perl.script),
    },
};
225 
226 
/*
 * Members stored in the u.ruby part of nxt_common_app_conf_t.  Unlike
 * the Perl map, "script" is mapped here as NXT_CONF_MAP_STR.
 */
static nxt_conf_map_t  nxt_ruby_app_conf[] = {
    {
        nxt_string("script"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.ruby.script),
    },
};
234 
235 
/* Members stored in the u.java part of nxt_common_app_conf_t. */
static nxt_conf_map_t  nxt_java_app_conf[] = {
    {
        nxt_string("classpath"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.java.classpath),
    },
    {
        nxt_string("webapp"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.java.webapp),
    },
    {
        nxt_string("options"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.java.options),
    },
    {
        nxt_string("unit_jars"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.java.unit_jars),
    },

};
259 
260 
/*
 * Type-specific configuration maps, indexed by the value returned from
 * nxt_app_parse_type().
 *
 * NOTE(review): the entry order presumably has to follow the
 * nxt_app_type_t enumeration -- confirm against its definition.
 */
static nxt_conf_app_map_t  nxt_app_maps[] = {
    { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
    { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
    { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
    { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
    { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
    { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
};
269 
270 
/*
 * Handler for "data" port messages: writes the used part of the message
 * buffer to the debug log and does nothing else.
 */
static void
nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_debug(task, "main data: %*s",
              nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
}
277 
278 
279 static void
280 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
281 {
282     u_char                 *start, *p, ch;
283     size_t                 type_len;
284     nxt_int_t              ret;
285     nxt_buf_t              *b;
286     nxt_port_t             *port;
287     nxt_runtime_t          *rt;
288     nxt_process_t          *process;
289     nxt_app_type_t         idx;
290     nxt_conf_value_t       *conf;
291     nxt_process_init_t     *init;
292     nxt_common_app_conf_t  *app_conf;
293 
294     ret = NXT_ERROR;
295 
296     rt = task->thread->runtime;
297 
298     process = nxt_main_process_new(task, rt);
299     if (nxt_slow_path(process == NULL)) {
300         return;
301     }
302 
303     init = nxt_process_init(process);
304 
305     *init = nxt_app_process;
306 
307     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
308     if (b == NULL) {
309         goto failed;
310     }
311 
312     nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
313               b->mem.pos);
314 
315     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
316     if (nxt_slow_path(app_conf == NULL)) {
317         goto failed;
318     }
319 
320     start = b->mem.pos;
321 
322     app_conf->name.start = start;
323     app_conf->name.length = nxt_strlen(start);
324 
325     init->name = (const char *) start;
326 
327     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
328                                  + sizeof("\"\" application") + 1);
329 
330     if (nxt_slow_path(process->name == NULL)) {
331         goto failed;
332     }
333 
334     p = (u_char *) process->name;
335     *p++ = '"';
336     p = nxt_cpymem(p, init->name, app_conf->name.length);
337     p = nxt_cpymem(p, "\" application", 13);
338     *p = '\0';
339 
340     app_conf->shm_limit = 100 * 1024 * 1024;
341 
342     start += app_conf->name.length + 1;
343 
344     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
345     if (conf == NULL) {
346         nxt_alert(task, "router app configuration parsing error");
347 
348         goto failed;
349     }
350 
351     rt = task->thread->runtime;
352 
353     app_conf->user.start  = (u_char*)rt->user_cred.user;
354     app_conf->user.length = nxt_strlen(rt->user_cred.user);
355 
356     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
357                               nxt_nitems(nxt_common_app_conf), app_conf);
358 
359     if (ret != NXT_OK) {
360         nxt_alert(task, "failed to map common app conf received from router");
361         goto failed;
362     }
363 
364     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
365         ch = app_conf->type.start[type_len];
366 
367         if (ch == ' ' || nxt_isdigit(ch)) {
368             break;
369         }
370     }
371 
372     idx = nxt_app_parse_type(app_conf->type.start, type_len);
373 
374     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
375         nxt_alert(task, "invalid app type %d received from router", (int) idx);
376         goto failed;
377     }
378 
379     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
380                               nxt_app_maps[idx].size, app_conf);
381 
382     if (nxt_slow_path(ret != NXT_OK)) {
383         nxt_alert(task, "failed to map app conf received from router");
384         goto failed;
385     }
386 
387     if (app_conf->limits != NULL) {
388         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
389                                   nxt_common_app_limits_conf,
390                                   nxt_nitems(nxt_common_app_limits_conf),
391                                   app_conf);
392 
393         if (nxt_slow_path(ret != NXT_OK)) {
394             nxt_alert(task, "failed to map app limits received from router");
395             goto failed;
396         }
397     }
398 
399     app_conf->self = conf;
400 
401     process->stream = msg->port_msg.stream;
402     process->data.app = app_conf;
403 
404     ret = nxt_main_start_process(task, process);
405     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
406         return;
407     }
408 
409 failed:
410 
411     nxt_process_use(task, process, -1);
412 
413     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
414                                  msg->port_msg.reply_port);
415 
416     if (nxt_fast_path(port != NULL)) {
417         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
418                               -1, msg->port_msg.stream, 0, NULL);
419     }
420 }
421 
422 
423 static void
424 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
425 {
426     nxt_port_t     *port;
427     nxt_process_t  *process;
428     nxt_runtime_t  *rt;
429 
430     rt = task->thread->runtime;
431 
432     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
433     if (nxt_slow_path(process == NULL)) {
434         return;
435     }
436 
437     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
438 
439     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
440                                  msg->port_msg.reply_port);
441 
442 
443     if (nxt_slow_path(port == NULL)) {
444         return;
445     }
446 
447 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
448     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
449         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
450                                                    process->user_cred,
451                                                    &process->isolation.clone)
452                           != NXT_OK))
453         {
454             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
455                                          -1, msg->port_msg.stream, 0, NULL);
456             return;
457         }
458      }
459 
460 #endif
461 
462     process->state = NXT_PROCESS_STATE_CREATED;
463 
464     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
465                                  -1, msg->port_msg.stream, 0, NULL);
466 }
467 
468 
/*
 * Message handlers of the main process port; installed with
 * nxt_port_enable() in nxt_main_process_port_create().  The certificate
 * handlers are present only in builds with NXT_TLS.
 */
static nxt_port_handlers_t  nxt_main_process_port_handlers = {
    .data             = nxt_port_main_data_handler,
    .process_created  = nxt_main_process_created_handler,
    .process_ready    = nxt_port_process_ready_handler,
    .start_process    = nxt_port_main_start_process_handler,
    .socket           = nxt_main_port_socket_handler,
    .modules          = nxt_main_port_modules_handler,
    .conf_store       = nxt_main_port_conf_store_handler,
#if (NXT_TLS)
    .cert_get         = nxt_cert_store_get_handler,
    .cert_delete      = nxt_cert_store_delete_handler,
#endif
    .access_log       = nxt_main_port_access_log_handler,
    .rpc_ready        = nxt_port_rpc_handler,
    .rpc_error        = nxt_port_rpc_handler,
};
485 
486 
487 static nxt_int_t
488 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
489 {
490     nxt_int_t      ret;
491     nxt_port_t     *port;
492     nxt_process_t  *process;
493 
494     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
495                                            NXT_PROCESS_MAIN);
496     if (nxt_slow_path(port == NULL)) {
497         return NXT_ERROR;
498     }
499 
500     process = port->process;
501 
502     ret = nxt_port_socket_init(task, port, 0);
503     if (nxt_slow_path(ret != NXT_OK)) {
504         nxt_port_use(task, port, -1);
505         return ret;
506     }
507 
508     /*
509      * A main process port.  A write port is not closed
510      * since it should be inherited by processes.
511      */
512     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
513 
514     process->state = NXT_PROCESS_STATE_READY;
515 
516     return NXT_OK;
517 }
518 
519 
520 static void
521 nxt_main_process_title(nxt_task_t *task)
522 {
523     u_char      *p, *end;
524     nxt_uint_t  i;
525     u_char      title[2048];
526 
527     end = title + sizeof(title) - 1;
528 
529     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
530                     nxt_process_argv[0]);
531 
532     for (i = 1; nxt_process_argv[i] != NULL; i++) {
533         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
534     }
535 
536     if (p < end) {
537         *p++ = ']';
538     }
539 
540     *p = '\0';
541 
542     nxt_process_title(task, "%s", title);
543 }
544 
545 
546 static nxt_int_t
547 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
548 {
549     nxt_int_t           ret;
550     nxt_runtime_t       *rt;
551     nxt_process_t       *process;
552     nxt_process_init_t  *pinit;
553 
554     rt = task->thread->runtime;
555 
556     process = nxt_main_process_new(task, rt);
557     if (nxt_slow_path(process == NULL)) {
558         return NXT_ERROR;
559     }
560 
561     process->name = init.name;
562     process->user_cred = &rt->user_cred;
563 
564     pinit = nxt_process_init(process);
565     *pinit = init;
566 
567     ret = nxt_main_start_process(task, process);
568     if (nxt_slow_path(ret == NXT_ERROR)) {
569         nxt_process_use(task, process, -1);
570     }
571 
572     return ret;
573 }
574 
575 
576 static nxt_process_t *
577 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
578 {
579     nxt_process_t  *process;
580 
581     process = nxt_runtime_process_new(rt);
582     if (nxt_slow_path(process == NULL)) {
583         return NULL;
584     }
585 
586     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
587     if (process->mem_pool == NULL) {
588         nxt_process_use(task, process, -1);
589         return NULL;
590     }
591 
592     return process;
593 }
594 
595 
596 static nxt_int_t
597 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
598 {
599     nxt_mp_t            *tmp_mp;
600     nxt_int_t           ret;
601     nxt_pid_t           pid;
602     nxt_port_t          *port;
603     nxt_process_init_t  *init;
604 
605     init = nxt_process_init(process);
606 
607     port = nxt_port_new(task, 0, 0, init->type);
608     if (nxt_slow_path(port == NULL)) {
609         return NXT_ERROR;
610     }
611 
612     nxt_process_port_add(task, process, port);
613 
614     ret = nxt_port_socket_init(task, port, 0);
615     if (nxt_slow_path(ret != NXT_OK)) {
616         goto free_port;
617     }
618 
619     tmp_mp = nxt_mp_create(1024, 128, 256, 32);
620     if (nxt_slow_path(tmp_mp == NULL)) {
621         ret = NXT_ERROR;
622 
623         goto close_port;
624     }
625 
626     if (init->prefork) {
627         ret = init->prefork(task, process, tmp_mp);
628         if (nxt_slow_path(ret != NXT_OK)) {
629             goto free_mempool;
630         }
631     }
632 
633     pid = nxt_process_create(task, process);
634 
635     switch (pid) {
636 
637     case -1:
638         ret = NXT_ERROR;
639         break;
640 
641     case 0:
642         /* The child process: return to the event engine work queue loop. */
643 
644         nxt_process_use(task, process, -1);
645 
646         ret = NXT_AGAIN;
647         break;
648 
649     default:
650         /* The main process created a new process. */
651 
652         nxt_process_use(task, process, -1);
653 
654         nxt_port_read_close(port);
655         nxt_port_write_enable(task, port);
656 
657         ret = NXT_OK;
658         break;
659     }
660 
661 free_mempool:
662 
663     nxt_mp_destroy(tmp_mp);
664 
665 close_port:
666 
667     if (nxt_slow_path(ret == NXT_ERROR)) {
668         nxt_port_close(task, port);
669     }
670 
671 free_port:
672 
673     nxt_port_use(task, port, -1);
674 
675     return ret;
676 }
677 
678 
679 static void
680 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
681 {
682     nxt_debug(task, "sigterm handler signo:%d (%s)",
683               (int) (uintptr_t) obj, data);
684 
685     /* TODO: fast exit. */
686 
687     nxt_exiting = 1;
688 
689     nxt_runtime_quit(task, 0);
690 }
691 
692 
693 static void
694 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
695 {
696     nxt_debug(task, "sigquit handler signo:%d (%s)",
697               (int) (uintptr_t) obj, data);
698 
699     /* TODO: graceful exit. */
700 
701     nxt_exiting = 1;
702 
703     nxt_runtime_quit(task, 0);
704 }
705 
706 
707 static void
708 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
709 {
710     nxt_mp_t        *mp;
711     nxt_int_t       ret;
712     nxt_uint_t      n;
713     nxt_port_t      *port;
714     nxt_file_t      *file, *new_file;
715     nxt_array_t     *new_files;
716     nxt_runtime_t   *rt;
717 
718     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
719             (int) (uintptr_t) obj, data, "log files rotation");
720 
721     rt = task->thread->runtime;
722 
723     port = rt->port_by_type[NXT_PROCESS_ROUTER];
724 
725     if (nxt_fast_path(port != NULL)) {
726         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
727                                      -1, 0, 0, NULL);
728     }
729 
730     mp = nxt_mp_create(1024, 128, 256, 32);
731     if (mp == NULL) {
732         return;
733     }
734 
735     n = nxt_list_nelts(rt->log_files);
736 
737     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
738     if (new_files == NULL) {
739         nxt_mp_destroy(mp);
740         return;
741     }
742 
743     nxt_list_each(file, rt->log_files) {
744 
745         /* This allocation cannot fail. */
746         new_file = nxt_array_add(new_files);
747 
748         new_file->name = file->name;
749         new_file->fd = NXT_FILE_INVALID;
750         new_file->log_level = NXT_LOG_ALERT;
751 
752         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
753                             NXT_FILE_OWNER_ACCESS);
754 
755         if (ret != NXT_OK) {
756             goto fail;
757         }
758 
759     } nxt_list_loop;
760 
761     new_file = new_files->elts;
762 
763     ret = nxt_file_stderr(&new_file[0]);
764 
765     if (ret == NXT_OK) {
766         n = 0;
767 
768         nxt_list_each(file, rt->log_files) {
769 
770             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
771             /*
772              * The old log file descriptor must be closed at the moment
773              * when no other threads use it.  dup2() allows to use the
774              * old file descriptor for new log file.  This change is
775              * performed atomically in the kernel.
776              */
777             (void) nxt_file_redirect(file, new_file[n].fd);
778 
779             n++;
780 
781         } nxt_list_loop;
782 
783         nxt_mp_destroy(mp);
784         return;
785     }
786 
787 fail:
788 
789     new_file = new_files->elts;
790     n = new_files->nelts;
791 
792     while (n != 0) {
793         if (new_file->fd != NXT_FILE_INVALID) {
794             nxt_file_close(task, new_file);
795         }
796 
797         new_file++;
798         n--;
799     }
800 
801     nxt_mp_destroy(mp);
802 }
803 
804 
805 static void
806 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
807 {
808     int                    status;
809     nxt_err_t              err;
810     nxt_pid_t              pid;
811 
812     nxt_debug(task, "sigchld handler signo:%d (%s)",
813               (int) (uintptr_t) obj, data);
814 
815     for ( ;; ) {
816         pid = waitpid(-1, &status, WNOHANG);
817 
818         if (pid == -1) {
819 
820             switch (err = nxt_errno) {
821 
822             case NXT_ECHILD:
823                 return;
824 
825             case NXT_EINTR:
826                 continue;
827 
828             default:
829                 nxt_alert(task, "waitpid() failed: %E", err);
830                 return;
831             }
832         }
833 
834         nxt_debug(task, "waitpid(): %PI", pid);
835 
836         if (pid == 0) {
837             return;
838         }
839 
840         if (WTERMSIG(status)) {
841 #ifdef WCOREDUMP
842             nxt_alert(task, "process %PI exited on signal %d%s",
843                       pid, WTERMSIG(status),
844                       WCOREDUMP(status) ? " (core dumped)" : "");
845 #else
846             nxt_alert(task, "process %PI exited on signal %d",
847                       pid, WTERMSIG(status));
848 #endif
849 
850         } else {
851             nxt_trace(task, "process %PI exited with code %d",
852                       pid, WEXITSTATUS(status));
853         }
854 
855         nxt_main_cleanup_process(task, pid);
856     }
857 }
858 
859 
860 static void
861 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
862 {
863     nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
864               (int) (uintptr_t) obj, data);
865 }
866 
867 
868 static void
869 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
870 {
871     int                 stream;
872     nxt_int_t           ret;
873     nxt_buf_t           *buf;
874     nxt_port_t          *port;
875     const char          *name;
876     nxt_runtime_t       *rt;
877     nxt_process_t       *process;
878     nxt_process_init_t  init;
879 
880     rt = task->thread->runtime;
881 
882     process = nxt_runtime_process_find(rt, pid);
883     if (!process) {
884         return;
885     }
886 
887     if (process->isolation.cleanup != NULL) {
888         process->isolation.cleanup(task, process);
889     }
890 
891     name = process->name;
892     stream = process->stream;
893     init = *((nxt_process_init_t *) nxt_process_init(process));
894 
895     if (process->state == NXT_PROCESS_STATE_READY) {
896         process->stream = 0;
897     }
898 
899     nxt_process_close_ports(task, process);
900 
901     if (nxt_exiting) {
902         if (rt->nprocesses <= 1) {
903             nxt_runtime_quit(task, 0);
904         }
905 
906         return;
907     }
908 
909     nxt_runtime_process_each(rt, process) {
910 
911         if (process->pid == nxt_pid
912             || process->pid == pid
913             || nxt_queue_is_empty(&process->ports))
914         {
915             continue;
916         }
917 
918         port = nxt_process_port_first(process);
919 
920         if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
921             continue;
922         }
923 
924         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
925                                    sizeof(pid));
926 
927         if (nxt_slow_path(buf == NULL)) {
928             continue;
929         }
930 
931         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
932 
933         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
934                               stream, 0, buf);
935 
936     } nxt_runtime_process_loop;
937 
938     if (init.restart) {
939         ret = nxt_main_process_create(task, init);
940         if (nxt_slow_path(ret == NXT_ERROR)) {
941             nxt_alert(task, "failed to restart %s", name);
942         }
943     }
944 }
945 
946 
947 static void
948 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
949 {
950     size_t                  size;
951     nxt_int_t               ret;
952     nxt_buf_t               *b, *out;
953     nxt_port_t              *port;
954     nxt_sockaddr_t          *sa;
955     nxt_port_msg_type_t     type;
956     nxt_listening_socket_t  ls;
957     u_char                  message[2048];
958 
959     b = msg->buf;
960     sa = (nxt_sockaddr_t *) b->mem.pos;
961 
962     /* TODO check b size and make plain */
963 
964     out = NULL;
965 
966     ls.socket = -1;
967     ls.error = NXT_SOCKET_ERROR_SYSTEM;
968     ls.start = message;
969     ls.end = message + sizeof(message);
970 
971     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
972                                  msg->port_msg.reply_port);
973 
974     nxt_debug(task, "listening socket \"%*s\"",
975               (size_t) sa->length, nxt_sockaddr_start(sa));
976 
977     ret = nxt_main_listening_socket(sa, &ls);
978 
979     if (ret == NXT_OK) {
980         nxt_debug(task, "socket(\"%*s\"): %d",
981                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
982 
983         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
984 
985     } else {
986         size = ls.end - ls.start;
987 
988         nxt_alert(task, "%*s", size, ls.start);
989 
990         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
991                                    size + 1);
992         if (nxt_slow_path(out == NULL)) {
993             return;
994         }
995 
996         *out->mem.free++ = (uint8_t) ls.error;
997 
998         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
999 
1000         type = NXT_PORT_MSG_RPC_ERROR;
1001     }
1002 
1003     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1004                           0, out);
1005 }
1006 
1007 
/*
 * Creates and binds a socket for the address "sa".
 *
 * The socket gets SO_REUSEADDR, and IPV6_V6ONLY for AF_INET6 addresses.
 * After a successful bind() of an AF_UNIX address the socket file is
 * made readable and writable for user, group and others.  listen() is
 * not called here.
 *
 * On success the descriptor is stored in ls->socket and NXT_OK is
 * returned.  On failure NXT_ERROR is returned, a description is
 * formatted into [ls->start, ls->end) with ls->end moved to its end,
 * and ls->error is set for the errno values recognized below; for any
 * other error ls->error keeps the value set by the caller.  A socket
 * that was already created is closed.
 *
 * NOTE(review): the messages emit the quotes as backslash-quote,
 * presumably so that the text can be embedded in a JSON string by the
 * receiver -- confirm against the consumer.
 */
static nxt_int_t
nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
{
    nxt_err_t         err;
    nxt_socket_t      s;

    const socklen_t   length = sizeof(int);
    static const int  enable = 1;

    s = socket(sa->u.sockaddr.sa_family, sa->type, 0);

    if (nxt_slow_path(s == -1)) {
        /* Save errno before anything else can overwrite it. */
        err = nxt_errno;

#if (NXT_INET6)

        if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
            ls->error = NXT_SOCKET_ERROR_NOINET6;
        }

#endif

        ls->end = nxt_sprintf(ls->start, ls->end,
                              "socket(\\\"%*s\\\") failed %E",
                              (size_t) sa->length, nxt_sockaddr_start(sa), err);

        return NXT_ERROR;
    }

    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
        ls->end = nxt_sprintf(ls->start, ls->end,
                              "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
                              (size_t) sa->length, nxt_sockaddr_start(sa),
                              nxt_errno);
        goto fail;
    }

#if (NXT_INET6)

    if (sa->u.sockaddr.sa_family == AF_INET6) {

        if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
            ls->end = nxt_sprintf(ls->start, ls->end,
                               "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
                               (size_t) sa->length, nxt_sockaddr_start(sa),
                               nxt_errno);
            goto fail;
        }
    }

#endif

    if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
        err = nxt_errno;

        /*
         * Translate errno into an error class.  EACCES is classified
         * differently for Unix domain sockets and for other families.
         */

#if (NXT_HAVE_UNIX_DOMAIN)

        if (sa->u.sockaddr.sa_family == AF_UNIX) {
            switch (err) {

            case EACCES:
                ls->error = NXT_SOCKET_ERROR_ACCESS;
                break;

            case ENOENT:
            case ENOTDIR:
                ls->error = NXT_SOCKET_ERROR_PATH;
                break;
            }

        } else
#endif
        {
            switch (err) {

            case EACCES:
                ls->error = NXT_SOCKET_ERROR_PORT;
                break;

            case EADDRINUSE:
                ls->error = NXT_SOCKET_ERROR_INUSE;
                break;

            case EADDRNOTAVAIL:
                ls->error = NXT_SOCKET_ERROR_NOADDR;
                break;
            }
        }

        ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
                              (size_t) sa->length, nxt_sockaddr_start(sa), err);
        goto fail;
    }

#if (NXT_HAVE_UNIX_DOMAIN)

    if (sa->u.sockaddr.sa_family == AF_UNIX) {
        char     *filename;
        mode_t   access;

        filename = sa->u.sockaddr_un.sun_path;
        /* Read and write permission for user, group and others. */
        access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);

        if (chmod(filename, access) != 0) {
            ls->end = nxt_sprintf(ls->start, ls->end,
                                  "chmod(\\\"%s\\\") failed %E",
                                  filename, nxt_errno);
            goto fail;
        }
    }

#endif

    ls->socket = s;

    return NXT_OK;

fail:

    /* The close() result is deliberately ignored on the error path. */
    (void) close(s);

    return NXT_ERROR;
}
1131 
1132 
1133 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1134     {
1135         nxt_string("type"),
1136         NXT_CONF_MAP_INT,
1137         offsetof(nxt_app_lang_module_t, type),
1138     },
1139 
1140     {
1141         nxt_string("version"),
1142         NXT_CONF_MAP_CSTRZ,
1143         offsetof(nxt_app_lang_module_t, version),
1144     },
1145 
1146     {
1147         nxt_string("file"),
1148         NXT_CONF_MAP_CSTRZ,
1149         offsetof(nxt_app_lang_module_t, file),
1150     },
1151 };
1152 
1153 
1154 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1155     {
1156         nxt_string("src"),
1157         NXT_CONF_MAP_CSTRZ,
1158         offsetof(nxt_fs_mount_t, src),
1159     },
1160     {
1161         nxt_string("dst"),
1162         NXT_CONF_MAP_CSTRZ,
1163         offsetof(nxt_fs_mount_t, dst),
1164     },
1165     {
1166         nxt_string("name"),
1167         NXT_CONF_MAP_CSTRZ,
1168         offsetof(nxt_fs_mount_t, name),
1169     },
1170     {
1171         nxt_string("type"),
1172         NXT_CONF_MAP_INT,
1173         offsetof(nxt_fs_mount_t, type),
1174     },
1175     {
1176         nxt_string("flags"),
1177         NXT_CONF_MAP_INT,
1178         offsetof(nxt_fs_mount_t, flags),
1179     },
1180     {
1181         nxt_string("data"),
1182         NXT_CONF_MAP_CSTRZ,
1183         offsetof(nxt_fs_mount_t, data),
1184     },
1185 };
1186 
1187 
1188 static void
1189 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1190 {
1191     uint32_t               index, jindex, nmounts;
1192     nxt_mp_t               *mp;
1193     nxt_int_t              ret;
1194     nxt_buf_t              *b;
1195     nxt_port_t             *port;
1196     nxt_runtime_t          *rt;
1197     nxt_fs_mount_t         *mnt;
1198     nxt_conf_value_t       *conf, *root, *value, *mounts;
1199     nxt_app_lang_module_t  *lang;
1200 
1201     static nxt_str_t root_path = nxt_string("/");
1202     static nxt_str_t mounts_name = nxt_string("mounts");
1203 
1204     rt = task->thread->runtime;
1205 
1206     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1207         return;
1208     }
1209 
1210     if (nxt_exiting) {
1211         nxt_debug(task, "ignoring discovered modules, exiting");
1212         return;
1213     }
1214 
1215     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1216                                  msg->port_msg.reply_port);
1217 
1218     if (nxt_fast_path(port != NULL)) {
1219         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1220                                      msg->port_msg.stream, 0, NULL);
1221     }
1222 
1223     b = msg->buf;
1224 
1225     if (b == NULL) {
1226         return;
1227     }
1228 
1229     mp = nxt_mp_create(1024, 128, 256, 32);
1230     if (mp == NULL) {
1231         return;
1232     }
1233 
1234     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1235 
1236     if (b == NULL) {
1237         return;
1238     }
1239 
1240     nxt_debug(task, "application languages: \"%*s\"",
1241               b->mem.free - b->mem.pos, b->mem.pos);
1242 
1243     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1244     if (conf == NULL) {
1245         goto fail;
1246     }
1247 
1248     root = nxt_conf_get_path(conf, &root_path);
1249     if (root == NULL) {
1250         goto fail;
1251     }
1252 
1253     for (index = 0; /* void */ ; index++) {
1254         value = nxt_conf_get_array_element(root, index);
1255         if (value == NULL) {
1256             break;
1257         }
1258 
1259         lang = nxt_array_zero_add(rt->languages);
1260         if (lang == NULL) {
1261             goto fail;
1262         }
1263 
1264         lang->module = NULL;
1265 
1266         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1267                                   nxt_nitems(nxt_app_lang_module_map), lang);
1268 
1269         if (ret != NXT_OK) {
1270             goto fail;
1271         }
1272 
1273         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1274         if (mounts == NULL) {
1275             nxt_alert(task, "missing mounts from discovery message.");
1276             goto fail;
1277         }
1278 
1279         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1280             nxt_alert(task, "invalid mounts type from discovery message.");
1281             goto fail;
1282         }
1283 
1284         nmounts = nxt_conf_array_elements_count(mounts);
1285 
1286         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1287                                         sizeof(nxt_fs_mount_t));
1288 
1289         if (lang->mounts == NULL) {
1290             goto fail;
1291         }
1292 
1293         for (jindex = 0; /* */; jindex++) {
1294             value = nxt_conf_get_array_element(mounts, jindex);
1295             if (value == NULL) {
1296                 break;
1297             }
1298 
1299             mnt = nxt_array_zero_add(lang->mounts);
1300             if (mnt == NULL) {
1301                 goto fail;
1302             }
1303 
1304             mnt->builtin = 1;
1305             mnt->deps = 1;
1306 
1307             ret = nxt_conf_map_object(rt->mem_pool, value,
1308                                       nxt_app_lang_mounts_map,
1309                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1310 
1311             if (ret != NXT_OK) {
1312                 goto fail;
1313             }
1314         }
1315 
1316         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1317                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1318     }
1319 
1320     qsort(rt->languages->elts, rt->languages->nelts,
1321           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1322 
1323 fail:
1324 
1325     nxt_mp_destroy(mp);
1326 
1327     ret = nxt_main_process_create(task, nxt_controller_process);
1328     if (ret == NXT_OK) {
1329         ret = nxt_main_process_create(task, nxt_router_process);
1330     }
1331 
1332     if (nxt_slow_path(ret == NXT_ERROR)) {
1333         nxt_exiting = 1;
1334 
1335         nxt_runtime_quit(task, 1);
1336     }
1337 }
1338 
1339 
1340 static int nxt_cdecl
1341 nxt_app_lang_compare(const void *v1, const void *v2)
1342 {
1343     int                          n;
1344     const nxt_app_lang_module_t  *lang1, *lang2;
1345 
1346     lang1 = v1;
1347     lang2 = v2;
1348 
1349     n = lang1->type - lang2->type;
1350 
1351     if (n != 0) {
1352         return n;
1353     }
1354 
1355     n = nxt_strverscmp(lang1->version, lang2->version);
1356 
1357     /* Negate result to move higher versions to the beginning. */
1358 
1359     return -n;
1360 }
1361 
1362 
1363 static void
1364 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1365 {
1366     ssize_t        n, size, offset;
1367     nxt_buf_t      *b;
1368     nxt_int_t      ret;
1369     nxt_file_t     file;
1370     nxt_runtime_t  *rt;
1371 
1372     nxt_memzero(&file, sizeof(nxt_file_t));
1373 
1374     rt = task->thread->runtime;
1375 
1376     file.name = (nxt_file_name_t *) rt->conf_tmp;
1377 
1378     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1379                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1380                       != NXT_OK))
1381     {
1382         goto error;
1383     }
1384 
1385     offset = 0;
1386 
1387     for (b = msg->buf; b != NULL; b = b->next) {
1388         size = nxt_buf_mem_used_size(&b->mem);
1389 
1390         n = nxt_file_write(&file, b->mem.pos, size, offset);
1391 
1392         if (nxt_slow_path(n != size)) {
1393             nxt_file_close(task, &file);
1394             (void) nxt_file_delete(file.name);
1395             goto error;
1396         }
1397 
1398         offset += n;
1399     }
1400 
1401     nxt_file_close(task, &file);
1402 
1403     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1404 
1405     if (nxt_fast_path(ret == NXT_OK)) {
1406         return;
1407     }
1408 
1409 error:
1410 
1411     nxt_alert(task, "failed to store current configuration");
1412 }
1413 
1414 
1415 static void
1416 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1417 {
1418     u_char               *path;
1419     nxt_int_t            ret;
1420     nxt_file_t           file;
1421     nxt_port_t           *port;
1422     nxt_port_msg_type_t  type;
1423 
1424     nxt_debug(task, "opening access log file");
1425 
1426     path = msg->buf->mem.pos;
1427 
1428     nxt_memzero(&file, sizeof(nxt_file_t));
1429 
1430     file.name = (nxt_file_name_t *) path;
1431     file.log_level = NXT_LOG_ERR;
1432 
1433     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1434                         NXT_FILE_OWNER_ACCESS);
1435 
1436     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1437                            : NXT_PORT_MSG_RPC_ERROR;
1438 
1439     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1440                                  msg->port_msg.reply_port);
1441 
1442     if (nxt_fast_path(port != NULL)) {
1443         (void) nxt_port_socket_write(task, port, type, file.fd,
1444                                      msg->port_msg.stream, 0, NULL);
1445     }
1446 }
1447