xref: /unit/src/nxt_main_process.c (revision 804:fe8d2dea28dd)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 
18 typedef struct {
19     nxt_socket_t        socket;
20     nxt_socket_error_t  error;
21     u_char              *start;
22     u_char              *end;
23 } nxt_listening_socket_t;
24 
25 
26 typedef struct {
27     nxt_uint_t          size;
28     nxt_conf_map_t      *map;
29 } nxt_conf_app_map_t;
30 
31 
32 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
33     nxt_runtime_t *rt);
34 static void nxt_main_process_title(nxt_task_t *task);
35 static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task,
36     nxt_runtime_t *rt);
37 static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task,
38     nxt_runtime_t *rt, nxt_process_init_t *init);
39 static nxt_int_t nxt_main_start_router_process(nxt_task_t *task,
40     nxt_runtime_t *rt);
41 static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task,
42     nxt_runtime_t *rt);
43 static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task,
44     nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
45 static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task,
46     nxt_runtime_t *rt, nxt_process_init_t *init);
47 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
48     void *data);
49 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
50     void *data);
51 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
52     void *data);
53 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
54     void *data);
55 static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid);
56 static void nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt);
57 static void nxt_main_port_socket_handler(nxt_task_t *task,
58     nxt_port_recv_msg_t *msg);
59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
60     nxt_listening_socket_t *ls);
61 static void nxt_main_port_modules_handler(nxt_task_t *task,
62     nxt_port_recv_msg_t *msg);
63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
64 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
65     nxt_port_recv_msg_t *msg);
66 static void nxt_main_port_access_log_handler(nxt_task_t *task,
67     nxt_port_recv_msg_t *msg);
68 
69 
70 const nxt_sig_event_t  nxt_main_process_signals[] = {
71     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
72     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
73     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
74     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
75     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
76     nxt_event_signal_end,
77 };
78 
79 
80 static nxt_bool_t  nxt_exiting;
81 
82 
83 nxt_int_t
84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
85     nxt_runtime_t *rt)
86 {
87     rt->type = NXT_PROCESS_MAIN;
88 
89     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
90         return NXT_ERROR;
91     }
92 
93     nxt_main_process_title(task);
94 
95     /*
96      * The dicsovery process will send a message processed by
97      * nxt_main_port_modules_handler() which starts the controller
98      * and router processes.
99      */
100     return nxt_main_start_discovery_process(task, rt);
101 }
102 
103 
104 static nxt_conf_map_t  nxt_common_app_conf[] = {
105     {
106         nxt_string("type"),
107         NXT_CONF_MAP_STR,
108         offsetof(nxt_common_app_conf_t, type),
109     },
110 
111     {
112         nxt_string("user"),
113         NXT_CONF_MAP_STR,
114         offsetof(nxt_common_app_conf_t, user),
115     },
116 
117     {
118         nxt_string("group"),
119         NXT_CONF_MAP_STR,
120         offsetof(nxt_common_app_conf_t, group),
121     },
122 
123     {
124         nxt_string("working_directory"),
125         NXT_CONF_MAP_CSTRZ,
126         offsetof(nxt_common_app_conf_t, working_directory),
127     },
128 
129     {
130         nxt_string("environment"),
131         NXT_CONF_MAP_PTR,
132         offsetof(nxt_common_app_conf_t, environment),
133     },
134 };
135 
136 
137 static nxt_conf_map_t  nxt_external_app_conf[] = {
138     {
139         nxt_string("executable"),
140         NXT_CONF_MAP_CSTRZ,
141         offsetof(nxt_common_app_conf_t, u.external.executable),
142     },
143 
144     {
145         nxt_string("arguments"),
146         NXT_CONF_MAP_PTR,
147         offsetof(nxt_common_app_conf_t, u.external.arguments),
148     },
149 
150 };
151 
152 
153 static nxt_conf_map_t  nxt_python_app_conf[] = {
154     {
155         nxt_string("home"),
156         NXT_CONF_MAP_CSTRZ,
157         offsetof(nxt_common_app_conf_t, u.python.home),
158     },
159 
160     {
161         nxt_string("path"),
162         NXT_CONF_MAP_STR,
163         offsetof(nxt_common_app_conf_t, u.python.path),
164     },
165 
166     {
167         nxt_string("module"),
168         NXT_CONF_MAP_STR,
169         offsetof(nxt_common_app_conf_t, u.python.module),
170     },
171 };
172 
173 
174 static nxt_conf_map_t  nxt_php_app_conf[] = {
175     {
176         nxt_string("root"),
177         NXT_CONF_MAP_CSTRZ,
178         offsetof(nxt_common_app_conf_t, u.php.root),
179     },
180 
181     {
182         nxt_string("script"),
183         NXT_CONF_MAP_STR,
184         offsetof(nxt_common_app_conf_t, u.php.script),
185     },
186 
187     {
188         nxt_string("index"),
189         NXT_CONF_MAP_STR,
190         offsetof(nxt_common_app_conf_t, u.php.index),
191     },
192 
193     {
194         nxt_string("options"),
195         NXT_CONF_MAP_PTR,
196         offsetof(nxt_common_app_conf_t, u.php.options),
197     },
198 };
199 
200 
201 static nxt_conf_map_t  nxt_perl_app_conf[] = {
202     {
203         nxt_string("script"),
204         NXT_CONF_MAP_CSTRZ,
205         offsetof(nxt_common_app_conf_t, u.perl.script),
206     },
207 };
208 
209 
210 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
211     {
212         nxt_string("script"),
213         NXT_CONF_MAP_STR,
214         offsetof(nxt_common_app_conf_t, u.ruby.script),
215     },
216 };
217 
218 
219 static nxt_conf_app_map_t  nxt_app_maps[] = {
220     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
221     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
222     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
223     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
224     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
225 };
226 
227 
228 static void
229 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
230 {
231     nxt_debug(task, "main data: %*s",
232               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
233 }
234 
235 
236 static void
237 nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
238 {
239     u_char                 *start, ch;
240     size_t                 type_len;
241     nxt_mp_t               *mp;
242     nxt_int_t              ret;
243     nxt_buf_t              *b;
244     nxt_port_t             *port;
245     nxt_app_type_t         idx;
246     nxt_conf_value_t       *conf;
247     nxt_common_app_conf_t  app_conf;
248 
249     static nxt_str_t nobody = nxt_string("nobody");
250 
251     ret = NXT_ERROR;
252 
253     mp = nxt_mp_create(1024, 128, 256, 32);
254 
255     if (nxt_slow_path(mp == NULL)) {
256         return;
257     }
258 
259     b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size);
260 
261     if (b == NULL) {
262         return;
263     }
264 
265     nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
266               b->mem.pos);
267 
268     nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
269 
270     start = b->mem.pos;
271 
272     app_conf.name.start = start;
273     app_conf.name.length = nxt_strlen(start);
274 
275     start += app_conf.name.length + 1;
276 
277     conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL);
278 
279     if (conf == NULL) {
280         nxt_alert(task, "router app configuration parsing error");
281 
282         goto failed;
283     }
284 
285     app_conf.user = nobody;
286 
287     ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf,
288                               nxt_nitems(nxt_common_app_conf), &app_conf);
289     if (ret != NXT_OK) {
290         nxt_alert(task, "failed to map common app conf received from router");
291         goto failed;
292     }
293 
294     for (type_len = 0; type_len != app_conf.type.length; type_len++) {
295         ch = app_conf.type.start[type_len];
296 
297         if (ch == ' ' || nxt_isdigit(ch)) {
298             break;
299         }
300     }
301 
302     idx = nxt_app_parse_type(app_conf.type.start, type_len);
303 
304     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
305         nxt_alert(task, "invalid app type %d received from router", (int) idx);
306         goto failed;
307     }
308 
309     ret = nxt_conf_map_object(mp, conf, nxt_app_maps[idx].map,
310                               nxt_app_maps[idx].size, &app_conf);
311 
312     if (nxt_slow_path(ret != NXT_OK)) {
313         nxt_alert(task, "failed to map app conf received from router");
314         goto failed;
315     }
316 
317     ret = nxt_main_start_worker_process(task, task->thread->runtime,
318                                         &app_conf, msg->port_msg.stream);
319 
320 failed:
321 
322     if (ret == NXT_ERROR) {
323         port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
324                                      msg->port_msg.reply_port);
325         if (nxt_fast_path(port != NULL)) {
326             nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
327                                     -1, msg->port_msg.stream, 0, NULL);
328         }
329     }
330 
331     nxt_mp_destroy(mp);
332 }
333 
334 
335 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
336     .data           = nxt_port_main_data_handler,
337     .process_ready  = nxt_port_process_ready_handler,
338     .start_worker   = nxt_port_main_start_worker_handler,
339     .socket         = nxt_main_port_socket_handler,
340     .modules        = nxt_main_port_modules_handler,
341     .conf_store     = nxt_main_port_conf_store_handler,
342 #if (NXT_TLS)
343     .cert_get       = nxt_cert_store_get_handler,
344     .cert_delete    = nxt_cert_store_delete_handler,
345 #endif
346     .access_log     = nxt_main_port_access_log_handler,
347     .rpc_ready      = nxt_port_rpc_handler,
348     .rpc_error      = nxt_port_rpc_handler,
349 };
350 
351 
352 static nxt_int_t
353 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
354 {
355     nxt_int_t      ret;
356     nxt_port_t     *port;
357     nxt_process_t  *process;
358 
359     process = nxt_runtime_process_get(rt, nxt_pid);
360     if (nxt_slow_path(process == NULL)) {
361         return NXT_ERROR;
362     }
363 
364     port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
365     if (nxt_slow_path(port == NULL)) {
366         nxt_process_use(task, process, -1);
367         return NXT_ERROR;
368     }
369 
370     nxt_process_port_add(task, process, port);
371 
372     nxt_process_use(task, process, -1);
373 
374     ret = nxt_port_socket_init(task, port, 0);
375     if (nxt_slow_path(ret != NXT_OK)) {
376         nxt_port_use(task, port, -1);
377         return ret;
378     }
379 
380     nxt_runtime_port_add(task, port);
381 
382     nxt_port_use(task, port, -1);
383 
384     /*
385      * A main process port.  A write port is not closed
386      * since it should be inherited by worker processes.
387      */
388     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
389 
390     process->ready = 1;
391 
392     return NXT_OK;
393 }
394 
395 
396 static void
397 nxt_main_process_title(nxt_task_t *task)
398 {
399     u_char      *p, *end;
400     nxt_uint_t  i;
401     u_char      title[2048];
402 
403     end = title + sizeof(title) - 1;
404 
405     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
406                     nxt_process_argv[0]);
407 
408     for (i = 1; nxt_process_argv[i] != NULL; i++) {
409         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
410     }
411 
412     if (p < end) {
413         *p++ = ']';
414     }
415 
416     *p = '\0';
417 
418     nxt_process_title(task, "%s", title);
419 }
420 
421 
422 static nxt_int_t
423 nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
424 {
425     nxt_process_init_t  *init;
426 
427     init = nxt_malloc(sizeof(nxt_process_init_t));
428     if (nxt_slow_path(init == NULL)) {
429         return NXT_ERROR;
430     }
431 
432     init->start = nxt_controller_start;
433     init->name = "controller";
434     init->user_cred = &rt->user_cred;
435     init->port_handlers = &nxt_controller_process_port_handlers;
436     init->signals = nxt_worker_process_signals;
437     init->type = NXT_PROCESS_CONTROLLER;
438     init->stream = 0;
439     init->restart = &nxt_main_create_controller_process;
440 
441     return nxt_main_create_controller_process(task, rt, init);;
442 }
443 
444 
445 static nxt_int_t
446 nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt,
447     nxt_process_init_t *init)
448 {
449     ssize_t                n;
450     nxt_int_t              ret;
451     nxt_str_t              *conf;
452     nxt_file_t             file;
453     nxt_file_info_t        fi;
454     nxt_controller_init_t  ctrl_init;
455 
456     nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t));
457 
458     conf = &ctrl_init.conf;
459 
460     nxt_memzero(&file, sizeof(nxt_file_t));
461 
462     file.name = (nxt_file_name_t *) rt->conf;
463 
464     ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
465 
466     if (ret == NXT_OK) {
467         ret = nxt_file_info(&file, &fi);
468 
469         if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) {
470             conf->length = nxt_file_size(&fi);
471             conf->start = nxt_malloc(conf->length);
472 
473             if (nxt_slow_path(conf->start == NULL)) {
474                 nxt_file_close(task, &file);
475                 return NXT_ERROR;
476             }
477 
478             n = nxt_file_read(&file, conf->start, conf->length, 0);
479 
480             if (nxt_slow_path(n != (ssize_t) conf->length)) {
481                 nxt_free(conf->start);
482                 conf->start = NULL;
483 
484                 nxt_alert(task, "failed to restore previous configuration: "
485                           "cannot read the file");
486             }
487         }
488 
489         nxt_file_close(task, &file);
490     }
491 
492 #if (NXT_TLS)
493     ctrl_init.certs = nxt_cert_store_load(task);
494 #endif
495 
496     init->data = &ctrl_init;
497 
498     ret = nxt_main_create_worker_process(task, rt, init);
499 
500     if (ret == NXT_OK) {
501         if (conf->start != NULL) {
502             nxt_free(conf->start);
503         }
504 
505 #if (NXT_TLS)
506         if (ctrl_init.certs != NULL) {
507             nxt_cert_store_release(ctrl_init.certs);
508         }
509 #endif
510     }
511 
512     return ret;
513 }
514 
515 
516 static nxt_int_t
517 nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
518 {
519     nxt_process_init_t  *init;
520 
521     init = nxt_malloc(sizeof(nxt_process_init_t));
522     if (nxt_slow_path(init == NULL)) {
523         return NXT_ERROR;
524     }
525 
526     init->start = nxt_discovery_start;
527     init->name = "discovery";
528     init->user_cred = &rt->user_cred;
529     init->port_handlers = &nxt_discovery_process_port_handlers;
530     init->signals = nxt_worker_process_signals;
531     init->type = NXT_PROCESS_DISCOVERY;
532     init->data = rt;
533     init->stream = 0;
534     init->restart = NULL;
535 
536     return nxt_main_create_worker_process(task, rt, init);
537 }
538 
539 
540 static nxt_int_t
541 nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
542 {
543     nxt_process_init_t  *init;
544 
545     init = nxt_malloc(sizeof(nxt_process_init_t));
546     if (nxt_slow_path(init == NULL)) {
547         return NXT_ERROR;
548     }
549 
550     init->start = nxt_router_start;
551     init->name = "router";
552     init->user_cred = &rt->user_cred;
553     init->port_handlers = &nxt_router_process_port_handlers;
554     init->signals = nxt_worker_process_signals;
555     init->type = NXT_PROCESS_ROUTER;
556     init->data = rt;
557     init->stream = 0;
558     init->restart = &nxt_main_create_worker_process;
559 
560     return nxt_main_create_worker_process(task, rt, init);
561 }
562 
563 
564 static nxt_int_t
565 nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
566     nxt_common_app_conf_t *app_conf, uint32_t stream)
567 {
568     char                *user, *group;
569     u_char              *title, *last, *end;
570     size_t              size;
571     nxt_process_init_t  *init;
572 
573     size = sizeof(nxt_process_init_t)
574            + sizeof(nxt_user_cred_t)
575            + app_conf->user.length + 1
576            + app_conf->group.length + 1
577            + app_conf->name.length + sizeof("\"\" application");
578 
579     init = nxt_malloc(size);
580     if (nxt_slow_path(init == NULL)) {
581         return NXT_ERROR;
582     }
583 
584     init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t));
585     user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t));
586 
587     nxt_memcpy(user, app_conf->user.start, app_conf->user.length);
588     last = nxt_pointer_to(user, app_conf->user.length);
589     *last++ = '\0';
590 
591     init->user_cred->user = user;
592 
593     if (app_conf->group.start != NULL) {
594         group = (char *) last;
595 
596         nxt_memcpy(group, app_conf->group.start, app_conf->group.length);
597         last = nxt_pointer_to(group, app_conf->group.length);
598         *last++ = '\0';
599 
600     } else {
601         group = NULL;
602     }
603 
604     if (nxt_user_cred_get(task, init->user_cred, group) != NXT_OK) {
605         return NXT_ERROR;
606     }
607 
608     title = last;
609     end = title + app_conf->name.length + sizeof("\"\" application");
610 
611     nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name);
612 
613     init->start = nxt_app_start;
614     init->name = (char *) title;
615     init->port_handlers = &nxt_app_process_port_handlers;
616     init->signals = nxt_worker_process_signals;
617     init->type = NXT_PROCESS_WORKER;
618     init->data = app_conf;
619     init->stream = stream;
620     init->restart = NULL;
621 
622     return nxt_main_create_worker_process(task, rt, init);
623 }
624 
625 
626 static nxt_int_t
627 nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
628     nxt_process_init_t *init)
629 {
630     nxt_int_t      ret;
631     nxt_pid_t      pid;
632     nxt_port_t     *port;
633     nxt_process_t  *process;
634 
635     /*
636      * TODO: remove process, init, ports from array on memory and fork failures.
637      */
638 
639     process = nxt_runtime_process_new(rt);
640     if (nxt_slow_path(process == NULL)) {
641         return NXT_ERROR;
642     }
643 
644     process->init = init;
645 
646     port = nxt_port_new(task, 0, 0, init->type);
647     if (nxt_slow_path(port == NULL)) {
648         nxt_process_use(task, process, -1);
649         return NXT_ERROR;
650     }
651 
652     nxt_process_port_add(task, process, port);
653 
654     nxt_process_use(task, process, -1);
655 
656     ret = nxt_port_socket_init(task, port, 0);
657     if (nxt_slow_path(ret != NXT_OK)) {
658         nxt_port_use(task, port, -1);
659         return ret;
660     }
661 
662     pid = nxt_process_create(task, process);
663 
664     nxt_port_use(task, port, -1);
665 
666     switch (pid) {
667 
668     case -1:
669         return NXT_ERROR;
670 
671     case 0:
672         /* A worker process, return to the event engine work queue loop. */
673         return NXT_AGAIN;
674 
675     default:
676         /* The main process created a new process. */
677 
678         nxt_port_read_close(port);
679         nxt_port_write_enable(task, port);
680 
681         return NXT_OK;
682     }
683 }
684 
685 
686 void
687 nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
688 {
689     nxt_port_t     *port;
690     nxt_process_t  *process;
691 
692     nxt_runtime_process_each(rt, process) {
693 
694         if (nxt_pid != process->pid) {
695             process->init = NULL;
696 
697             nxt_process_port_each(process, port) {
698 
699                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
700                                              -1, 0, 0, NULL);
701 
702             } nxt_process_port_loop;
703         }
704 
705     } nxt_runtime_process_loop;
706 }
707 
708 
709 
710 static void
711 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
712 {
713     nxt_debug(task, "sigterm handler signo:%d (%s)",
714               (int) (uintptr_t) obj, data);
715 
716     /* TODO: fast exit. */
717 
718     nxt_exiting = 1;
719 
720     nxt_runtime_quit(task, 0);
721 }
722 
723 
724 static void
725 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
726 {
727     nxt_debug(task, "sigquit handler signo:%d (%s)",
728               (int) (uintptr_t) obj, data);
729 
730     /* TODO: graceful exit. */
731 
732     nxt_exiting = 1;
733 
734     nxt_runtime_quit(task, 0);
735 }
736 
737 
738 static void
739 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
740 {
741     nxt_mp_t        *mp;
742     nxt_int_t       ret;
743     nxt_uint_t      n;
744     nxt_port_t      *port;
745     nxt_file_t      *file, *new_file;
746     nxt_array_t     *new_files;
747     nxt_runtime_t   *rt;
748 
749     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
750             (int) (uintptr_t) obj, data, "log files rotation");
751 
752     rt = task->thread->runtime;
753 
754     port = rt->port_by_type[NXT_PROCESS_ROUTER];
755 
756     if (nxt_fast_path(port != NULL)) {
757         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
758                                      -1, 0, 0, NULL);
759     }
760 
761     mp = nxt_mp_create(1024, 128, 256, 32);
762     if (mp == NULL) {
763         return;
764     }
765 
766     n = nxt_list_nelts(rt->log_files);
767 
768     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
769     if (new_files == NULL) {
770         nxt_mp_destroy(mp);
771         return;
772     }
773 
774     nxt_list_each(file, rt->log_files) {
775 
776         /* This allocation cannot fail. */
777         new_file = nxt_array_add(new_files);
778 
779         new_file->name = file->name;
780         new_file->fd = NXT_FILE_INVALID;
781         new_file->log_level = NXT_LOG_ALERT;
782 
783         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
784                             NXT_FILE_OWNER_ACCESS);
785 
786         if (ret != NXT_OK) {
787             goto fail;
788         }
789 
790     } nxt_list_loop;
791 
792     new_file = new_files->elts;
793 
794     ret = nxt_file_stderr(&new_file[0]);
795 
796     if (ret == NXT_OK) {
797         n = 0;
798 
799         nxt_list_each(file, rt->log_files) {
800 
801             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
802             /*
803              * The old log file descriptor must be closed at the moment
804              * when no other threads use it.  dup2() allows to use the
805              * old file descriptor for new log file.  This change is
806              * performed atomically in the kernel.
807              */
808             (void) nxt_file_redirect(file, new_file[n].fd);
809 
810             n++;
811 
812         } nxt_list_loop;
813 
814         nxt_mp_destroy(mp);
815         return;
816    }
817 
818 fail:
819 
820     new_file = new_files->elts;
821     n = new_files->nelts;
822 
823     while (n != 0) {
824         if (new_file->fd != NXT_FILE_INVALID) {
825             nxt_file_close(task, new_file);
826         }
827 
828         new_file++;
829         n--;
830     }
831 
832     nxt_mp_destroy(mp);
833 }
834 
835 
836 static void
837 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
838 {
839     int                    status;
840     nxt_err_t              err;
841     nxt_pid_t              pid;
842 
843     nxt_debug(task, "sigchld handler signo:%d (%s)",
844               (int) (uintptr_t) obj, data);
845 
846     for ( ;; ) {
847         pid = waitpid(-1, &status, WNOHANG);
848 
849         if (pid == -1) {
850 
851             switch (err = nxt_errno) {
852 
853             case NXT_ECHILD:
854                 return;
855 
856             case NXT_EINTR:
857                 continue;
858 
859             default:
860                 nxt_alert(task, "waitpid() failed: %E", err);
861                 return;
862             }
863         }
864 
865         nxt_debug(task, "waitpid(): %PI", pid);
866 
867         if (pid == 0) {
868             return;
869         }
870 
871         if (WTERMSIG(status)) {
872 #ifdef WCOREDUMP
873             nxt_alert(task, "process %PI exited on signal %d%s",
874                       pid, WTERMSIG(status),
875                       WCOREDUMP(status) ? " (core dumped)" : "");
876 #else
877             nxt_alert(task, "process %PI exited on signal %d",
878                       pid, WTERMSIG(status));
879 #endif
880 
881         } else {
882             nxt_trace(task, "process %PI exited with code %d",
883                       pid, WEXITSTATUS(status));
884         }
885 
886         nxt_main_cleanup_worker_process(task, pid);
887     }
888 }
889 
890 
891 static void
892 nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
893 {
894     nxt_buf_t           *buf;
895     nxt_port_t          *port;
896     nxt_runtime_t       *rt;
897     nxt_process_t       *process;
898     nxt_process_type_t  ptype;
899     nxt_process_init_t  *init;
900 
901     rt = task->thread->runtime;
902 
903     process = nxt_runtime_process_find(rt, pid);
904 
905     if (process) {
906         init = process->init;
907 
908         ptype = nxt_process_type(process);
909 
910         if (process->ready && init != NULL) {
911             init->stream = 0;
912         }
913 
914         nxt_process_close_ports(task, process);
915 
916         if (!nxt_exiting) {
917             nxt_runtime_process_each(rt, process) {
918 
919                 if (process->pid == nxt_pid
920                     || process->pid == pid
921                     || nxt_queue_is_empty(&process->ports))
922                 {
923                     continue;
924                 }
925 
926                 port = nxt_process_port_first(process);
927 
928                 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
929                     continue;
930                 }
931 
932                 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
933                                            sizeof(pid));
934                 if (nxt_slow_path(buf == NULL)) {
935                     continue;
936                 }
937 
938                 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
939 
940                 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID,
941                                       -1, init->stream, 0, buf);
942             } nxt_runtime_process_loop;
943         }
944 
945         if (nxt_exiting) {
946 
947             if (rt->nprocesses == 2) {
948                 nxt_runtime_quit(task, 0);
949             }
950 
951         } else if (init != NULL) {
952             if (init->restart != NULL) {
953                 if (init->type == NXT_PROCESS_ROUTER) {
954                     nxt_main_stop_worker_processes(task, rt);
955                 }
956 
957                 init->restart(task, rt, init);
958 
959             } else {
960                 nxt_free(init);
961             }
962         }
963     }
964 }
965 
966 
967 static void
968 nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
969 {
970     nxt_port_t     *port;
971     nxt_process_t  *process;
972 
973     nxt_runtime_process_each(rt, process) {
974 
975         nxt_process_port_each(process, port) {
976 
977             if (port->type == NXT_PROCESS_WORKER) {
978                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
979                                              -1, 0, 0, NULL);
980             }
981 
982         } nxt_process_port_loop;
983 
984     } nxt_runtime_process_loop;
985 }
986 
987 
988 static void
989 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
990 {
991     size_t                  size;
992     nxt_int_t               ret;
993     nxt_buf_t               *b, *out;
994     nxt_port_t              *port;
995     nxt_sockaddr_t          *sa;
996     nxt_port_msg_type_t     type;
997     nxt_listening_socket_t  ls;
998     u_char                  message[2048];
999 
1000     b = msg->buf;
1001     sa = (nxt_sockaddr_t *) b->mem.pos;
1002 
1003     /* TODO check b size and make plain */
1004 
1005     out = NULL;
1006 
1007     ls.socket = -1;
1008     ls.error = NXT_SOCKET_ERROR_SYSTEM;
1009     ls.start = message;
1010     ls.end = message + sizeof(message);
1011 
1012     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1013                                  msg->port_msg.reply_port);
1014 
1015     nxt_debug(task, "listening socket \"%*s\"",
1016               (size_t) sa->length, nxt_sockaddr_start(sa));
1017 
1018     ret = nxt_main_listening_socket(sa, &ls);
1019 
1020     if (ret == NXT_OK) {
1021         nxt_debug(task, "socket(\"%*s\"): %d",
1022                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1023 
1024         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1025 
1026     } else {
1027         size = ls.end - ls.start;
1028 
1029         nxt_alert(task, "%*s", size, ls.start);
1030 
1031         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1032                                    size + 1);
1033         if (nxt_slow_path(out == NULL)) {
1034             return;
1035         }
1036 
1037         *out->mem.free++ = (uint8_t) ls.error;
1038 
1039         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1040 
1041         type = NXT_PORT_MSG_RPC_ERROR;
1042     }
1043 
1044     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1045                           0, out);
1046 }
1047 
1048 
1049 static nxt_int_t
1050 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1051 {
1052     nxt_err_t         err;
1053     nxt_socket_t      s;
1054 
1055     const socklen_t   length = sizeof(int);
1056     static const int  enable = 1;
1057 
1058     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1059 
1060     if (nxt_slow_path(s == -1)) {
1061         err = nxt_errno;
1062 
1063 #if (NXT_INET6)
1064 
1065         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1066             ls->error = NXT_SOCKET_ERROR_NOINET6;
1067         }
1068 
1069 #endif
1070 
1071         ls->end = nxt_sprintf(ls->start, ls->end,
1072                               "socket(\\\"%*s\\\") failed %E",
1073                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1074 
1075         return NXT_ERROR;
1076     }
1077 
1078     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1079         ls->end = nxt_sprintf(ls->start, ls->end,
1080                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1081                               (size_t) sa->length, nxt_sockaddr_start(sa),
1082                               nxt_errno);
1083         goto fail;
1084     }
1085 
1086 #if (NXT_INET6)
1087 
1088     if (sa->u.sockaddr.sa_family == AF_INET6) {
1089 
1090         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1091             ls->end = nxt_sprintf(ls->start, ls->end,
1092                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1093                                (size_t) sa->length, nxt_sockaddr_start(sa),
1094                                nxt_errno);
1095             goto fail;
1096         }
1097     }
1098 
1099 #endif
1100 
1101     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1102         err = nxt_errno;
1103 
1104 #if (NXT_HAVE_UNIX_DOMAIN)
1105 
1106         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1107             switch (err) {
1108 
1109             case EACCES:
1110                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1111                 break;
1112 
1113             case ENOENT:
1114             case ENOTDIR:
1115                 ls->error = NXT_SOCKET_ERROR_PATH;
1116                 break;
1117             }
1118 
1119             goto next;
1120         }
1121 
1122 #endif
1123 
1124         switch (err) {
1125 
1126         case EACCES:
1127             ls->error = NXT_SOCKET_ERROR_PORT;
1128             break;
1129 
1130         case EADDRINUSE:
1131             ls->error = NXT_SOCKET_ERROR_INUSE;
1132             break;
1133 
1134         case EADDRNOTAVAIL:
1135             ls->error = NXT_SOCKET_ERROR_NOADDR;
1136             break;
1137         }
1138 
1139         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1140                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1141         goto fail;
1142     }
1143 
1144 #if (NXT_HAVE_UNIX_DOMAIN)
1145 
1146 next:
1147 
1148     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1149         char     *filename;
1150         mode_t   access;
1151 
1152         filename = sa->u.sockaddr_un.sun_path;
1153         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1154 
1155         if (chmod(filename, access) != 0) {
1156             ls->end = nxt_sprintf(ls->start, ls->end,
1157                                   "chmod(\\\"%s\\\") failed %E",
1158                                   filename, nxt_errno);
1159             goto fail;
1160         }
1161     }
1162 
1163 #endif
1164 
1165     ls->socket = s;
1166 
1167     return NXT_OK;
1168 
1169 fail:
1170 
1171     (void) close(s);
1172 
1173     return NXT_ERROR;
1174 }
1175 
1176 
1177 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1178     {
1179         nxt_string("type"),
1180         NXT_CONF_MAP_INT,
1181         offsetof(nxt_app_lang_module_t, type),
1182     },
1183 
1184     {
1185         nxt_string("version"),
1186         NXT_CONF_MAP_CSTRZ,
1187         offsetof(nxt_app_lang_module_t, version),
1188     },
1189 
1190     {
1191         nxt_string("file"),
1192         NXT_CONF_MAP_CSTRZ,
1193         offsetof(nxt_app_lang_module_t, file),
1194     },
1195 };
1196 
1197 
1198 static void
1199 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1200 {
1201     uint32_t               index;
1202     nxt_mp_t               *mp;
1203     nxt_int_t              ret;
1204     nxt_buf_t              *b;
1205     nxt_port_t             *port;
1206     nxt_runtime_t          *rt;
1207     nxt_conf_value_t       *conf, *root, *value;
1208     nxt_app_lang_module_t  *lang;
1209 
1210     static nxt_str_t   root_path = nxt_string("/");
1211 
1212     rt = task->thread->runtime;
1213 
1214     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1215         return;
1216     }
1217 
1218     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1219                                  msg->port_msg.reply_port);
1220 
1221     if (nxt_fast_path(port != NULL)) {
1222         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1223                                      msg->port_msg.stream, 0, NULL);
1224     }
1225 
1226     b = msg->buf;
1227 
1228     if (b == NULL) {
1229         return;
1230     }
1231 
1232     mp = nxt_mp_create(1024, 128, 256, 32);
1233     if (mp == NULL) {
1234         return;
1235     }
1236 
1237     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1238 
1239     if (b == NULL) {
1240         return;
1241     }
1242 
1243     nxt_debug(task, "application languages: \"%*s\"",
1244               b->mem.free - b->mem.pos, b->mem.pos);
1245 
1246     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1247     if (conf == NULL) {
1248         goto fail;
1249     }
1250 
1251     root = nxt_conf_get_path(conf, &root_path);
1252     if (root == NULL) {
1253         goto fail;
1254     }
1255 
1256     for (index = 0; /* void */ ; index++) {
1257         value = nxt_conf_get_array_element(root, index);
1258         if (value == NULL) {
1259             break;
1260         }
1261 
1262         lang = nxt_array_add(rt->languages);
1263         if (lang == NULL) {
1264             goto fail;
1265         }
1266 
1267         lang->module = NULL;
1268 
1269         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1270                                   nxt_nitems(nxt_app_lang_module_map), lang);
1271 
1272         if (ret != NXT_OK) {
1273             goto fail;
1274         }
1275 
1276         nxt_debug(task, "lang %d %s \"%s\"",
1277                   lang->type, lang->version, lang->file);
1278     }
1279 
1280     qsort(rt->languages->elts, rt->languages->nelts,
1281           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1282 
1283 fail:
1284 
1285     nxt_mp_destroy(mp);
1286 
1287     ret = nxt_main_start_controller_process(task, rt);
1288 
1289     if (ret == NXT_OK) {
1290         (void) nxt_main_start_router_process(task, rt);
1291     }
1292 }
1293 
1294 
1295 static int nxt_cdecl
1296 nxt_app_lang_compare(const void *v1, const void *v2)
1297 {
1298     int                          n;
1299     const nxt_app_lang_module_t  *lang1, *lang2;
1300 
1301     lang1 = v1;
1302     lang2 = v2;
1303 
1304     n = lang1->type - lang2->type;
1305 
1306     if (n != 0) {
1307         return n;
1308     }
1309 
1310     n = nxt_strverscmp(lang1->version, lang2->version);
1311 
1312     /* Negate result to move higher versions to the beginning. */
1313 
1314     return -n;
1315 }
1316 
1317 
1318 static void
1319 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1320 {
1321     ssize_t        n, size, offset;
1322     nxt_buf_t      *b;
1323     nxt_int_t      ret;
1324     nxt_file_t     file;
1325     nxt_runtime_t  *rt;
1326 
1327     nxt_memzero(&file, sizeof(nxt_file_t));
1328 
1329     rt = task->thread->runtime;
1330 
1331     file.name = (nxt_file_name_t *) rt->conf_tmp;
1332 
1333     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1334                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1335                       != NXT_OK))
1336     {
1337         goto error;
1338     }
1339 
1340     offset = 0;
1341 
1342     for (b = msg->buf; b != NULL; b = b->next) {
1343         size = nxt_buf_mem_used_size(&b->mem);
1344 
1345         n = nxt_file_write(&file, b->mem.pos, size, offset);
1346 
1347         if (nxt_slow_path(n != size)) {
1348             nxt_file_close(task, &file);
1349             (void) nxt_file_delete(file.name);
1350             goto error;
1351         }
1352 
1353         offset += n;
1354     }
1355 
1356     nxt_file_close(task, &file);
1357 
1358     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1359 
1360     if (nxt_fast_path(ret == NXT_OK)) {
1361         return;
1362     }
1363 
1364 error:
1365 
1366     nxt_alert(task, "failed to store current configuration");
1367 }
1368 
1369 
1370 static void
1371 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1372 {
1373     u_char               *path;
1374     nxt_int_t            ret;
1375     nxt_file_t           file;
1376     nxt_port_t           *port;
1377     nxt_port_msg_type_t  type;
1378 
1379     nxt_debug(task, "opening access log file");
1380 
1381     path = msg->buf->mem.pos;
1382 
1383     nxt_memzero(&file, sizeof(nxt_file_t));
1384 
1385     file.name = (nxt_file_name_t *) path;
1386     file.log_level = NXT_LOG_ERR;
1387 
1388     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1389                         NXT_FILE_OWNER_ACCESS);
1390 
1391     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1392                            : NXT_PORT_MSG_RPC_ERROR;
1393 
1394     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1395                                  msg->port_msg.reply_port);
1396 
1397     if (nxt_fast_path(port != NULL)) {
1398         (void) nxt_port_socket_write(task, port, type, file.fd,
1399                                      msg->port_msg.stream, 0, NULL);
1400     }
1401 }
1402