xref: /unit/src/nxt_main_process.c (revision 1489:4a3ec07f4b19)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 #include <sys/mount.h>
18 
19 
20 typedef struct {
21     nxt_socket_t        socket;
22     nxt_socket_error_t  error;
23     u_char              *start;
24     u_char              *end;
25 } nxt_listening_socket_t;
26 
27 
28 typedef struct {
29     nxt_uint_t          size;
30     nxt_conf_map_t      *map;
31 } nxt_conf_app_map_t;
32 
33 
34 extern nxt_port_handlers_t  nxt_controller_process_port_handlers;
35 extern nxt_port_handlers_t  nxt_router_process_port_handlers;
36 
37 
38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
39     nxt_runtime_t *rt);
40 static void nxt_main_process_title(nxt_task_t *task);
41 static nxt_int_t nxt_main_process_create(nxt_task_t *task,
42     const nxt_process_init_t init);
43 static nxt_int_t nxt_main_start_process(nxt_task_t *task,
44     nxt_process_t *process);
45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
49     void *data);
50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
51     void *data);
52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
53     void *data);
54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
55     void *data);
56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
57 static void nxt_main_port_socket_handler(nxt_task_t *task,
58     nxt_port_recv_msg_t *msg);
59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
60     nxt_listening_socket_t *ls);
61 static void nxt_main_port_modules_handler(nxt_task_t *task,
62     nxt_port_recv_msg_t *msg);
63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
64 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
65     nxt_port_recv_msg_t *msg);
66 static void nxt_main_port_access_log_handler(nxt_task_t *task,
67     nxt_port_recv_msg_t *msg);
68 
69 const nxt_sig_event_t  nxt_main_process_signals[] = {
70     nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
71     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
72     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
73     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
74     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
75     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
76     nxt_event_signal_end,
77 };
78 
79 
80 static nxt_bool_t  nxt_exiting;
81 
82 
83 nxt_int_t
84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
85     nxt_runtime_t *rt)
86 {
87     rt->type = NXT_PROCESS_MAIN;
88 
89     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
90         return NXT_ERROR;
91     }
92 
93     nxt_main_process_title(task);
94 
95     /*
96      * The discovery process will send a message processed by
97      * nxt_main_port_modules_handler() which starts the controller
98      * and router processes.
99      */
100     return nxt_main_process_create(task, nxt_discovery_process);
101 }
102 
103 
104 static nxt_conf_map_t  nxt_common_app_conf[] = {
105     {
106         nxt_string("type"),
107         NXT_CONF_MAP_STR,
108         offsetof(nxt_common_app_conf_t, type),
109     },
110 
111     {
112         nxt_string("user"),
113         NXT_CONF_MAP_STR,
114         offsetof(nxt_common_app_conf_t, user),
115     },
116 
117     {
118         nxt_string("group"),
119         NXT_CONF_MAP_STR,
120         offsetof(nxt_common_app_conf_t, group),
121     },
122 
123     {
124         nxt_string("working_directory"),
125         NXT_CONF_MAP_CSTRZ,
126         offsetof(nxt_common_app_conf_t, working_directory),
127     },
128 
129     {
130         nxt_string("environment"),
131         NXT_CONF_MAP_PTR,
132         offsetof(nxt_common_app_conf_t, environment),
133     },
134 
135     {
136         nxt_string("isolation"),
137         NXT_CONF_MAP_PTR,
138         offsetof(nxt_common_app_conf_t, isolation),
139     },
140 
141     {
142         nxt_string("limits"),
143         NXT_CONF_MAP_PTR,
144         offsetof(nxt_common_app_conf_t, limits),
145     },
146 
147 };
148 
149 
150 static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
151     {
152         nxt_string("shm"),
153         NXT_CONF_MAP_SIZE,
154         offsetof(nxt_common_app_conf_t, shm_limit),
155     },
156 
157 };
158 
159 
160 static nxt_conf_map_t  nxt_external_app_conf[] = {
161     {
162         nxt_string("executable"),
163         NXT_CONF_MAP_CSTRZ,
164         offsetof(nxt_common_app_conf_t, u.external.executable),
165     },
166 
167     {
168         nxt_string("arguments"),
169         NXT_CONF_MAP_PTR,
170         offsetof(nxt_common_app_conf_t, u.external.arguments),
171     },
172 
173 };
174 
175 
176 static nxt_conf_map_t  nxt_python_app_conf[] = {
177     {
178         nxt_string("home"),
179         NXT_CONF_MAP_CSTRZ,
180         offsetof(nxt_common_app_conf_t, u.python.home),
181     },
182 
183     {
184         nxt_string("path"),
185         NXT_CONF_MAP_STR,
186         offsetof(nxt_common_app_conf_t, u.python.path),
187     },
188 
189     {
190         nxt_string("module"),
191         NXT_CONF_MAP_STR,
192         offsetof(nxt_common_app_conf_t, u.python.module),
193     },
194 };
195 
196 
197 static nxt_conf_map_t  nxt_php_app_conf[] = {
198     {
199         nxt_string("targets"),
200         NXT_CONF_MAP_PTR,
201         offsetof(nxt_common_app_conf_t, u.php.targets),
202     },
203 
204     {
205         nxt_string("options"),
206         NXT_CONF_MAP_PTR,
207         offsetof(nxt_common_app_conf_t, u.php.options),
208     },
209 };
210 
211 
212 static nxt_conf_map_t  nxt_perl_app_conf[] = {
213     {
214         nxt_string("script"),
215         NXT_CONF_MAP_CSTRZ,
216         offsetof(nxt_common_app_conf_t, u.perl.script),
217     },
218 };
219 
220 
221 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
222     {
223         nxt_string("script"),
224         NXT_CONF_MAP_STR,
225         offsetof(nxt_common_app_conf_t, u.ruby.script),
226     },
227 };
228 
229 
230 static nxt_conf_map_t  nxt_java_app_conf[] = {
231     {
232         nxt_string("classpath"),
233         NXT_CONF_MAP_PTR,
234         offsetof(nxt_common_app_conf_t, u.java.classpath),
235     },
236     {
237         nxt_string("webapp"),
238         NXT_CONF_MAP_CSTRZ,
239         offsetof(nxt_common_app_conf_t, u.java.webapp),
240     },
241     {
242         nxt_string("options"),
243         NXT_CONF_MAP_PTR,
244         offsetof(nxt_common_app_conf_t, u.java.options),
245     },
246     {
247         nxt_string("unit_jars"),
248         NXT_CONF_MAP_CSTRZ,
249         offsetof(nxt_common_app_conf_t, u.java.unit_jars),
250     },
251 
252 };
253 
254 
255 static nxt_conf_app_map_t  nxt_app_maps[] = {
256     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
257     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
258     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
259     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
260     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
261     { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
262 };
263 
264 
265 static void
266 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
267 {
268     nxt_debug(task, "main data: %*s",
269               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
270 }
271 
272 
273 static void
274 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
275 {
276     u_char                 *start, *p, ch;
277     size_t                 type_len;
278     nxt_int_t              ret;
279     nxt_buf_t              *b;
280     nxt_port_t             *port;
281     nxt_runtime_t          *rt;
282     nxt_process_t          *process;
283     nxt_app_type_t         idx;
284     nxt_conf_value_t       *conf;
285     nxt_process_init_t     *init;
286     nxt_common_app_conf_t  *app_conf;
287 
288     ret = NXT_ERROR;
289 
290     rt = task->thread->runtime;
291 
292     process = nxt_main_process_new(task, rt);
293     if (nxt_slow_path(process == NULL)) {
294         return;
295     }
296 
297     init = nxt_process_init(process);
298 
299     *init = nxt_app_process;
300 
301     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
302     if (b == NULL) {
303         goto failed;
304     }
305 
306     nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
307               b->mem.pos);
308 
309     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
310     if (nxt_slow_path(app_conf == NULL)) {
311         goto failed;
312     }
313 
314     start = b->mem.pos;
315 
316     app_conf->name.start = start;
317     app_conf->name.length = nxt_strlen(start);
318 
319     init->name = (const char *) start;
320 
321     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
322                                  + sizeof("\"\" application") + 1);
323 
324     if (nxt_slow_path(process->name == NULL)) {
325         goto failed;
326     }
327 
328     p = (u_char *) process->name;
329     *p++ = '"';
330     p = nxt_cpymem(p, init->name, app_conf->name.length);
331     p = nxt_cpymem(p, "\" application", 13);
332     *p = '\0';
333 
334     app_conf->shm_limit = 100 * 1024 * 1024;
335 
336     start += app_conf->name.length + 1;
337 
338     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
339     if (conf == NULL) {
340         nxt_alert(task, "router app configuration parsing error");
341 
342         goto failed;
343     }
344 
345     rt = task->thread->runtime;
346 
347     app_conf->user.start  = (u_char*)rt->user_cred.user;
348     app_conf->user.length = nxt_strlen(rt->user_cred.user);
349 
350     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
351                               nxt_nitems(nxt_common_app_conf), app_conf);
352 
353     if (ret != NXT_OK) {
354         nxt_alert(task, "failed to map common app conf received from router");
355         goto failed;
356     }
357 
358     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
359         ch = app_conf->type.start[type_len];
360 
361         if (ch == ' ' || nxt_isdigit(ch)) {
362             break;
363         }
364     }
365 
366     idx = nxt_app_parse_type(app_conf->type.start, type_len);
367 
368     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
369         nxt_alert(task, "invalid app type %d received from router", (int) idx);
370         goto failed;
371     }
372 
373     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
374                               nxt_app_maps[idx].size, app_conf);
375 
376     if (nxt_slow_path(ret != NXT_OK)) {
377         nxt_alert(task, "failed to map app conf received from router");
378         goto failed;
379     }
380 
381     if (app_conf->limits != NULL) {
382         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
383                                   nxt_common_app_limits_conf,
384                                   nxt_nitems(nxt_common_app_limits_conf),
385                                   app_conf);
386 
387         if (nxt_slow_path(ret != NXT_OK)) {
388             nxt_alert(task, "failed to map app limits received from router");
389             goto failed;
390         }
391     }
392 
393     app_conf->self = conf;
394 
395     process->stream = msg->port_msg.stream;
396     process->data.app = app_conf;
397 
398     ret = nxt_main_start_process(task, process);
399     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
400         return;
401     }
402 
403 failed:
404 
405     nxt_process_use(task, process, -1);
406 
407     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
408                                  msg->port_msg.reply_port);
409 
410     if (nxt_fast_path(port != NULL)) {
411         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
412                               -1, msg->port_msg.stream, 0, NULL);
413     }
414 }
415 
416 
417 static void
418 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
419 {
420     nxt_port_t     *port;
421     nxt_process_t  *process;
422     nxt_runtime_t  *rt;
423 
424     rt = task->thread->runtime;
425 
426     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
427     if (nxt_slow_path(process == NULL)) {
428         return;
429     }
430 
431     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
432 
433     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
434                                  msg->port_msg.reply_port);
435 
436 
437     if (nxt_slow_path(port == NULL)) {
438         return;
439     }
440 
441 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
442     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
443         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
444                                                    process->user_cred,
445                                                    &process->isolation.clone)
446                           != NXT_OK))
447         {
448             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
449                                          -1, msg->port_msg.stream, 0, NULL);
450             return;
451         }
452      }
453 
454 #endif
455 
456     process->state = NXT_PROCESS_STATE_CREATED;
457 
458     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
459                                  -1, msg->port_msg.stream, 0, NULL);
460 }
461 
462 
463 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
464     .data             = nxt_port_main_data_handler,
465     .process_created  = nxt_main_process_created_handler,
466     .process_ready    = nxt_port_process_ready_handler,
467     .start_process    = nxt_port_main_start_process_handler,
468     .socket           = nxt_main_port_socket_handler,
469     .modules          = nxt_main_port_modules_handler,
470     .conf_store       = nxt_main_port_conf_store_handler,
471 #if (NXT_TLS)
472     .cert_get         = nxt_cert_store_get_handler,
473     .cert_delete      = nxt_cert_store_delete_handler,
474 #endif
475     .access_log       = nxt_main_port_access_log_handler,
476     .rpc_ready        = nxt_port_rpc_handler,
477     .rpc_error        = nxt_port_rpc_handler,
478 };
479 
480 
481 static nxt_int_t
482 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
483 {
484     nxt_int_t      ret;
485     nxt_port_t     *port;
486     nxt_process_t  *process;
487 
488     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
489                                            NXT_PROCESS_MAIN);
490     if (nxt_slow_path(port == NULL)) {
491         return NXT_ERROR;
492     }
493 
494     process = port->process;
495 
496     ret = nxt_port_socket_init(task, port, 0);
497     if (nxt_slow_path(ret != NXT_OK)) {
498         nxt_port_use(task, port, -1);
499         return ret;
500     }
501 
502     /*
503      * A main process port.  A write port is not closed
504      * since it should be inherited by processes.
505      */
506     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
507 
508     process->state = NXT_PROCESS_STATE_READY;
509 
510     return NXT_OK;
511 }
512 
513 
514 static void
515 nxt_main_process_title(nxt_task_t *task)
516 {
517     u_char      *p, *end;
518     nxt_uint_t  i;
519     u_char      title[2048];
520 
521     end = title + sizeof(title) - 1;
522 
523     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
524                     nxt_process_argv[0]);
525 
526     for (i = 1; nxt_process_argv[i] != NULL; i++) {
527         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
528     }
529 
530     if (p < end) {
531         *p++ = ']';
532     }
533 
534     *p = '\0';
535 
536     nxt_process_title(task, "%s", title);
537 }
538 
539 
540 static nxt_int_t
541 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
542 {
543     nxt_int_t           ret;
544     nxt_runtime_t       *rt;
545     nxt_process_t       *process;
546     nxt_process_init_t  *pinit;
547 
548     rt = task->thread->runtime;
549 
550     process = nxt_main_process_new(task, rt);
551     if (nxt_slow_path(process == NULL)) {
552         return NXT_ERROR;
553     }
554 
555     process->name = init.name;
556     process->user_cred = &rt->user_cred;
557 
558     pinit = nxt_process_init(process);
559     *pinit = init;
560 
561     ret = nxt_main_start_process(task, process);
562     if (nxt_slow_path(ret == NXT_ERROR)) {
563         nxt_process_use(task, process, -1);
564     }
565 
566     return ret;
567 }
568 
569 
570 static nxt_process_t *
571 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
572 {
573     nxt_process_t  *process;
574 
575     process = nxt_runtime_process_new(rt);
576     if (nxt_slow_path(process == NULL)) {
577         return NULL;
578     }
579 
580     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
581     if (process->mem_pool == NULL) {
582         nxt_process_use(task, process, -1);
583         return NULL;
584     }
585 
586     return process;
587 }
588 
589 
590 static nxt_int_t
591 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
592 {
593     nxt_mp_t            *tmp_mp;
594     nxt_int_t           ret;
595     nxt_pid_t           pid;
596     nxt_port_t          *port;
597     nxt_process_init_t  *init;
598 
599     init = nxt_process_init(process);
600 
601     port = nxt_port_new(task, 0, 0, init->type);
602     if (nxt_slow_path(port == NULL)) {
603         return NXT_ERROR;
604     }
605 
606     nxt_process_port_add(task, process, port);
607 
608     nxt_process_use(task, process, -1);
609 
610     ret = NXT_ERROR;
611     tmp_mp = NULL;
612 
613     ret = nxt_port_socket_init(task, port, 0);
614     if (nxt_slow_path(ret != NXT_OK)) {
615         goto fail;
616     }
617 
618     tmp_mp = nxt_mp_create(1024, 128, 256, 32);
619     if (tmp_mp == NULL) {
620         goto fail;
621     }
622 
623     if (init->prefork) {
624         ret = init->prefork(task, process, tmp_mp);
625         if (nxt_slow_path(ret != NXT_OK)) {
626             goto fail;
627         }
628     }
629 
630     pid = nxt_process_create(task, process);
631 
632     switch (pid) {
633 
634     case -1:
635         nxt_port_close(task, port);
636         break;
637 
638     case 0:
639         /* The child process: return to the event engine work queue loop. */
640 
641         ret = NXT_AGAIN;
642         break;
643 
644     default:
645         /* The main process created a new process. */
646 
647         nxt_port_read_close(port);
648         nxt_port_write_enable(task, port);
649 
650         ret = NXT_OK;
651         break;
652     }
653 
654 fail:
655 
656     nxt_port_use(task, port, -1);
657 
658     if (nxt_fast_path(tmp_mp != NULL)) {
659         nxt_mp_destroy(tmp_mp);
660     }
661 
662     return ret;
663 }
664 
665 
666 static void
667 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
668 {
669     nxt_debug(task, "sigterm handler signo:%d (%s)",
670               (int) (uintptr_t) obj, data);
671 
672     /* TODO: fast exit. */
673 
674     nxt_exiting = 1;
675 
676     nxt_runtime_quit(task, 0);
677 }
678 
679 
680 static void
681 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
682 {
683     nxt_debug(task, "sigquit handler signo:%d (%s)",
684               (int) (uintptr_t) obj, data);
685 
686     /* TODO: graceful exit. */
687 
688     nxt_exiting = 1;
689 
690     nxt_runtime_quit(task, 0);
691 }
692 
693 
694 static void
695 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
696 {
697     nxt_mp_t        *mp;
698     nxt_int_t       ret;
699     nxt_uint_t      n;
700     nxt_port_t      *port;
701     nxt_file_t      *file, *new_file;
702     nxt_array_t     *new_files;
703     nxt_runtime_t   *rt;
704 
705     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
706             (int) (uintptr_t) obj, data, "log files rotation");
707 
708     rt = task->thread->runtime;
709 
710     port = rt->port_by_type[NXT_PROCESS_ROUTER];
711 
712     if (nxt_fast_path(port != NULL)) {
713         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
714                                      -1, 0, 0, NULL);
715     }
716 
717     mp = nxt_mp_create(1024, 128, 256, 32);
718     if (mp == NULL) {
719         return;
720     }
721 
722     n = nxt_list_nelts(rt->log_files);
723 
724     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
725     if (new_files == NULL) {
726         nxt_mp_destroy(mp);
727         return;
728     }
729 
730     nxt_list_each(file, rt->log_files) {
731 
732         /* This allocation cannot fail. */
733         new_file = nxt_array_add(new_files);
734 
735         new_file->name = file->name;
736         new_file->fd = NXT_FILE_INVALID;
737         new_file->log_level = NXT_LOG_ALERT;
738 
739         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
740                             NXT_FILE_OWNER_ACCESS);
741 
742         if (ret != NXT_OK) {
743             goto fail;
744         }
745 
746     } nxt_list_loop;
747 
748     new_file = new_files->elts;
749 
750     ret = nxt_file_stderr(&new_file[0]);
751 
752     if (ret == NXT_OK) {
753         n = 0;
754 
755         nxt_list_each(file, rt->log_files) {
756 
757             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
758             /*
759              * The old log file descriptor must be closed at the moment
760              * when no other threads use it.  dup2() allows to use the
761              * old file descriptor for new log file.  This change is
762              * performed atomically in the kernel.
763              */
764             (void) nxt_file_redirect(file, new_file[n].fd);
765 
766             n++;
767 
768         } nxt_list_loop;
769 
770         nxt_mp_destroy(mp);
771         return;
772     }
773 
774 fail:
775 
776     new_file = new_files->elts;
777     n = new_files->nelts;
778 
779     while (n != 0) {
780         if (new_file->fd != NXT_FILE_INVALID) {
781             nxt_file_close(task, new_file);
782         }
783 
784         new_file++;
785         n--;
786     }
787 
788     nxt_mp_destroy(mp);
789 }
790 
791 
792 static void
793 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
794 {
795     int                    status;
796     nxt_err_t              err;
797     nxt_pid_t              pid;
798 
799     nxt_debug(task, "sigchld handler signo:%d (%s)",
800               (int) (uintptr_t) obj, data);
801 
802     for ( ;; ) {
803         pid = waitpid(-1, &status, WNOHANG);
804 
805         if (pid == -1) {
806 
807             switch (err = nxt_errno) {
808 
809             case NXT_ECHILD:
810                 return;
811 
812             case NXT_EINTR:
813                 continue;
814 
815             default:
816                 nxt_alert(task, "waitpid() failed: %E", err);
817                 return;
818             }
819         }
820 
821         nxt_debug(task, "waitpid(): %PI", pid);
822 
823         if (pid == 0) {
824             return;
825         }
826 
827         if (WTERMSIG(status)) {
828 #ifdef WCOREDUMP
829             nxt_alert(task, "process %PI exited on signal %d%s",
830                       pid, WTERMSIG(status),
831                       WCOREDUMP(status) ? " (core dumped)" : "");
832 #else
833             nxt_alert(task, "process %PI exited on signal %d",
834                       pid, WTERMSIG(status));
835 #endif
836 
837         } else {
838             nxt_trace(task, "process %PI exited with code %d",
839                       pid, WEXITSTATUS(status));
840         }
841 
842         nxt_main_cleanup_process(task, pid);
843     }
844 }
845 
846 
847 static void
848 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
849 {
850     nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
851               (int) (uintptr_t) obj, data);
852 }
853 
854 
855 static void
856 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
857 {
858     int                 stream;
859     nxt_int_t           ret;
860     nxt_buf_t           *buf;
861     nxt_port_t          *port;
862     const char          *name;
863     nxt_runtime_t       *rt;
864     nxt_process_t       *process;
865     nxt_process_init_t  init;
866 
867     rt = task->thread->runtime;
868 
869     process = nxt_runtime_process_find(rt, pid);
870     if (!process) {
871         return;
872     }
873 
874 #if (NXT_HAVE_ISOLATION_ROOTFS)
875     if (process->isolation.rootfs != NULL && process->isolation.mounts) {
876         (void) nxt_process_unmount_all(task, process);
877     }
878 #endif
879 
880     name = process->name;
881     stream = process->stream;
882     init = *((nxt_process_init_t *) nxt_process_init(process));
883 
884     if (process->state == NXT_PROCESS_STATE_READY) {
885         process->stream = 0;
886     }
887 
888     nxt_process_close_ports(task, process);
889 
890     if (nxt_exiting) {
891         if (rt->nprocesses <= 1) {
892             nxt_runtime_quit(task, 0);
893         }
894 
895         return;
896     }
897 
898     nxt_runtime_process_each(rt, process) {
899 
900         if (process->pid == nxt_pid
901             || process->pid == pid
902             || nxt_queue_is_empty(&process->ports))
903         {
904             continue;
905         }
906 
907         port = nxt_process_port_first(process);
908 
909         if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
910             continue;
911         }
912 
913         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
914                                    sizeof(pid));
915 
916         if (nxt_slow_path(buf == NULL)) {
917             continue;
918         }
919 
920         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
921 
922         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
923                               stream, 0, buf);
924 
925     } nxt_runtime_process_loop;
926 
927     if (init.restart) {
928         ret = nxt_main_process_create(task, init);
929         if (nxt_slow_path(ret == NXT_ERROR)) {
930             nxt_alert(task, "failed to restart %s", name);
931         }
932     }
933 }
934 
935 
936 static void
937 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
938 {
939     size_t                  size;
940     nxt_int_t               ret;
941     nxt_buf_t               *b, *out;
942     nxt_port_t              *port;
943     nxt_sockaddr_t          *sa;
944     nxt_port_msg_type_t     type;
945     nxt_listening_socket_t  ls;
946     u_char                  message[2048];
947 
948     b = msg->buf;
949     sa = (nxt_sockaddr_t *) b->mem.pos;
950 
951     /* TODO check b size and make plain */
952 
953     out = NULL;
954 
955     ls.socket = -1;
956     ls.error = NXT_SOCKET_ERROR_SYSTEM;
957     ls.start = message;
958     ls.end = message + sizeof(message);
959 
960     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
961                                  msg->port_msg.reply_port);
962 
963     nxt_debug(task, "listening socket \"%*s\"",
964               (size_t) sa->length, nxt_sockaddr_start(sa));
965 
966     ret = nxt_main_listening_socket(sa, &ls);
967 
968     if (ret == NXT_OK) {
969         nxt_debug(task, "socket(\"%*s\"): %d",
970                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
971 
972         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
973 
974     } else {
975         size = ls.end - ls.start;
976 
977         nxt_alert(task, "%*s", size, ls.start);
978 
979         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
980                                    size + 1);
981         if (nxt_slow_path(out == NULL)) {
982             return;
983         }
984 
985         *out->mem.free++ = (uint8_t) ls.error;
986 
987         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
988 
989         type = NXT_PORT_MSG_RPC_ERROR;
990     }
991 
992     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
993                           0, out);
994 }
995 
996 
997 static nxt_int_t
998 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
999 {
1000     nxt_err_t         err;
1001     nxt_socket_t      s;
1002 
1003     const socklen_t   length = sizeof(int);
1004     static const int  enable = 1;
1005 
1006     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1007 
1008     if (nxt_slow_path(s == -1)) {
1009         err = nxt_errno;
1010 
1011 #if (NXT_INET6)
1012 
1013         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1014             ls->error = NXT_SOCKET_ERROR_NOINET6;
1015         }
1016 
1017 #endif
1018 
1019         ls->end = nxt_sprintf(ls->start, ls->end,
1020                               "socket(\\\"%*s\\\") failed %E",
1021                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1022 
1023         return NXT_ERROR;
1024     }
1025 
1026     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1027         ls->end = nxt_sprintf(ls->start, ls->end,
1028                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1029                               (size_t) sa->length, nxt_sockaddr_start(sa),
1030                               nxt_errno);
1031         goto fail;
1032     }
1033 
1034 #if (NXT_INET6)
1035 
1036     if (sa->u.sockaddr.sa_family == AF_INET6) {
1037 
1038         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1039             ls->end = nxt_sprintf(ls->start, ls->end,
1040                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1041                                (size_t) sa->length, nxt_sockaddr_start(sa),
1042                                nxt_errno);
1043             goto fail;
1044         }
1045     }
1046 
1047 #endif
1048 
1049     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1050         err = nxt_errno;
1051 
1052 #if (NXT_HAVE_UNIX_DOMAIN)
1053 
1054         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1055             switch (err) {
1056 
1057             case EACCES:
1058                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1059                 break;
1060 
1061             case ENOENT:
1062             case ENOTDIR:
1063                 ls->error = NXT_SOCKET_ERROR_PATH;
1064                 break;
1065             }
1066 
1067         } else
1068 #endif
1069         {
1070             switch (err) {
1071 
1072             case EACCES:
1073                 ls->error = NXT_SOCKET_ERROR_PORT;
1074                 break;
1075 
1076             case EADDRINUSE:
1077                 ls->error = NXT_SOCKET_ERROR_INUSE;
1078                 break;
1079 
1080             case EADDRNOTAVAIL:
1081                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1082                 break;
1083             }
1084         }
1085 
1086         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1087                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1088         goto fail;
1089     }
1090 
1091 #if (NXT_HAVE_UNIX_DOMAIN)
1092 
1093     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1094         char     *filename;
1095         mode_t   access;
1096 
1097         filename = sa->u.sockaddr_un.sun_path;
1098         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1099 
1100         if (chmod(filename, access) != 0) {
1101             ls->end = nxt_sprintf(ls->start, ls->end,
1102                                   "chmod(\\\"%s\\\") failed %E",
1103                                   filename, nxt_errno);
1104             goto fail;
1105         }
1106     }
1107 
1108 #endif
1109 
1110     ls->socket = s;
1111 
1112     return NXT_OK;
1113 
1114 fail:
1115 
1116     (void) close(s);
1117 
1118     return NXT_ERROR;
1119 }
1120 
1121 
1122 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1123     {
1124         nxt_string("type"),
1125         NXT_CONF_MAP_INT,
1126         offsetof(nxt_app_lang_module_t, type),
1127     },
1128 
1129     {
1130         nxt_string("version"),
1131         NXT_CONF_MAP_CSTRZ,
1132         offsetof(nxt_app_lang_module_t, version),
1133     },
1134 
1135     {
1136         nxt_string("file"),
1137         NXT_CONF_MAP_CSTRZ,
1138         offsetof(nxt_app_lang_module_t, file),
1139     },
1140 };
1141 
1142 
1143 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1144     {
1145         nxt_string("src"),
1146         NXT_CONF_MAP_CSTRZ,
1147         offsetof(nxt_fs_mount_t, src),
1148     },
1149     {
1150         nxt_string("dst"),
1151         NXT_CONF_MAP_CSTRZ,
1152         offsetof(nxt_fs_mount_t, dst),
1153     },
1154     {
1155         nxt_string("fstype"),
1156         NXT_CONF_MAP_CSTRZ,
1157         offsetof(nxt_fs_mount_t, fstype),
1158     },
1159     {
1160         nxt_string("flags"),
1161         NXT_CONF_MAP_INT,
1162         offsetof(nxt_fs_mount_t, flags),
1163     },
1164     {
1165         nxt_string("data"),
1166         NXT_CONF_MAP_CSTRZ,
1167         offsetof(nxt_fs_mount_t, data),
1168     },
1169 };
1170 
1171 
1172 static void
1173 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1174 {
1175     uint32_t               index, jindex, nmounts;
1176     nxt_mp_t               *mp;
1177     nxt_int_t              ret;
1178     nxt_buf_t              *b;
1179     nxt_port_t             *port;
1180     nxt_runtime_t          *rt;
1181     nxt_fs_mount_t         *mnt;
1182     nxt_conf_value_t       *conf, *root, *value, *mounts;
1183     nxt_app_lang_module_t  *lang;
1184 
1185     static nxt_str_t root_path = nxt_string("/");
1186     static nxt_str_t mounts_name = nxt_string("mounts");
1187 
1188     rt = task->thread->runtime;
1189 
1190     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1191         return;
1192     }
1193 
1194     if (nxt_exiting) {
1195         nxt_debug(task, "ignoring discovered modules, exiting");
1196         return;
1197     }
1198 
1199     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1200                                  msg->port_msg.reply_port);
1201 
1202     if (nxt_fast_path(port != NULL)) {
1203         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1204                                      msg->port_msg.stream, 0, NULL);
1205     }
1206 
1207     b = msg->buf;
1208 
1209     if (b == NULL) {
1210         return;
1211     }
1212 
1213     mp = nxt_mp_create(1024, 128, 256, 32);
1214     if (mp == NULL) {
1215         return;
1216     }
1217 
1218     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1219 
1220     if (b == NULL) {
1221         return;
1222     }
1223 
1224     nxt_debug(task, "application languages: \"%*s\"",
1225               b->mem.free - b->mem.pos, b->mem.pos);
1226 
1227     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1228     if (conf == NULL) {
1229         goto fail;
1230     }
1231 
1232     root = nxt_conf_get_path(conf, &root_path);
1233     if (root == NULL) {
1234         goto fail;
1235     }
1236 
1237     for (index = 0; /* void */ ; index++) {
1238         value = nxt_conf_get_array_element(root, index);
1239         if (value == NULL) {
1240             break;
1241         }
1242 
1243         lang = nxt_array_zero_add(rt->languages);
1244         if (lang == NULL) {
1245             goto fail;
1246         }
1247 
1248         lang->module = NULL;
1249 
1250         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1251                                   nxt_nitems(nxt_app_lang_module_map), lang);
1252 
1253         if (ret != NXT_OK) {
1254             goto fail;
1255         }
1256 
1257         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1258         if (mounts == NULL) {
1259             nxt_alert(task, "missing mounts from discovery message.");
1260             goto fail;
1261         }
1262 
1263         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1264             nxt_alert(task, "invalid mounts type from discovery message.");
1265             goto fail;
1266         }
1267 
1268         nmounts = nxt_conf_array_elements_count(mounts);
1269 
1270         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1271                                         sizeof(nxt_fs_mount_t));
1272 
1273         if (lang->mounts == NULL) {
1274             goto fail;
1275         }
1276 
1277         for (jindex = 0; /* */; jindex++) {
1278             value = nxt_conf_get_array_element(mounts, jindex);
1279             if (value == NULL) {
1280                 break;
1281             }
1282 
1283             mnt = nxt_array_zero_add(lang->mounts);
1284             if (mnt == NULL) {
1285                 goto fail;
1286             }
1287 
1288             ret = nxt_conf_map_object(rt->mem_pool, value,
1289                                       nxt_app_lang_mounts_map,
1290                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1291 
1292             if (ret != NXT_OK) {
1293                 goto fail;
1294             }
1295         }
1296 
1297         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1298                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1299     }
1300 
1301     qsort(rt->languages->elts, rt->languages->nelts,
1302           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1303 
1304 fail:
1305 
1306     nxt_mp_destroy(mp);
1307 
1308     ret = nxt_main_process_create(task, nxt_controller_process);
1309     if (ret == NXT_OK) {
1310         ret = nxt_main_process_create(task, nxt_router_process);
1311     }
1312 
1313     if (nxt_slow_path(ret == NXT_ERROR)) {
1314         nxt_exiting = 1;
1315 
1316         nxt_runtime_quit(task, 1);
1317     }
1318 }
1319 
1320 
1321 static int nxt_cdecl
1322 nxt_app_lang_compare(const void *v1, const void *v2)
1323 {
1324     int                          n;
1325     const nxt_app_lang_module_t  *lang1, *lang2;
1326 
1327     lang1 = v1;
1328     lang2 = v2;
1329 
1330     n = lang1->type - lang2->type;
1331 
1332     if (n != 0) {
1333         return n;
1334     }
1335 
1336     n = nxt_strverscmp(lang1->version, lang2->version);
1337 
1338     /* Negate result to move higher versions to the beginning. */
1339 
1340     return -n;
1341 }
1342 
1343 
1344 static void
1345 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1346 {
1347     ssize_t        n, size, offset;
1348     nxt_buf_t      *b;
1349     nxt_int_t      ret;
1350     nxt_file_t     file;
1351     nxt_runtime_t  *rt;
1352 
1353     nxt_memzero(&file, sizeof(nxt_file_t));
1354 
1355     rt = task->thread->runtime;
1356 
1357     file.name = (nxt_file_name_t *) rt->conf_tmp;
1358 
1359     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1360                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1361                       != NXT_OK))
1362     {
1363         goto error;
1364     }
1365 
1366     offset = 0;
1367 
1368     for (b = msg->buf; b != NULL; b = b->next) {
1369         size = nxt_buf_mem_used_size(&b->mem);
1370 
1371         n = nxt_file_write(&file, b->mem.pos, size, offset);
1372 
1373         if (nxt_slow_path(n != size)) {
1374             nxt_file_close(task, &file);
1375             (void) nxt_file_delete(file.name);
1376             goto error;
1377         }
1378 
1379         offset += n;
1380     }
1381 
1382     nxt_file_close(task, &file);
1383 
1384     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1385 
1386     if (nxt_fast_path(ret == NXT_OK)) {
1387         return;
1388     }
1389 
1390 error:
1391 
1392     nxt_alert(task, "failed to store current configuration");
1393 }
1394 
1395 
1396 static void
1397 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1398 {
1399     u_char               *path;
1400     nxt_int_t            ret;
1401     nxt_file_t           file;
1402     nxt_port_t           *port;
1403     nxt_port_msg_type_t  type;
1404 
1405     nxt_debug(task, "opening access log file");
1406 
1407     path = msg->buf->mem.pos;
1408 
1409     nxt_memzero(&file, sizeof(nxt_file_t));
1410 
1411     file.name = (nxt_file_name_t *) path;
1412     file.log_level = NXT_LOG_ERR;
1413 
1414     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1415                         NXT_FILE_OWNER_ACCESS);
1416 
1417     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1418                            : NXT_PORT_MSG_RPC_ERROR;
1419 
1420     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1421                                  msg->port_msg.reply_port);
1422 
1423     if (nxt_fast_path(port != NULL)) {
1424         (void) nxt_port_socket_write(task, port, type, file.fd,
1425                                      msg->port_msg.stream, 0, NULL);
1426     }
1427 }
1428