xref: /unit/src/nxt_main_process.c (revision 1579:c80e692dc644)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 #include <sys/mount.h>
18 
19 
20 typedef struct {
21     nxt_socket_t        socket;
22     nxt_socket_error_t  error;
23     u_char              *start;
24     u_char              *end;
25 } nxt_listening_socket_t;
26 
27 
28 typedef struct {
29     nxt_uint_t          size;
30     nxt_conf_map_t      *map;
31 } nxt_conf_app_map_t;
32 
33 
34 extern nxt_port_handlers_t  nxt_controller_process_port_handlers;
35 extern nxt_port_handlers_t  nxt_router_process_port_handlers;
36 
37 
38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
39     nxt_runtime_t *rt);
40 static void nxt_main_process_title(nxt_task_t *task);
41 static nxt_int_t nxt_main_process_create(nxt_task_t *task,
42     const nxt_process_init_t init);
43 static nxt_int_t nxt_main_start_process(nxt_task_t *task,
44     nxt_process_t *process);
45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
49     void *data);
50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
51     void *data);
52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
53     void *data);
54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
55     void *data);
56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
57 static void nxt_main_port_socket_handler(nxt_task_t *task,
58     nxt_port_recv_msg_t *msg);
59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
60     nxt_listening_socket_t *ls);
61 static void nxt_main_port_modules_handler(nxt_task_t *task,
62     nxt_port_recv_msg_t *msg);
63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
64 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
65     nxt_port_recv_msg_t *msg);
66 static void nxt_main_port_access_log_handler(nxt_task_t *task,
67     nxt_port_recv_msg_t *msg);
68 
69 const nxt_sig_event_t  nxt_main_process_signals[] = {
70     nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
71     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
72     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
73     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
74     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
75     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
76     nxt_event_signal_end,
77 };
78 
79 
80 static nxt_bool_t  nxt_exiting;
81 
82 
83 nxt_int_t
84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
85     nxt_runtime_t *rt)
86 {
87     rt->type = NXT_PROCESS_MAIN;
88 
89     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
90         return NXT_ERROR;
91     }
92 
93     nxt_main_process_title(task);
94 
95     /*
96      * The discovery process will send a message processed by
97      * nxt_main_port_modules_handler() which starts the controller
98      * and router processes.
99      */
100     return nxt_main_process_create(task, nxt_discovery_process);
101 }
102 
103 
104 static nxt_conf_map_t  nxt_common_app_conf[] = {
105     {
106         nxt_string("type"),
107         NXT_CONF_MAP_STR,
108         offsetof(nxt_common_app_conf_t, type),
109     },
110 
111     {
112         nxt_string("user"),
113         NXT_CONF_MAP_STR,
114         offsetof(nxt_common_app_conf_t, user),
115     },
116 
117     {
118         nxt_string("group"),
119         NXT_CONF_MAP_STR,
120         offsetof(nxt_common_app_conf_t, group),
121     },
122 
123     {
124         nxt_string("working_directory"),
125         NXT_CONF_MAP_CSTRZ,
126         offsetof(nxt_common_app_conf_t, working_directory),
127     },
128 
129     {
130         nxt_string("environment"),
131         NXT_CONF_MAP_PTR,
132         offsetof(nxt_common_app_conf_t, environment),
133     },
134 
135     {
136         nxt_string("isolation"),
137         NXT_CONF_MAP_PTR,
138         offsetof(nxt_common_app_conf_t, isolation),
139     },
140 
141     {
142         nxt_string("limits"),
143         NXT_CONF_MAP_PTR,
144         offsetof(nxt_common_app_conf_t, limits),
145     },
146 
147 };
148 
149 
150 static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
151     {
152         nxt_string("shm"),
153         NXT_CONF_MAP_SIZE,
154         offsetof(nxt_common_app_conf_t, shm_limit),
155     },
156 
157 };
158 
159 
160 static nxt_conf_map_t  nxt_external_app_conf[] = {
161     {
162         nxt_string("executable"),
163         NXT_CONF_MAP_CSTRZ,
164         offsetof(nxt_common_app_conf_t, u.external.executable),
165     },
166 
167     {
168         nxt_string("arguments"),
169         NXT_CONF_MAP_PTR,
170         offsetof(nxt_common_app_conf_t, u.external.arguments),
171     },
172 
173 };
174 
175 
176 static nxt_conf_map_t  nxt_python_app_conf[] = {
177     {
178         nxt_string("home"),
179         NXT_CONF_MAP_CSTRZ,
180         offsetof(nxt_common_app_conf_t, u.python.home),
181     },
182 
183     {
184         nxt_string("path"),
185         NXT_CONF_MAP_STR,
186         offsetof(nxt_common_app_conf_t, u.python.path),
187     },
188 
189     {
190         nxt_string("module"),
191         NXT_CONF_MAP_STR,
192         offsetof(nxt_common_app_conf_t, u.python.module),
193     },
194 };
195 
196 
197 static nxt_conf_map_t  nxt_php_app_conf[] = {
198     {
199         nxt_string("targets"),
200         NXT_CONF_MAP_PTR,
201         offsetof(nxt_common_app_conf_t, u.php.targets),
202     },
203 
204     {
205         nxt_string("options"),
206         NXT_CONF_MAP_PTR,
207         offsetof(nxt_common_app_conf_t, u.php.options),
208     },
209 };
210 
211 
212 static nxt_conf_map_t  nxt_perl_app_conf[] = {
213     {
214         nxt_string("script"),
215         NXT_CONF_MAP_CSTRZ,
216         offsetof(nxt_common_app_conf_t, u.perl.script),
217     },
218 };
219 
220 
221 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
222     {
223         nxt_string("script"),
224         NXT_CONF_MAP_STR,
225         offsetof(nxt_common_app_conf_t, u.ruby.script),
226     },
227 };
228 
229 
230 static nxt_conf_map_t  nxt_java_app_conf[] = {
231     {
232         nxt_string("classpath"),
233         NXT_CONF_MAP_PTR,
234         offsetof(nxt_common_app_conf_t, u.java.classpath),
235     },
236     {
237         nxt_string("webapp"),
238         NXT_CONF_MAP_CSTRZ,
239         offsetof(nxt_common_app_conf_t, u.java.webapp),
240     },
241     {
242         nxt_string("options"),
243         NXT_CONF_MAP_PTR,
244         offsetof(nxt_common_app_conf_t, u.java.options),
245     },
246     {
247         nxt_string("unit_jars"),
248         NXT_CONF_MAP_CSTRZ,
249         offsetof(nxt_common_app_conf_t, u.java.unit_jars),
250     },
251 
252 };
253 
254 
255 static nxt_conf_app_map_t  nxt_app_maps[] = {
256     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
257     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
258     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
259     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
260     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
261     { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
262 };
263 
264 
265 static void
266 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
267 {
268     nxt_debug(task, "main data: %*s",
269               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
270 }
271 
272 
273 static void
274 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
275 {
276     u_char                 *start, *p, ch;
277     size_t                 type_len;
278     nxt_int_t              ret;
279     nxt_buf_t              *b;
280     nxt_port_t             *port;
281     nxt_runtime_t          *rt;
282     nxt_process_t          *process;
283     nxt_app_type_t         idx;
284     nxt_conf_value_t       *conf;
285     nxt_process_init_t     *init;
286     nxt_common_app_conf_t  *app_conf;
287 
288     ret = NXT_ERROR;
289 
290     rt = task->thread->runtime;
291 
292     process = nxt_main_process_new(task, rt);
293     if (nxt_slow_path(process == NULL)) {
294         return;
295     }
296 
297     init = nxt_process_init(process);
298 
299     *init = nxt_app_process;
300 
301     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
302     if (b == NULL) {
303         goto failed;
304     }
305 
306     nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
307               b->mem.pos);
308 
309     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
310     if (nxt_slow_path(app_conf == NULL)) {
311         goto failed;
312     }
313 
314     start = b->mem.pos;
315 
316     app_conf->name.start = start;
317     app_conf->name.length = nxt_strlen(start);
318 
319     init->name = (const char *) start;
320 
321     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
322                                  + sizeof("\"\" application") + 1);
323 
324     if (nxt_slow_path(process->name == NULL)) {
325         goto failed;
326     }
327 
328     p = (u_char *) process->name;
329     *p++ = '"';
330     p = nxt_cpymem(p, init->name, app_conf->name.length);
331     p = nxt_cpymem(p, "\" application", 13);
332     *p = '\0';
333 
334     app_conf->shm_limit = 100 * 1024 * 1024;
335 
336     start += app_conf->name.length + 1;
337 
338     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
339     if (conf == NULL) {
340         nxt_alert(task, "router app configuration parsing error");
341 
342         goto failed;
343     }
344 
345     rt = task->thread->runtime;
346 
347     app_conf->user.start  = (u_char*)rt->user_cred.user;
348     app_conf->user.length = nxt_strlen(rt->user_cred.user);
349 
350     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
351                               nxt_nitems(nxt_common_app_conf), app_conf);
352 
353     if (ret != NXT_OK) {
354         nxt_alert(task, "failed to map common app conf received from router");
355         goto failed;
356     }
357 
358     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
359         ch = app_conf->type.start[type_len];
360 
361         if (ch == ' ' || nxt_isdigit(ch)) {
362             break;
363         }
364     }
365 
366     idx = nxt_app_parse_type(app_conf->type.start, type_len);
367 
368     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
369         nxt_alert(task, "invalid app type %d received from router", (int) idx);
370         goto failed;
371     }
372 
373     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
374                               nxt_app_maps[idx].size, app_conf);
375 
376     if (nxt_slow_path(ret != NXT_OK)) {
377         nxt_alert(task, "failed to map app conf received from router");
378         goto failed;
379     }
380 
381     if (app_conf->limits != NULL) {
382         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
383                                   nxt_common_app_limits_conf,
384                                   nxt_nitems(nxt_common_app_limits_conf),
385                                   app_conf);
386 
387         if (nxt_slow_path(ret != NXT_OK)) {
388             nxt_alert(task, "failed to map app limits received from router");
389             goto failed;
390         }
391     }
392 
393     app_conf->self = conf;
394 
395     process->stream = msg->port_msg.stream;
396     process->data.app = app_conf;
397 
398     ret = nxt_main_start_process(task, process);
399     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
400         return;
401     }
402 
403 failed:
404 
405     nxt_process_use(task, process, -1);
406 
407     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
408                                  msg->port_msg.reply_port);
409 
410     if (nxt_fast_path(port != NULL)) {
411         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
412                               -1, msg->port_msg.stream, 0, NULL);
413     }
414 }
415 
416 
417 static void
418 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
419 {
420     nxt_port_t     *port;
421     nxt_process_t  *process;
422     nxt_runtime_t  *rt;
423 
424     rt = task->thread->runtime;
425 
426     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
427     if (nxt_slow_path(process == NULL)) {
428         return;
429     }
430 
431     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
432 
433     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
434                                  msg->port_msg.reply_port);
435 
436 
437     if (nxt_slow_path(port == NULL)) {
438         return;
439     }
440 
441 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
442     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
443         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
444                                                    process->user_cred,
445                                                    &process->isolation.clone)
446                           != NXT_OK))
447         {
448             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
449                                          -1, msg->port_msg.stream, 0, NULL);
450             return;
451         }
452      }
453 
454 #endif
455 
456     process->state = NXT_PROCESS_STATE_CREATED;
457 
458     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
459                                  -1, msg->port_msg.stream, 0, NULL);
460 }
461 
462 
463 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
464     .data             = nxt_port_main_data_handler,
465     .process_created  = nxt_main_process_created_handler,
466     .process_ready    = nxt_port_process_ready_handler,
467     .start_process    = nxt_port_main_start_process_handler,
468     .socket           = nxt_main_port_socket_handler,
469     .modules          = nxt_main_port_modules_handler,
470     .conf_store       = nxt_main_port_conf_store_handler,
471 #if (NXT_TLS)
472     .cert_get         = nxt_cert_store_get_handler,
473     .cert_delete      = nxt_cert_store_delete_handler,
474 #endif
475     .access_log       = nxt_main_port_access_log_handler,
476     .rpc_ready        = nxt_port_rpc_handler,
477     .rpc_error        = nxt_port_rpc_handler,
478 };
479 
480 
481 static nxt_int_t
482 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
483 {
484     nxt_int_t      ret;
485     nxt_port_t     *port;
486     nxt_process_t  *process;
487 
488     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
489                                            NXT_PROCESS_MAIN);
490     if (nxt_slow_path(port == NULL)) {
491         return NXT_ERROR;
492     }
493 
494     process = port->process;
495 
496     ret = nxt_port_socket_init(task, port, 0);
497     if (nxt_slow_path(ret != NXT_OK)) {
498         nxt_port_use(task, port, -1);
499         return ret;
500     }
501 
502     /*
503      * A main process port.  A write port is not closed
504      * since it should be inherited by processes.
505      */
506     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
507 
508     process->state = NXT_PROCESS_STATE_READY;
509 
510     return NXT_OK;
511 }
512 
513 
514 static void
515 nxt_main_process_title(nxt_task_t *task)
516 {
517     u_char      *p, *end;
518     nxt_uint_t  i;
519     u_char      title[2048];
520 
521     end = title + sizeof(title) - 1;
522 
523     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
524                     nxt_process_argv[0]);
525 
526     for (i = 1; nxt_process_argv[i] != NULL; i++) {
527         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
528     }
529 
530     if (p < end) {
531         *p++ = ']';
532     }
533 
534     *p = '\0';
535 
536     nxt_process_title(task, "%s", title);
537 }
538 
539 
540 static nxt_int_t
541 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
542 {
543     nxt_int_t           ret;
544     nxt_runtime_t       *rt;
545     nxt_process_t       *process;
546     nxt_process_init_t  *pinit;
547 
548     rt = task->thread->runtime;
549 
550     process = nxt_main_process_new(task, rt);
551     if (nxt_slow_path(process == NULL)) {
552         return NXT_ERROR;
553     }
554 
555     process->name = init.name;
556     process->user_cred = &rt->user_cred;
557 
558     pinit = nxt_process_init(process);
559     *pinit = init;
560 
561     ret = nxt_main_start_process(task, process);
562     if (nxt_slow_path(ret == NXT_ERROR)) {
563         nxt_process_use(task, process, -1);
564     }
565 
566     return ret;
567 }
568 
569 
570 static nxt_process_t *
571 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
572 {
573     nxt_process_t  *process;
574 
575     process = nxt_runtime_process_new(rt);
576     if (nxt_slow_path(process == NULL)) {
577         return NULL;
578     }
579 
580     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
581     if (process->mem_pool == NULL) {
582         nxt_process_use(task, process, -1);
583         return NULL;
584     }
585 
586     return process;
587 }
588 
589 
590 static nxt_int_t
591 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
592 {
593     nxt_mp_t            *tmp_mp;
594     nxt_int_t           ret;
595     nxt_pid_t           pid;
596     nxt_port_t          *port;
597     nxt_process_init_t  *init;
598 
599     init = nxt_process_init(process);
600 
601     port = nxt_port_new(task, 0, 0, init->type);
602     if (nxt_slow_path(port == NULL)) {
603         return NXT_ERROR;
604     }
605 
606     nxt_process_port_add(task, process, port);
607 
608     ret = nxt_port_socket_init(task, port, 0);
609     if (nxt_slow_path(ret != NXT_OK)) {
610         goto free_port;
611     }
612 
613     tmp_mp = nxt_mp_create(1024, 128, 256, 32);
614     if (nxt_slow_path(tmp_mp == NULL)) {
615         ret = NXT_ERROR;
616 
617         goto close_port;
618     }
619 
620     if (init->prefork) {
621         ret = init->prefork(task, process, tmp_mp);
622         if (nxt_slow_path(ret != NXT_OK)) {
623             goto free_mempool;
624         }
625     }
626 
627     pid = nxt_process_create(task, process);
628 
629     switch (pid) {
630 
631     case -1:
632         ret = NXT_ERROR;
633         break;
634 
635     case 0:
636         /* The child process: return to the event engine work queue loop. */
637 
638         nxt_process_use(task, process, -1);
639 
640         ret = NXT_AGAIN;
641         break;
642 
643     default:
644         /* The main process created a new process. */
645 
646         nxt_process_use(task, process, -1);
647 
648         nxt_port_read_close(port);
649         nxt_port_write_enable(task, port);
650 
651         ret = NXT_OK;
652         break;
653     }
654 
655 free_mempool:
656 
657     nxt_mp_destroy(tmp_mp);
658 
659 close_port:
660 
661     if (nxt_slow_path(ret == NXT_ERROR)) {
662         nxt_port_close(task, port);
663     }
664 
665 free_port:
666 
667     nxt_port_use(task, port, -1);
668 
669     return ret;
670 }
671 
672 
673 static void
674 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
675 {
676     nxt_debug(task, "sigterm handler signo:%d (%s)",
677               (int) (uintptr_t) obj, data);
678 
679     /* TODO: fast exit. */
680 
681     nxt_exiting = 1;
682 
683     nxt_runtime_quit(task, 0);
684 }
685 
686 
687 static void
688 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
689 {
690     nxt_debug(task, "sigquit handler signo:%d (%s)",
691               (int) (uintptr_t) obj, data);
692 
693     /* TODO: graceful exit. */
694 
695     nxt_exiting = 1;
696 
697     nxt_runtime_quit(task, 0);
698 }
699 
700 
701 static void
702 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
703 {
704     nxt_mp_t        *mp;
705     nxt_int_t       ret;
706     nxt_uint_t      n;
707     nxt_port_t      *port;
708     nxt_file_t      *file, *new_file;
709     nxt_array_t     *new_files;
710     nxt_runtime_t   *rt;
711 
712     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
713             (int) (uintptr_t) obj, data, "log files rotation");
714 
715     rt = task->thread->runtime;
716 
717     port = rt->port_by_type[NXT_PROCESS_ROUTER];
718 
719     if (nxt_fast_path(port != NULL)) {
720         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
721                                      -1, 0, 0, NULL);
722     }
723 
724     mp = nxt_mp_create(1024, 128, 256, 32);
725     if (mp == NULL) {
726         return;
727     }
728 
729     n = nxt_list_nelts(rt->log_files);
730 
731     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
732     if (new_files == NULL) {
733         nxt_mp_destroy(mp);
734         return;
735     }
736 
737     nxt_list_each(file, rt->log_files) {
738 
739         /* This allocation cannot fail. */
740         new_file = nxt_array_add(new_files);
741 
742         new_file->name = file->name;
743         new_file->fd = NXT_FILE_INVALID;
744         new_file->log_level = NXT_LOG_ALERT;
745 
746         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
747                             NXT_FILE_OWNER_ACCESS);
748 
749         if (ret != NXT_OK) {
750             goto fail;
751         }
752 
753     } nxt_list_loop;
754 
755     new_file = new_files->elts;
756 
757     ret = nxt_file_stderr(&new_file[0]);
758 
759     if (ret == NXT_OK) {
760         n = 0;
761 
762         nxt_list_each(file, rt->log_files) {
763 
764             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
765             /*
766              * The old log file descriptor must be closed at the moment
767              * when no other threads use it.  dup2() allows to use the
768              * old file descriptor for new log file.  This change is
769              * performed atomically in the kernel.
770              */
771             (void) nxt_file_redirect(file, new_file[n].fd);
772 
773             n++;
774 
775         } nxt_list_loop;
776 
777         nxt_mp_destroy(mp);
778         return;
779     }
780 
781 fail:
782 
783     new_file = new_files->elts;
784     n = new_files->nelts;
785 
786     while (n != 0) {
787         if (new_file->fd != NXT_FILE_INVALID) {
788             nxt_file_close(task, new_file);
789         }
790 
791         new_file++;
792         n--;
793     }
794 
795     nxt_mp_destroy(mp);
796 }
797 
798 
799 static void
800 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
801 {
802     int                    status;
803     nxt_err_t              err;
804     nxt_pid_t              pid;
805 
806     nxt_debug(task, "sigchld handler signo:%d (%s)",
807               (int) (uintptr_t) obj, data);
808 
809     for ( ;; ) {
810         pid = waitpid(-1, &status, WNOHANG);
811 
812         if (pid == -1) {
813 
814             switch (err = nxt_errno) {
815 
816             case NXT_ECHILD:
817                 return;
818 
819             case NXT_EINTR:
820                 continue;
821 
822             default:
823                 nxt_alert(task, "waitpid() failed: %E", err);
824                 return;
825             }
826         }
827 
828         nxt_debug(task, "waitpid(): %PI", pid);
829 
830         if (pid == 0) {
831             return;
832         }
833 
834         if (WTERMSIG(status)) {
835 #ifdef WCOREDUMP
836             nxt_alert(task, "process %PI exited on signal %d%s",
837                       pid, WTERMSIG(status),
838                       WCOREDUMP(status) ? " (core dumped)" : "");
839 #else
840             nxt_alert(task, "process %PI exited on signal %d",
841                       pid, WTERMSIG(status));
842 #endif
843 
844         } else {
845             nxt_trace(task, "process %PI exited with code %d",
846                       pid, WEXITSTATUS(status));
847         }
848 
849         nxt_main_cleanup_process(task, pid);
850     }
851 }
852 
853 
854 static void
855 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
856 {
857     nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
858               (int) (uintptr_t) obj, data);
859 }
860 
861 
862 static void
863 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
864 {
865     int                 stream;
866     nxt_int_t           ret;
867     nxt_buf_t           *buf;
868     nxt_port_t          *port;
869     const char          *name;
870     nxt_runtime_t       *rt;
871     nxt_process_t       *process;
872     nxt_process_init_t  init;
873 
874     rt = task->thread->runtime;
875 
876     process = nxt_runtime_process_find(rt, pid);
877     if (!process) {
878         return;
879     }
880 
881     if (process->isolation.cleanup != NULL) {
882         process->isolation.cleanup(task, process);
883     }
884 
885     name = process->name;
886     stream = process->stream;
887     init = *((nxt_process_init_t *) nxt_process_init(process));
888 
889     if (process->state == NXT_PROCESS_STATE_READY) {
890         process->stream = 0;
891     }
892 
893     nxt_process_close_ports(task, process);
894 
895     if (nxt_exiting) {
896         if (rt->nprocesses <= 1) {
897             nxt_runtime_quit(task, 0);
898         }
899 
900         return;
901     }
902 
903     nxt_runtime_process_each(rt, process) {
904 
905         if (process->pid == nxt_pid
906             || process->pid == pid
907             || nxt_queue_is_empty(&process->ports))
908         {
909             continue;
910         }
911 
912         port = nxt_process_port_first(process);
913 
914         if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
915             continue;
916         }
917 
918         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
919                                    sizeof(pid));
920 
921         if (nxt_slow_path(buf == NULL)) {
922             continue;
923         }
924 
925         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
926 
927         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
928                               stream, 0, buf);
929 
930     } nxt_runtime_process_loop;
931 
932     if (init.restart) {
933         ret = nxt_main_process_create(task, init);
934         if (nxt_slow_path(ret == NXT_ERROR)) {
935             nxt_alert(task, "failed to restart %s", name);
936         }
937     }
938 }
939 
940 
941 static void
942 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
943 {
944     size_t                  size;
945     nxt_int_t               ret;
946     nxt_buf_t               *b, *out;
947     nxt_port_t              *port;
948     nxt_sockaddr_t          *sa;
949     nxt_port_msg_type_t     type;
950     nxt_listening_socket_t  ls;
951     u_char                  message[2048];
952 
953     b = msg->buf;
954     sa = (nxt_sockaddr_t *) b->mem.pos;
955 
956     /* TODO check b size and make plain */
957 
958     out = NULL;
959 
960     ls.socket = -1;
961     ls.error = NXT_SOCKET_ERROR_SYSTEM;
962     ls.start = message;
963     ls.end = message + sizeof(message);
964 
965     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
966                                  msg->port_msg.reply_port);
967 
968     nxt_debug(task, "listening socket \"%*s\"",
969               (size_t) sa->length, nxt_sockaddr_start(sa));
970 
971     ret = nxt_main_listening_socket(sa, &ls);
972 
973     if (ret == NXT_OK) {
974         nxt_debug(task, "socket(\"%*s\"): %d",
975                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
976 
977         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
978 
979     } else {
980         size = ls.end - ls.start;
981 
982         nxt_alert(task, "%*s", size, ls.start);
983 
984         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
985                                    size + 1);
986         if (nxt_slow_path(out == NULL)) {
987             return;
988         }
989 
990         *out->mem.free++ = (uint8_t) ls.error;
991 
992         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
993 
994         type = NXT_PORT_MSG_RPC_ERROR;
995     }
996 
997     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
998                           0, out);
999 }
1000 
1001 
1002 static nxt_int_t
1003 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1004 {
1005     nxt_err_t         err;
1006     nxt_socket_t      s;
1007 
1008     const socklen_t   length = sizeof(int);
1009     static const int  enable = 1;
1010 
1011     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1012 
1013     if (nxt_slow_path(s == -1)) {
1014         err = nxt_errno;
1015 
1016 #if (NXT_INET6)
1017 
1018         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1019             ls->error = NXT_SOCKET_ERROR_NOINET6;
1020         }
1021 
1022 #endif
1023 
1024         ls->end = nxt_sprintf(ls->start, ls->end,
1025                               "socket(\\\"%*s\\\") failed %E",
1026                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1027 
1028         return NXT_ERROR;
1029     }
1030 
1031     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1032         ls->end = nxt_sprintf(ls->start, ls->end,
1033                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1034                               (size_t) sa->length, nxt_sockaddr_start(sa),
1035                               nxt_errno);
1036         goto fail;
1037     }
1038 
1039 #if (NXT_INET6)
1040 
1041     if (sa->u.sockaddr.sa_family == AF_INET6) {
1042 
1043         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1044             ls->end = nxt_sprintf(ls->start, ls->end,
1045                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1046                                (size_t) sa->length, nxt_sockaddr_start(sa),
1047                                nxt_errno);
1048             goto fail;
1049         }
1050     }
1051 
1052 #endif
1053 
1054     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1055         err = nxt_errno;
1056 
1057 #if (NXT_HAVE_UNIX_DOMAIN)
1058 
1059         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1060             switch (err) {
1061 
1062             case EACCES:
1063                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1064                 break;
1065 
1066             case ENOENT:
1067             case ENOTDIR:
1068                 ls->error = NXT_SOCKET_ERROR_PATH;
1069                 break;
1070             }
1071 
1072         } else
1073 #endif
1074         {
1075             switch (err) {
1076 
1077             case EACCES:
1078                 ls->error = NXT_SOCKET_ERROR_PORT;
1079                 break;
1080 
1081             case EADDRINUSE:
1082                 ls->error = NXT_SOCKET_ERROR_INUSE;
1083                 break;
1084 
1085             case EADDRNOTAVAIL:
1086                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1087                 break;
1088             }
1089         }
1090 
1091         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1092                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1093         goto fail;
1094     }
1095 
1096 #if (NXT_HAVE_UNIX_DOMAIN)
1097 
1098     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1099         char     *filename;
1100         mode_t   access;
1101 
1102         filename = sa->u.sockaddr_un.sun_path;
1103         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1104 
1105         if (chmod(filename, access) != 0) {
1106             ls->end = nxt_sprintf(ls->start, ls->end,
1107                                   "chmod(\\\"%s\\\") failed %E",
1108                                   filename, nxt_errno);
1109             goto fail;
1110         }
1111     }
1112 
1113 #endif
1114 
1115     ls->socket = s;
1116 
1117     return NXT_OK;
1118 
1119 fail:
1120 
1121     (void) close(s);
1122 
1123     return NXT_ERROR;
1124 }
1125 
1126 
1127 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1128     {
1129         nxt_string("type"),
1130         NXT_CONF_MAP_INT,
1131         offsetof(nxt_app_lang_module_t, type),
1132     },
1133 
1134     {
1135         nxt_string("version"),
1136         NXT_CONF_MAP_CSTRZ,
1137         offsetof(nxt_app_lang_module_t, version),
1138     },
1139 
1140     {
1141         nxt_string("file"),
1142         NXT_CONF_MAP_CSTRZ,
1143         offsetof(nxt_app_lang_module_t, file),
1144     },
1145 };
1146 
1147 
1148 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1149     {
1150         nxt_string("src"),
1151         NXT_CONF_MAP_CSTRZ,
1152         offsetof(nxt_fs_mount_t, src),
1153     },
1154     {
1155         nxt_string("dst"),
1156         NXT_CONF_MAP_CSTRZ,
1157         offsetof(nxt_fs_mount_t, dst),
1158     },
1159     {
1160         nxt_string("fstype"),
1161         NXT_CONF_MAP_CSTRZ,
1162         offsetof(nxt_fs_mount_t, fstype),
1163     },
1164     {
1165         nxt_string("flags"),
1166         NXT_CONF_MAP_INT,
1167         offsetof(nxt_fs_mount_t, flags),
1168     },
1169     {
1170         nxt_string("data"),
1171         NXT_CONF_MAP_CSTRZ,
1172         offsetof(nxt_fs_mount_t, data),
1173     },
1174 };
1175 
1176 
1177 static void
1178 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1179 {
1180     uint32_t               index, jindex, nmounts;
1181     nxt_mp_t               *mp;
1182     nxt_int_t              ret;
1183     nxt_buf_t              *b;
1184     nxt_port_t             *port;
1185     nxt_runtime_t          *rt;
1186     nxt_fs_mount_t         *mnt;
1187     nxt_conf_value_t       *conf, *root, *value, *mounts;
1188     nxt_app_lang_module_t  *lang;
1189 
1190     static nxt_str_t root_path = nxt_string("/");
1191     static nxt_str_t mounts_name = nxt_string("mounts");
1192 
1193     rt = task->thread->runtime;
1194 
1195     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1196         return;
1197     }
1198 
1199     if (nxt_exiting) {
1200         nxt_debug(task, "ignoring discovered modules, exiting");
1201         return;
1202     }
1203 
1204     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1205                                  msg->port_msg.reply_port);
1206 
1207     if (nxt_fast_path(port != NULL)) {
1208         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1209                                      msg->port_msg.stream, 0, NULL);
1210     }
1211 
1212     b = msg->buf;
1213 
1214     if (b == NULL) {
1215         return;
1216     }
1217 
1218     mp = nxt_mp_create(1024, 128, 256, 32);
1219     if (mp == NULL) {
1220         return;
1221     }
1222 
1223     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1224 
1225     if (b == NULL) {
1226         return;
1227     }
1228 
1229     nxt_debug(task, "application languages: \"%*s\"",
1230               b->mem.free - b->mem.pos, b->mem.pos);
1231 
1232     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1233     if (conf == NULL) {
1234         goto fail;
1235     }
1236 
1237     root = nxt_conf_get_path(conf, &root_path);
1238     if (root == NULL) {
1239         goto fail;
1240     }
1241 
1242     for (index = 0; /* void */ ; index++) {
1243         value = nxt_conf_get_array_element(root, index);
1244         if (value == NULL) {
1245             break;
1246         }
1247 
1248         lang = nxt_array_zero_add(rt->languages);
1249         if (lang == NULL) {
1250             goto fail;
1251         }
1252 
1253         lang->module = NULL;
1254 
1255         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1256                                   nxt_nitems(nxt_app_lang_module_map), lang);
1257 
1258         if (ret != NXT_OK) {
1259             goto fail;
1260         }
1261 
1262         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1263         if (mounts == NULL) {
1264             nxt_alert(task, "missing mounts from discovery message.");
1265             goto fail;
1266         }
1267 
1268         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1269             nxt_alert(task, "invalid mounts type from discovery message.");
1270             goto fail;
1271         }
1272 
1273         nmounts = nxt_conf_array_elements_count(mounts);
1274 
1275         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1276                                         sizeof(nxt_fs_mount_t));
1277 
1278         if (lang->mounts == NULL) {
1279             goto fail;
1280         }
1281 
1282         for (jindex = 0; /* */; jindex++) {
1283             value = nxt_conf_get_array_element(mounts, jindex);
1284             if (value == NULL) {
1285                 break;
1286             }
1287 
1288             mnt = nxt_array_zero_add(lang->mounts);
1289             if (mnt == NULL) {
1290                 goto fail;
1291             }
1292 
1293             ret = nxt_conf_map_object(rt->mem_pool, value,
1294                                       nxt_app_lang_mounts_map,
1295                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1296 
1297             if (ret != NXT_OK) {
1298                 goto fail;
1299             }
1300         }
1301 
1302         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1303                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1304     }
1305 
1306     qsort(rt->languages->elts, rt->languages->nelts,
1307           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1308 
1309 fail:
1310 
1311     nxt_mp_destroy(mp);
1312 
1313     ret = nxt_main_process_create(task, nxt_controller_process);
1314     if (ret == NXT_OK) {
1315         ret = nxt_main_process_create(task, nxt_router_process);
1316     }
1317 
1318     if (nxt_slow_path(ret == NXT_ERROR)) {
1319         nxt_exiting = 1;
1320 
1321         nxt_runtime_quit(task, 1);
1322     }
1323 }
1324 
1325 
1326 static int nxt_cdecl
1327 nxt_app_lang_compare(const void *v1, const void *v2)
1328 {
1329     int                          n;
1330     const nxt_app_lang_module_t  *lang1, *lang2;
1331 
1332     lang1 = v1;
1333     lang2 = v2;
1334 
1335     n = lang1->type - lang2->type;
1336 
1337     if (n != 0) {
1338         return n;
1339     }
1340 
1341     n = nxt_strverscmp(lang1->version, lang2->version);
1342 
1343     /* Negate result to move higher versions to the beginning. */
1344 
1345     return -n;
1346 }
1347 
1348 
1349 static void
1350 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1351 {
1352     ssize_t        n, size, offset;
1353     nxt_buf_t      *b;
1354     nxt_int_t      ret;
1355     nxt_file_t     file;
1356     nxt_runtime_t  *rt;
1357 
1358     nxt_memzero(&file, sizeof(nxt_file_t));
1359 
1360     rt = task->thread->runtime;
1361 
1362     file.name = (nxt_file_name_t *) rt->conf_tmp;
1363 
1364     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1365                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1366                       != NXT_OK))
1367     {
1368         goto error;
1369     }
1370 
1371     offset = 0;
1372 
1373     for (b = msg->buf; b != NULL; b = b->next) {
1374         size = nxt_buf_mem_used_size(&b->mem);
1375 
1376         n = nxt_file_write(&file, b->mem.pos, size, offset);
1377 
1378         if (nxt_slow_path(n != size)) {
1379             nxt_file_close(task, &file);
1380             (void) nxt_file_delete(file.name);
1381             goto error;
1382         }
1383 
1384         offset += n;
1385     }
1386 
1387     nxt_file_close(task, &file);
1388 
1389     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1390 
1391     if (nxt_fast_path(ret == NXT_OK)) {
1392         return;
1393     }
1394 
1395 error:
1396 
1397     nxt_alert(task, "failed to store current configuration");
1398 }
1399 
1400 
1401 static void
1402 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1403 {
1404     u_char               *path;
1405     nxt_int_t            ret;
1406     nxt_file_t           file;
1407     nxt_port_t           *port;
1408     nxt_port_msg_type_t  type;
1409 
1410     nxt_debug(task, "opening access log file");
1411 
1412     path = msg->buf->mem.pos;
1413 
1414     nxt_memzero(&file, sizeof(nxt_file_t));
1415 
1416     file.name = (nxt_file_name_t *) path;
1417     file.log_level = NXT_LOG_ERR;
1418 
1419     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1420                         NXT_FILE_OWNER_ACCESS);
1421 
1422     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1423                            : NXT_PORT_MSG_RPC_ERROR;
1424 
1425     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1426                                  msg->port_msg.reply_port);
1427 
1428     if (nxt_fast_path(port != NULL)) {
1429         (void) nxt_port_socket_write(task, port, type, file.fd,
1430                                      msg->port_msg.stream, 0, NULL);
1431     }
1432 }
1433