xref: /unit/src/nxt_main_process.c (revision 510:4979fe09d9cd)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 
14 
15 typedef struct {
16     nxt_socket_t        socket;
17     nxt_socket_error_t  error;
18     u_char              *start;
19     u_char              *end;
20 } nxt_listening_socket_t;
21 
22 
23 typedef struct {
24     nxt_int_t       size;
25     nxt_conf_map_t  *map;
26 } nxt_common_app_member_t;
27 
28 
29 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
30     nxt_runtime_t *rt);
31 static void nxt_main_process_title(nxt_task_t *task);
32 static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task,
33     nxt_runtime_t *rt);
34 static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task,
35     nxt_runtime_t *rt, nxt_process_init_t *init);
36 static nxt_int_t nxt_main_start_router_process(nxt_task_t *task,
37     nxt_runtime_t *rt);
38 static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task,
39     nxt_runtime_t *rt);
40 static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task,
41     nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
42 static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task,
43     nxt_runtime_t *rt, nxt_process_init_t *init);
44 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
45     void *data);
46 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
49     void *data);
50 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
51     void *data);
52 static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid);
53 static void nxt_main_port_socket_handler(nxt_task_t *task,
54     nxt_port_recv_msg_t *msg);
55 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
56     nxt_listening_socket_t *ls);
57 static void nxt_main_port_modules_handler(nxt_task_t *task,
58     nxt_port_recv_msg_t *msg);
59 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
60 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
61     nxt_port_recv_msg_t *msg);
62 
63 
64 const nxt_sig_event_t  nxt_main_process_signals[] = {
65     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
66     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
67     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
68     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
69     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
70     nxt_event_signal_end,
71 };
72 
73 
74 static nxt_bool_t  nxt_exiting;
75 
76 
77 nxt_int_t
78 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
79     nxt_runtime_t *rt)
80 {
81     rt->types |= (1U << NXT_PROCESS_MAIN);
82 
83     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
84         return NXT_ERROR;
85     }
86 
87     nxt_main_process_title(task);
88 
89     /*
90      * The dicsovery process will send a message processed by
91      * nxt_main_port_modules_handler() which starts the controller
92      * and router processes.
93      */
94     return nxt_main_start_discovery_process(task, rt);
95 }
96 
97 
98 static nxt_conf_map_t  nxt_common_app_conf[] = {
99     {
100         nxt_string("type"),
101         NXT_CONF_MAP_STR,
102         offsetof(nxt_common_app_conf_t, type),
103     },
104 
105     {
106         nxt_string("user"),
107         NXT_CONF_MAP_STR,
108         offsetof(nxt_common_app_conf_t, user),
109     },
110 
111     {
112         nxt_string("group"),
113         NXT_CONF_MAP_STR,
114         offsetof(nxt_common_app_conf_t, group),
115     },
116 
117     {
118         nxt_string("working_directory"),
119         NXT_CONF_MAP_CSTRZ,
120         offsetof(nxt_common_app_conf_t, working_directory),
121     },
122 };
123 
124 
125 static nxt_conf_map_t  nxt_common_python_app_conf[] = {
126     {
127         nxt_string("home"),
128         NXT_CONF_MAP_CSTRZ,
129         offsetof(nxt_common_app_conf_t, u.python.home),
130     },
131 
132     {
133         nxt_string("path"),
134         NXT_CONF_MAP_STR,
135         offsetof(nxt_common_app_conf_t, u.python.path),
136     },
137 
138     {
139         nxt_string("module"),
140         NXT_CONF_MAP_STR,
141         offsetof(nxt_common_app_conf_t, u.python.module),
142     },
143 };
144 
145 
146 static nxt_conf_map_t  nxt_common_php_app_conf[] = {
147     {
148         nxt_string("root"),
149         NXT_CONF_MAP_CSTRZ,
150         offsetof(nxt_common_app_conf_t, u.php.root),
151     },
152 
153     {
154         nxt_string("script"),
155         NXT_CONF_MAP_STR,
156         offsetof(nxt_common_app_conf_t, u.php.script),
157     },
158 
159     {
160         nxt_string("index"),
161         NXT_CONF_MAP_STR,
162         offsetof(nxt_common_app_conf_t, u.php.index),
163     },
164 };
165 
166 
167 static nxt_conf_map_t  nxt_common_go_app_conf[] = {
168     {
169         nxt_string("executable"),
170         NXT_CONF_MAP_CSTRZ,
171         offsetof(nxt_common_app_conf_t, u.go.executable),
172     },
173 };
174 
175 
176 static nxt_conf_map_t  nxt_common_perl_app_conf[] = {
177     {
178         nxt_string("script"),
179         NXT_CONF_MAP_CSTRZ,
180         offsetof(nxt_common_app_conf_t, u.perl.script),
181     },
182 };
183 
184 
185 static nxt_common_app_member_t  nxt_common_members[] = {
186     { nxt_nitems(nxt_common_python_app_conf), nxt_common_python_app_conf },
187     { nxt_nitems(nxt_common_php_app_conf),    nxt_common_php_app_conf },
188     { nxt_nitems(nxt_common_go_app_conf),     nxt_common_go_app_conf },
189     { nxt_nitems(nxt_common_perl_app_conf),   nxt_common_perl_app_conf },
190 };
191 
192 
193 static void
194 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
195 {
196     nxt_debug(task, "main data: %*s",
197               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
198 }
199 
200 
201 static void
202 nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
203 {
204     u_char                 *start;
205     nxt_mp_t               *mp;
206     nxt_int_t              ret, idx;
207     nxt_buf_t              *b;
208     nxt_port_t             *port;
209     nxt_conf_value_t       *conf;
210     nxt_common_app_conf_t  app_conf;
211 
212     static nxt_str_t nobody = nxt_string("nobody");
213 
214     ret = NXT_ERROR;
215 
216     mp = nxt_mp_create(1024, 128, 256, 32);
217 
218     if (nxt_slow_path(mp == NULL)) {
219         return;
220     }
221 
222     b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size);
223 
224     if (b == NULL) {
225         return;
226     }
227 
228     nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
229               b->mem.pos);
230 
231     nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
232 
233     start = b->mem.pos;
234 
235     app_conf.name.start = start;
236     app_conf.name.length = nxt_strlen(start);
237 
238     start += app_conf.name.length + 1;
239 
240     conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL);
241 
242     if (conf == NULL) {
243         nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
244 
245         goto failed;
246     }
247 
248     app_conf.user = nobody;
249 
250     ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf,
251                               nxt_nitems(nxt_common_app_conf), &app_conf);
252     if (ret != NXT_OK) {
253         nxt_log(task, NXT_LOG_CRIT, "root map error");
254         goto failed;
255     }
256 
257     idx = nxt_app_parse_type(app_conf.type.start, app_conf.type.length);
258 
259     nxt_assert(ret != NXT_APP_UNKNOWN);
260 
261     ret = nxt_conf_map_object(mp, conf, nxt_common_members[idx].map,
262                               nxt_common_members[idx].size, &app_conf);
263 
264     nxt_assert(ret == NXT_OK);
265 
266     ret = nxt_main_start_worker_process(task, task->thread->runtime,
267                                         &app_conf, msg->port_msg.stream);
268 
269 failed:
270 
271     if (ret == NXT_ERROR) {
272         port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
273                                      msg->port_msg.reply_port);
274         if (nxt_fast_path(port != NULL)) {
275             nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
276                                     -1, msg->port_msg.stream, 0, NULL);
277         }
278     }
279 
280     nxt_mp_destroy(mp);
281 }
282 
283 
284 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
285     .data           = nxt_port_main_data_handler,
286     .process_ready  = nxt_port_process_ready_handler,
287     .start_worker   = nxt_port_main_start_worker_handler,
288     .socket         = nxt_main_port_socket_handler,
289     .modules        = nxt_main_port_modules_handler,
290     .conf_store     = nxt_main_port_conf_store_handler,
291     .rpc_ready      = nxt_port_rpc_handler,
292     .rpc_error      = nxt_port_rpc_handler,
293 };
294 
295 
296 static nxt_int_t
297 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
298 {
299     nxt_int_t      ret;
300     nxt_port_t     *port;
301     nxt_process_t  *process;
302 
303     process = nxt_runtime_process_get(rt, nxt_pid);
304     if (nxt_slow_path(process == NULL)) {
305         return NXT_ERROR;
306     }
307 
308     port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
309     if (nxt_slow_path(port == NULL)) {
310         nxt_process_use(task, process, -1);
311         return NXT_ERROR;
312     }
313 
314     nxt_process_port_add(task, process, port);
315 
316     nxt_process_use(task, process, -1);
317 
318     ret = nxt_port_socket_init(task, port, 0);
319     if (nxt_slow_path(ret != NXT_OK)) {
320         nxt_port_use(task, port, -1);
321         return ret;
322     }
323 
324     nxt_runtime_port_add(task, port);
325 
326     nxt_port_use(task, port, -1);
327 
328     /*
329      * A main process port.  A write port is not closed
330      * since it should be inherited by worker processes.
331      */
332     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
333 
334     process->ready = 1;
335 
336     return NXT_OK;
337 }
338 
339 
340 static void
341 nxt_main_process_title(nxt_task_t *task)
342 {
343     u_char      *p, *end;
344     nxt_uint_t  i;
345     u_char      title[2048];
346 
347     end = title + sizeof(title) - 1;
348 
349     p = nxt_sprintf(title, end, "unit: main [%s", nxt_process_argv[0]);
350 
351     for (i = 1; nxt_process_argv[i] != NULL; i++) {
352         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
353     }
354 
355     if (p < end) {
356         *p++ = ']';
357     }
358 
359     *p = '\0';
360 
361     nxt_process_title(task, "%s", title);
362 }
363 
364 
365 static nxt_int_t
366 nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
367 {
368     nxt_process_init_t  *init;
369 
370     init = nxt_malloc(sizeof(nxt_process_init_t));
371     if (nxt_slow_path(init == NULL)) {
372         return NXT_ERROR;
373     }
374 
375     init->start = nxt_controller_start;
376     init->name = "controller";
377     init->user_cred = &rt->user_cred;
378     init->port_handlers = &nxt_controller_process_port_handlers;
379     init->signals = nxt_worker_process_signals;
380     init->type = NXT_PROCESS_CONTROLLER;
381     init->stream = 0;
382     init->restart = &nxt_main_create_controller_process;
383 
384     return nxt_main_create_controller_process(task, rt, init);;
385 }
386 
387 
388 static nxt_int_t
389 nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt,
390     nxt_process_init_t *init)
391 {
392     ssize_t          n;
393     nxt_int_t        ret;
394     nxt_str_t        conf;
395     nxt_file_t       file;
396     nxt_file_info_t  fi;
397 
398     conf.length = 0;
399 
400     nxt_memzero(&file, sizeof(nxt_file_t));
401 
402     file.name = (nxt_file_name_t *) rt->conf;
403 
404     ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
405 
406     if (ret == NXT_OK) {
407         ret = nxt_file_info(&file, &fi);
408 
409         if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) {
410             conf.length = nxt_file_size(&fi);
411             conf.start = nxt_malloc(conf.length);
412 
413             if (nxt_slow_path(conf.start == NULL)) {
414                 nxt_file_close(task, &file);
415                 return NXT_ERROR;
416             }
417 
418             n = nxt_file_read(&file, conf.start, conf.length, 0);
419 
420             if (nxt_slow_path(n != (ssize_t) conf.length)) {
421                 conf.length = 0;
422                 nxt_free(conf.start);
423 
424                 nxt_log(task, NXT_LOG_ALERT,
425                         "failed to restore previous configuration: "
426                         "cannot read the file");
427             }
428         }
429 
430         nxt_file_close(task, &file);
431     }
432 
433     init->data = &conf;
434 
435     ret = nxt_main_create_worker_process(task, rt, init);
436 
437     if (ret == NXT_OK && conf.length != 0) {
438         nxt_free(conf.start);
439     }
440 
441     return ret;
442 }
443 
444 
445 static nxt_int_t
446 nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
447 {
448     nxt_process_init_t  *init;
449 
450     init = nxt_malloc(sizeof(nxt_process_init_t));
451     if (nxt_slow_path(init == NULL)) {
452         return NXT_ERROR;
453     }
454 
455     init->start = nxt_discovery_start;
456     init->name = "discovery";
457     init->user_cred = &rt->user_cred;
458     init->port_handlers = &nxt_discovery_process_port_handlers;
459     init->signals = nxt_worker_process_signals;
460     init->type = NXT_PROCESS_DISCOVERY;
461     init->data = rt;
462     init->stream = 0;
463     init->restart = NULL;
464 
465     return nxt_main_create_worker_process(task, rt, init);
466 }
467 
468 
469 static nxt_int_t
470 nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
471 {
472     nxt_process_init_t  *init;
473 
474     init = nxt_malloc(sizeof(nxt_process_init_t));
475     if (nxt_slow_path(init == NULL)) {
476         return NXT_ERROR;
477     }
478 
479     init->start = nxt_router_start;
480     init->name = "router";
481     init->user_cred = &rt->user_cred;
482     init->port_handlers = &nxt_router_process_port_handlers;
483     init->signals = nxt_worker_process_signals;
484     init->type = NXT_PROCESS_ROUTER;
485     init->data = rt;
486     init->stream = 0;
487     init->restart = &nxt_main_create_worker_process;
488 
489     return nxt_main_create_worker_process(task, rt, init);
490 }
491 
492 
493 static nxt_int_t
494 nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
495     nxt_common_app_conf_t *app_conf, uint32_t stream)
496 {
497     char                *user, *group;
498     u_char              *title, *last, *end;
499     size_t              size;
500     nxt_process_init_t  *init;
501 
502     size = sizeof(nxt_process_init_t)
503            + sizeof(nxt_user_cred_t)
504            + app_conf->user.length + 1
505            + app_conf->group.length + 1
506            + app_conf->name.length + sizeof("\"\" application");
507 
508     init = nxt_malloc(size);
509     if (nxt_slow_path(init == NULL)) {
510         return NXT_ERROR;
511     }
512 
513     init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t));
514     user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t));
515 
516     nxt_memcpy(user, app_conf->user.start, app_conf->user.length);
517     last = nxt_pointer_to(user, app_conf->user.length);
518     *last++ = '\0';
519 
520     init->user_cred->user = user;
521 
522     if (app_conf->group.start != NULL) {
523         group = (char *) last;
524 
525         nxt_memcpy(group, app_conf->group.start, app_conf->group.length);
526         last = nxt_pointer_to(group, app_conf->group.length);
527         *last++ = '\0';
528 
529     } else {
530         group = NULL;
531     }
532 
533     if (nxt_user_cred_get(task, init->user_cred, group) != NXT_OK) {
534         return NXT_ERROR;
535     }
536 
537     title = last;
538     end = title + app_conf->name.length + sizeof("\"\" application");
539 
540     nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name);
541 
542     init->start = nxt_app_start;
543     init->name = (char *) title;
544     init->port_handlers = &nxt_app_process_port_handlers;
545     init->signals = nxt_worker_process_signals;
546     init->type = NXT_PROCESS_WORKER;
547     init->data = app_conf;
548     init->stream = stream;
549     init->restart = NULL;
550 
551     return nxt_main_create_worker_process(task, rt, init);
552 }
553 
554 
555 static nxt_int_t
556 nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
557     nxt_process_init_t *init)
558 {
559     nxt_int_t      ret;
560     nxt_pid_t      pid;
561     nxt_port_t     *port;
562     nxt_process_t  *process;
563 
564     /*
565      * TODO: remove process, init, ports from array on memory and fork failures.
566      */
567 
568     process = nxt_runtime_process_new(rt);
569     if (nxt_slow_path(process == NULL)) {
570         return NXT_ERROR;
571     }
572 
573     process->init = init;
574 
575     port = nxt_port_new(task, 0, 0, init->type);
576     if (nxt_slow_path(port == NULL)) {
577         nxt_process_use(task, process, -1);
578         return NXT_ERROR;
579     }
580 
581     nxt_process_port_add(task, process, port);
582 
583     nxt_process_use(task, process, -1);
584 
585     ret = nxt_port_socket_init(task, port, 0);
586     if (nxt_slow_path(ret != NXT_OK)) {
587         nxt_port_use(task, port, -1);
588         return ret;
589     }
590 
591     pid = nxt_process_create(task, process);
592 
593     nxt_port_use(task, port, -1);
594 
595     switch (pid) {
596 
597     case -1:
598         return NXT_ERROR;
599 
600     case 0:
601         /* A worker process, return to the event engine work queue loop. */
602         return NXT_AGAIN;
603 
604     default:
605         /* The main process created a new process. */
606 
607         nxt_port_read_close(port);
608         nxt_port_write_enable(task, port);
609 
610         return NXT_OK;
611     }
612 }
613 
614 
615 void
616 nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
617 {
618     nxt_port_t     *port;
619     nxt_process_t  *process;
620 
621     nxt_runtime_process_each(rt, process) {
622 
623         if (nxt_pid != process->pid) {
624             process->init = NULL;
625 
626             nxt_process_port_each(process, port) {
627 
628                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
629                                              -1, 0, 0, NULL);
630 
631             } nxt_process_port_loop;
632         }
633 
634     } nxt_runtime_process_loop;
635 }
636 
637 
638 
639 static void
640 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
641 {
642     nxt_debug(task, "sigterm handler signo:%d (%s)",
643               (int) (uintptr_t) obj, data);
644 
645     /* TODO: fast exit. */
646 
647     nxt_exiting = 1;
648 
649     nxt_runtime_quit(task);
650 }
651 
652 
653 static void
654 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
655 {
656     nxt_debug(task, "sigquit handler signo:%d (%s)",
657               (int) (uintptr_t) obj, data);
658 
659     /* TODO: graceful exit. */
660 
661     nxt_exiting = 1;
662 
663     nxt_runtime_quit(task);
664 }
665 
666 
667 static void
668 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
669 {
670     nxt_mp_t        *mp;
671     nxt_int_t       ret;
672     nxt_uint_t      n;
673     nxt_file_t      *file, *new_file;
674     nxt_runtime_t   *rt;
675     nxt_array_t     *new_files;
676 
677     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
678             (int) (uintptr_t) obj, data, "log files rotation");
679 
680     mp = nxt_mp_create(1024, 128, 256, 32);
681     if (mp == NULL) {
682         return;
683     }
684 
685     rt = task->thread->runtime;
686 
687     n = nxt_list_nelts(rt->log_files);
688 
689     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
690     if (new_files == NULL) {
691         nxt_mp_destroy(mp);
692         return;
693     }
694 
695     nxt_list_each(file, rt->log_files) {
696 
697         /* This allocation cannot fail. */
698         new_file = nxt_array_add(new_files);
699 
700         new_file->name = file->name;
701         new_file->fd = NXT_FILE_INVALID;
702         new_file->log_level = NXT_LOG_CRIT;
703 
704         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
705                             NXT_FILE_OWNER_ACCESS);
706 
707         if (ret != NXT_OK) {
708             goto fail;
709         }
710 
711     } nxt_list_loop;
712 
713     new_file = new_files->elts;
714 
715     ret = nxt_file_stderr(&new_file[0]);
716 
717     if (ret == NXT_OK) {
718         n = 0;
719 
720         nxt_list_each(file, rt->log_files) {
721 
722             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
723             /*
724              * The old log file descriptor must be closed at the moment
725              * when no other threads use it.  dup2() allows to use the
726              * old file descriptor for new log file.  This change is
727              * performed atomically in the kernel.
728              */
729             (void) nxt_file_redirect(file, new_file[n].fd);
730 
731             n++;
732 
733         } nxt_list_loop;
734 
735         nxt_mp_destroy(mp);
736         return;
737    }
738 
739 fail:
740 
741     new_file = new_files->elts;
742     n = new_files->nelts;
743 
744     while (n != 0) {
745         if (new_file->fd != NXT_FILE_INVALID) {
746             nxt_file_close(task, new_file);
747         }
748 
749         new_file++;
750         n--;
751     }
752 
753     nxt_mp_destroy(mp);
754 }
755 
756 
757 static void
758 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
759 {
760     int                    status;
761     nxt_err_t              err;
762     nxt_pid_t              pid;
763 
764     nxt_debug(task, "sigchld handler signo:%d (%s)",
765               (int) (uintptr_t) obj, data);
766 
767     for ( ;; ) {
768         pid = waitpid(-1, &status, WNOHANG);
769 
770         if (pid == -1) {
771 
772             switch (err = nxt_errno) {
773 
774             case NXT_ECHILD:
775                 return;
776 
777             case NXT_EINTR:
778                 continue;
779 
780             default:
781                 nxt_log(task, NXT_LOG_CRIT, "waitpid() failed: %E", err);
782                 return;
783             }
784         }
785 
786         nxt_debug(task, "waitpid(): %PI", pid);
787 
788         if (pid == 0) {
789             return;
790         }
791 
792         if (WTERMSIG(status)) {
793 #ifdef WCOREDUMP
794             nxt_log(task, NXT_LOG_CRIT, "process %PI exited on signal %d%s",
795                     pid, WTERMSIG(status),
796                     WCOREDUMP(status) ? " (core dumped)" : "");
797 #else
798             nxt_log(task, NXT_LOG_CRIT, "process %PI exited on signal %d",
799                     pid, WTERMSIG(status));
800 #endif
801 
802         } else {
803             nxt_trace(task, "process %PI exited with code %d",
804                       pid, WEXITSTATUS(status));
805         }
806 
807         nxt_main_cleanup_worker_process(task, pid);
808     }
809 }
810 
811 
812 static void
813 nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
814 {
815     nxt_buf_t           *buf;
816     nxt_port_t          *port;
817     nxt_runtime_t       *rt;
818     nxt_process_t       *process;
819     nxt_process_type_t  ptype;
820     nxt_process_init_t  *init;
821 
822     rt = task->thread->runtime;
823 
824     process = nxt_runtime_process_find(rt, pid);
825 
826     if (process) {
827         init = process->init;
828 
829         ptype = nxt_process_type(process);
830 
831         nxt_process_close_ports(task, process);
832 
833         if (!nxt_exiting) {
834             nxt_runtime_process_each(rt, process) {
835 
836                 if (process->pid == nxt_pid
837                     || process->pid == pid
838                     || nxt_queue_is_empty(&process->ports))
839                 {
840                     continue;
841                 }
842 
843                 port = nxt_process_port_first(process);
844 
845                 if (nxt_proc_remove_notify_martix[ptype][port->type] == 0) {
846                     continue;
847                 }
848 
849                 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
850                                            sizeof(pid));
851 
852                 nxt_assert(buf != NULL);
853 
854                 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
855 
856                 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID,
857                                       -1, init->stream, 0, buf);
858             } nxt_runtime_process_loop;
859         }
860 
861         if (nxt_exiting) {
862 
863             if (rt->nprocesses == 2) {
864                 nxt_runtime_quit(task);
865             }
866 
867         } else if (init != NULL) {
868             if (init->restart != NULL) {
869                 init->restart(task, rt, init);
870 
871             } else {
872                 nxt_free(init);
873             }
874         }
875     }
876 }
877 
878 
879 static void
880 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
881 {
882     size_t                  size;
883     nxt_int_t               ret;
884     nxt_buf_t               *b, *out;
885     nxt_port_t              *port;
886     nxt_sockaddr_t          *sa;
887     nxt_port_msg_type_t     type;
888     nxt_listening_socket_t  ls;
889     u_char                  message[2048];
890 
891     b = msg->buf;
892     sa = (nxt_sockaddr_t *) b->mem.pos;
893 
894     /* TODO check b size and make plain */
895 
896     out = NULL;
897 
898     ls.socket = -1;
899     ls.error = NXT_SOCKET_ERROR_SYSTEM;
900     ls.start = message;
901     ls.end = message + sizeof(message);
902 
903     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
904                                  msg->port_msg.reply_port);
905 
906     nxt_debug(task, "listening socket \"%*s\"",
907               (size_t) sa->length, nxt_sockaddr_start(sa));
908 
909     ret = nxt_main_listening_socket(sa, &ls);
910 
911     if (ret == NXT_OK) {
912         nxt_debug(task, "socket(\"%*s\"): %d",
913                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
914 
915         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
916 
917     } else {
918         size = ls.end - ls.start;
919 
920         nxt_log(task, NXT_LOG_CRIT, "%*s", size, ls.start);
921 
922         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
923                                    size + 1);
924         if (nxt_slow_path(out == NULL)) {
925             return;
926         }
927 
928         *out->mem.free++ = (uint8_t) ls.error;
929 
930         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
931 
932         type = NXT_PORT_MSG_RPC_ERROR;
933     }
934 
935     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
936                           0, out);
937 }
938 
939 
940 static nxt_int_t
941 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
942 {
943     nxt_err_t         err;
944     nxt_socket_t      s;
945 
946     const socklen_t   length = sizeof(int);
947     static const int  enable = 1;
948 
949     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
950 
951     if (nxt_slow_path(s == -1)) {
952         err = nxt_errno;
953 
954 #if (NXT_INET6)
955 
956         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
957             ls->error = NXT_SOCKET_ERROR_NOINET6;
958         }
959 
960 #endif
961 
962         ls->end = nxt_sprintf(ls->start, ls->end,
963                               "socket(\\\"%*s\\\") failed %E",
964                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
965 
966         return NXT_ERROR;
967     }
968 
969     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
970         ls->end = nxt_sprintf(ls->start, ls->end,
971                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
972                               (size_t) sa->length, nxt_sockaddr_start(sa),
973                               nxt_errno);
974         goto fail;
975     }
976 
977 #if (NXT_INET6)
978 
979     if (sa->u.sockaddr.sa_family == AF_INET6) {
980 
981         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
982             ls->end = nxt_sprintf(ls->start, ls->end,
983                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
984                                (size_t) sa->length, nxt_sockaddr_start(sa),
985                                nxt_errno);
986             goto fail;
987         }
988     }
989 
990 #endif
991 
992     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
993         err = nxt_errno;
994 
995 #if (NXT_HAVE_UNIX_DOMAIN)
996 
997         if (sa->u.sockaddr.sa_family == AF_UNIX) {
998             switch (err) {
999 
1000             case EACCES:
1001                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1002                 break;
1003 
1004             case ENOENT:
1005             case ENOTDIR:
1006                 ls->error = NXT_SOCKET_ERROR_PATH;
1007                 break;
1008             }
1009 
1010             goto next;
1011         }
1012 
1013 #endif
1014 
1015         switch (err) {
1016 
1017         case EACCES:
1018             ls->error = NXT_SOCKET_ERROR_PORT;
1019             break;
1020 
1021         case EADDRINUSE:
1022             ls->error = NXT_SOCKET_ERROR_INUSE;
1023             break;
1024 
1025         case EADDRNOTAVAIL:
1026             ls->error = NXT_SOCKET_ERROR_NOADDR;
1027             break;
1028         }
1029 
1030         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1031                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1032         goto fail;
1033     }
1034 
1035 #if (NXT_HAVE_UNIX_DOMAIN)
1036 
1037 next:
1038 
1039     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1040         char     *filename;
1041         mode_t   access;
1042 
1043         filename = sa->u.sockaddr_un.sun_path;
1044         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1045 
1046         if (chmod(filename, access) != 0) {
1047             ls->end = nxt_sprintf(ls->start, ls->end,
1048                                   "chmod(\\\"%s\\\") failed %E",
1049                                   filename, nxt_errno);
1050             goto fail;
1051         }
1052     }
1053 
1054 #endif
1055 
1056     ls->socket = s;
1057 
1058     return NXT_OK;
1059 
1060 fail:
1061 
1062     (void) close(s);
1063 
1064     return NXT_ERROR;
1065 }
1066 
1067 
1068 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1069     {
1070         nxt_string("type"),
1071         NXT_CONF_MAP_INT,
1072         offsetof(nxt_app_lang_module_t, type),
1073     },
1074 
1075     {
1076         nxt_string("version"),
1077         NXT_CONF_MAP_CSTRZ,
1078         offsetof(nxt_app_lang_module_t, version),
1079     },
1080 
1081     {
1082         nxt_string("file"),
1083         NXT_CONF_MAP_CSTRZ,
1084         offsetof(nxt_app_lang_module_t, file),
1085     },
1086 };
1087 
1088 
1089 static void
1090 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1091 {
1092     uint32_t               index;
1093     nxt_mp_t               *mp;
1094     nxt_int_t              ret;
1095     nxt_buf_t              *b;
1096     nxt_runtime_t          *rt;
1097     nxt_conf_value_t       *conf, *root, *value;
1098     nxt_app_lang_module_t  *lang;
1099 
1100     static nxt_str_t   root_path = nxt_string("/");
1101 
1102     rt = task->thread->runtime;
1103 
1104     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1105         return;
1106     }
1107 
1108     b = msg->buf;
1109 
1110     if (b == NULL) {
1111         return;
1112     }
1113 
1114     mp = nxt_mp_create(1024, 128, 256, 32);
1115     if (mp == NULL) {
1116         return;
1117     }
1118 
1119     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1120 
1121     if (b == NULL) {
1122         return;
1123     }
1124 
1125     nxt_debug(task, "application languages: \"%*s\"",
1126               b->mem.free - b->mem.pos, b->mem.pos);
1127 
1128     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1129     if (conf == NULL) {
1130         goto fail;
1131     }
1132 
1133     root = nxt_conf_get_path(conf, &root_path);
1134     if (root == NULL) {
1135         goto fail;
1136     }
1137 
1138     for (index = 0; /* void */ ; index++) {
1139         value = nxt_conf_get_array_element(root, index);
1140         if (value == NULL) {
1141             break;
1142         }
1143 
1144         lang = nxt_array_add(rt->languages);
1145         if (lang == NULL) {
1146             goto fail;
1147         }
1148 
1149         lang->module = NULL;
1150 
1151         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1152                                   nxt_nitems(nxt_app_lang_module_map), lang);
1153 
1154         if (ret != NXT_OK) {
1155             goto fail;
1156         }
1157 
1158         nxt_debug(task, "lang %d %s \"%s\"",
1159                   lang->type, lang->version, lang->file);
1160     }
1161 
1162     qsort(rt->languages->elts, rt->languages->nelts,
1163           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1164 
1165 fail:
1166 
1167     nxt_mp_destroy(mp);
1168 
1169     ret = nxt_main_start_controller_process(task, rt);
1170 
1171     if (ret == NXT_OK) {
1172         (void) nxt_main_start_router_process(task, rt);
1173     }
1174 }
1175 
1176 
1177 static int nxt_cdecl
1178 nxt_app_lang_compare(const void *v1, const void *v2)
1179 {
1180     int                          n;
1181     const nxt_app_lang_module_t  *lang1, *lang2;
1182 
1183     lang1 = v1;
1184     lang2 = v2;
1185 
1186     n = lang1->type - lang2->type;
1187 
1188     if (n != 0) {
1189         return n;
1190     }
1191 
1192     n = nxt_strverscmp(lang1->version, lang2->version);
1193 
1194     /* Negate result to move higher versions to the beginning. */
1195 
1196     return -n;
1197 }
1198 
1199 
1200 static void
1201 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1202 {
1203     ssize_t        n, size, offset;
1204     nxt_buf_t      *b;
1205     nxt_int_t      ret;
1206     nxt_file_t     file;
1207     nxt_runtime_t  *rt;
1208 
1209     nxt_memzero(&file, sizeof(nxt_file_t));
1210 
1211     rt = task->thread->runtime;
1212 
1213     file.name = (nxt_file_name_t *) rt->conf_tmp;
1214 
1215     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1216                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1217                       != NXT_OK))
1218     {
1219         goto error;
1220     }
1221 
1222     offset = 0;
1223 
1224     for (b = msg->buf; b != NULL; b = b->next) {
1225         size = nxt_buf_mem_used_size(&b->mem);
1226 
1227         n = nxt_file_write(&file, b->mem.pos, size, offset);
1228 
1229         if (nxt_slow_path(n != size)) {
1230             nxt_file_close(task, &file);
1231             (void) nxt_file_delete(file.name);
1232             goto error;
1233         }
1234 
1235         offset += n;
1236     }
1237 
1238     nxt_file_close(task, &file);
1239 
1240     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1241 
1242     if (nxt_fast_path(ret == NXT_OK)) {
1243         return;
1244     }
1245 
1246 error:
1247 
1248     nxt_log(task, NXT_LOG_ALERT, "failed to store current configuration");
1249 }
1250