xref: /unit/src/nxt_main_process.c (revision 977:4f9268f27b57)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 
/*
 * State of one attempt to create a listening socket on behalf of another
 * process (see nxt_main_port_socket_handler()): the resulting descriptor
 * or, on failure, an error class plus a formatted error message.
 */
typedef struct {
    /* Socket descriptor; the handler initializes it to -1. */
    nxt_socket_t        socket;
    /* Error class sent back to the requester as the first reply byte. */
    nxt_socket_error_t  error;
    /* Start of the caller-provided buffer for the error message. */
    u_char              *start;
    /* End of that buffer; on failure it is moved to the end of the message. */
    u_char              *end;
} nxt_listening_socket_t;
24 
25 
/* One application-type specific configuration map (see nxt_app_maps). */
typedef struct {
    /* Number of entries in "map". */
    nxt_uint_t          size;
    /* Array of configuration member descriptions. */
    nxt_conf_map_t      *map;
} nxt_conf_app_map_t;
30 
31 
/* Forward declarations of the file-local helpers and handlers. */
static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
    nxt_runtime_t *rt);
static void nxt_main_process_title(nxt_task_t *task);
static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task,
    nxt_runtime_t *rt);
static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task,
    nxt_runtime_t *rt, nxt_process_init_t *init);
static nxt_int_t nxt_main_start_router_process(nxt_task_t *task,
    nxt_runtime_t *rt);
static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task,
    nxt_runtime_t *rt);
static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task,
    nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task,
    nxt_runtime_t *rt, nxt_process_init_t *init);
/* Signal handlers referenced from nxt_main_process_signals. */
static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid);
static void nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt);
/* Port message handlers referenced from nxt_main_process_port_handlers. */
static void nxt_main_port_socket_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
    nxt_listening_socket_t *ls);
static void nxt_main_port_modules_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
static void nxt_main_port_conf_store_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_main_port_access_log_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
70 
71 
/*
 * Signals handled by the main process.  SIGINT and SIGTERM share one
 * handler; SIGHUP goes to the generic handler, which only logs the signal.
 */
const nxt_sig_event_t  nxt_main_process_signals[] = {
    nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
    nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
    nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
    nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
    nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
    nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
    nxt_event_signal_end,
};
81 
82 
/*
 * Set by the SIGINT/SIGTERM/SIGQUIT handlers.  Once set, the cleanup of
 * a terminated child neither notifies other processes nor restarts it.
 */
static nxt_bool_t  nxt_exiting;
84 
85 
86 nxt_int_t
87 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
88     nxt_runtime_t *rt)
89 {
90     rt->type = NXT_PROCESS_MAIN;
91 
92     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
93         return NXT_ERROR;
94     }
95 
96     nxt_main_process_title(task);
97 
98     /*
99      * The dicsovery process will send a message processed by
100      * nxt_main_port_modules_handler() which starts the controller
101      * and router processes.
102      */
103     return nxt_main_start_discovery_process(task, rt);
104 }
105 
106 
/*
 * Configuration members mapped onto nxt_common_app_conf_t for every
 * application, before the type specific map is applied
 * (see nxt_port_main_start_worker_handler()).
 */
static nxt_conf_map_t  nxt_common_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, type),
    },

    {
        nxt_string("user"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, user),
    },

    {
        nxt_string("group"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, group),
    },

    {
        nxt_string("working_directory"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, working_directory),
    },

    {
        nxt_string("environment"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, environment),
    },
};
138 
139 
/* Configuration members stored in the u.external part of the app conf. */
static nxt_conf_map_t  nxt_external_app_conf[] = {
    {
        nxt_string("executable"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.external.executable),
    },

    {
        nxt_string("arguments"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.external.arguments),
    },

};
154 
155 
/* Configuration members stored in the u.python part of the app conf. */
static nxt_conf_map_t  nxt_python_app_conf[] = {
    {
        nxt_string("home"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.python.home),
    },

    {
        nxt_string("path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.python.path),
    },

    {
        nxt_string("module"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.python.module),
    },
};
175 
176 
/* Configuration members stored in the u.php part of the app conf. */
static nxt_conf_map_t  nxt_php_app_conf[] = {
    {
        nxt_string("root"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.php.root),
    },

    {
        nxt_string("script"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.php.script),
    },

    {
        nxt_string("index"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.php.index),
    },

    {
        nxt_string("options"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.php.options),
    },
};
202 
203 
/* Configuration members stored in the u.perl part of the app conf. */
static nxt_conf_map_t  nxt_perl_app_conf[] = {
    {
        nxt_string("script"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.perl.script),
    },
};
211 
212 
/* Configuration members stored in the u.ruby part of the app conf. */
static nxt_conf_map_t  nxt_ruby_app_conf[] = {
    {
        nxt_string("script"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_common_app_conf_t, u.ruby.script),
    },
};
220 
221 
/* Configuration members stored in the u.java part of the app conf. */
static nxt_conf_map_t  nxt_java_app_conf[] = {
    {
        nxt_string("classpath"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.java.classpath),
    },
    {
        nxt_string("webapp"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.java.webapp),
    },
    {
        nxt_string("options"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_common_app_conf_t, u.java.options),
    },
    {
        nxt_string("unit_jars"),
        NXT_CONF_MAP_CSTRZ,
        offsetof(nxt_common_app_conf_t, u.java.unit_jars),
    },

};
245 
246 
/*
 * Type specific configuration maps, indexed by the nxt_app_type_t value
 * returned by nxt_app_parse_type() in nxt_port_main_start_worker_handler().
 *
 * NOTE(review): the entry order presumably has to follow the
 * nxt_app_type_t enumeration - confirm against its declaration.
 */
static nxt_conf_app_map_t  nxt_app_maps[] = {
    { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
    { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
    { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
    { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
    { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
    { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
};
255 
256 
/*
 * Handler of "data" messages on the main process port: the used part of
 * the first message buffer is written to the debug log, nothing else.
 */
static void
nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_debug(task, "main data: %*s",
              nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
}
263 
264 
265 static void
266 nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
267 {
268     u_char                 *start, ch;
269     size_t                 type_len;
270     nxt_mp_t               *mp;
271     nxt_int_t              ret;
272     nxt_buf_t              *b;
273     nxt_port_t             *port;
274     nxt_app_type_t         idx;
275     nxt_conf_value_t       *conf;
276     nxt_common_app_conf_t  app_conf;
277 
278     static nxt_str_t nobody = nxt_string("nobody");
279 
280     ret = NXT_ERROR;
281 
282     mp = nxt_mp_create(1024, 128, 256, 32);
283 
284     if (nxt_slow_path(mp == NULL)) {
285         return;
286     }
287 
288     b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size);
289 
290     if (b == NULL) {
291         return;
292     }
293 
294     nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
295               b->mem.pos);
296 
297     nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
298 
299     start = b->mem.pos;
300 
301     app_conf.name.start = start;
302     app_conf.name.length = nxt_strlen(start);
303 
304     start += app_conf.name.length + 1;
305 
306     conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL);
307 
308     if (conf == NULL) {
309         nxt_alert(task, "router app configuration parsing error");
310 
311         goto failed;
312     }
313 
314     app_conf.user = nobody;
315 
316     ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf,
317                               nxt_nitems(nxt_common_app_conf), &app_conf);
318     if (ret != NXT_OK) {
319         nxt_alert(task, "failed to map common app conf received from router");
320         goto failed;
321     }
322 
323     for (type_len = 0; type_len != app_conf.type.length; type_len++) {
324         ch = app_conf.type.start[type_len];
325 
326         if (ch == ' ' || nxt_isdigit(ch)) {
327             break;
328         }
329     }
330 
331     idx = nxt_app_parse_type(app_conf.type.start, type_len);
332 
333     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
334         nxt_alert(task, "invalid app type %d received from router", (int) idx);
335         goto failed;
336     }
337 
338     ret = nxt_conf_map_object(mp, conf, nxt_app_maps[idx].map,
339                               nxt_app_maps[idx].size, &app_conf);
340 
341     if (nxt_slow_path(ret != NXT_OK)) {
342         nxt_alert(task, "failed to map app conf received from router");
343         goto failed;
344     }
345 
346     ret = nxt_main_start_worker_process(task, task->thread->runtime,
347                                         &app_conf, msg->port_msg.stream);
348 
349 failed:
350 
351     if (ret == NXT_ERROR) {
352         port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
353                                      msg->port_msg.reply_port);
354         if (nxt_fast_path(port != NULL)) {
355             nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
356                                     -1, msg->port_msg.stream, 0, NULL);
357         }
358     }
359 
360     nxt_mp_destroy(mp);
361 }
362 
363 
/*
 * Message handlers of the main process port; installed by
 * nxt_main_process_port_create().  The certificate handlers exist only
 * in builds with NXT_TLS.
 */
static nxt_port_handlers_t  nxt_main_process_port_handlers = {
    .data           = nxt_port_main_data_handler,
    .process_ready  = nxt_port_process_ready_handler,
    .start_worker   = nxt_port_main_start_worker_handler,
    .socket         = nxt_main_port_socket_handler,
    .modules        = nxt_main_port_modules_handler,
    .conf_store     = nxt_main_port_conf_store_handler,
#if (NXT_TLS)
    .cert_get       = nxt_cert_store_get_handler,
    .cert_delete    = nxt_cert_store_delete_handler,
#endif
    .access_log     = nxt_main_port_access_log_handler,
    .rpc_ready      = nxt_port_rpc_handler,
    .rpc_error      = nxt_port_rpc_handler,
};
379 
380 
381 static nxt_int_t
382 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
383 {
384     nxt_int_t      ret;
385     nxt_port_t     *port;
386     nxt_process_t  *process;
387 
388     process = nxt_runtime_process_get(rt, nxt_pid);
389     if (nxt_slow_path(process == NULL)) {
390         return NXT_ERROR;
391     }
392 
393     port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
394     if (nxt_slow_path(port == NULL)) {
395         nxt_process_use(task, process, -1);
396         return NXT_ERROR;
397     }
398 
399     nxt_process_port_add(task, process, port);
400 
401     nxt_process_use(task, process, -1);
402 
403     ret = nxt_port_socket_init(task, port, 0);
404     if (nxt_slow_path(ret != NXT_OK)) {
405         nxt_port_use(task, port, -1);
406         return ret;
407     }
408 
409     nxt_runtime_port_add(task, port);
410 
411     nxt_port_use(task, port, -1);
412 
413     /*
414      * A main process port.  A write port is not closed
415      * since it should be inherited by worker processes.
416      */
417     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
418 
419     process->ready = 1;
420 
421     return NXT_OK;
422 }
423 
424 
425 static void
426 nxt_main_process_title(nxt_task_t *task)
427 {
428     u_char      *p, *end;
429     nxt_uint_t  i;
430     u_char      title[2048];
431 
432     end = title + sizeof(title) - 1;
433 
434     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
435                     nxt_process_argv[0]);
436 
437     for (i = 1; nxt_process_argv[i] != NULL; i++) {
438         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
439     }
440 
441     if (p < end) {
442         *p++ = ']';
443     }
444 
445     *p = '\0';
446 
447     nxt_process_title(task, "%s", title);
448 }
449 
450 
451 static nxt_int_t
452 nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
453 {
454     nxt_process_init_t  *init;
455 
456     init = nxt_malloc(sizeof(nxt_process_init_t));
457     if (nxt_slow_path(init == NULL)) {
458         return NXT_ERROR;
459     }
460 
461     init->start = nxt_controller_start;
462     init->name = "controller";
463     init->user_cred = &rt->user_cred;
464     init->port_handlers = &nxt_controller_process_port_handlers;
465     init->signals = nxt_worker_process_signals;
466     init->type = NXT_PROCESS_CONTROLLER;
467     init->stream = 0;
468     init->restart = &nxt_main_create_controller_process;
469 
470     return nxt_main_create_controller_process(task, rt, init);;
471 }
472 
473 
474 static nxt_int_t
475 nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt,
476     nxt_process_init_t *init)
477 {
478     ssize_t                n;
479     nxt_int_t              ret;
480     nxt_str_t              *conf;
481     nxt_file_t             file;
482     nxt_file_info_t        fi;
483     nxt_controller_init_t  ctrl_init;
484 
485     nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t));
486 
487     conf = &ctrl_init.conf;
488 
489     nxt_memzero(&file, sizeof(nxt_file_t));
490 
491     file.name = (nxt_file_name_t *) rt->conf;
492 
493     ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
494 
495     if (ret == NXT_OK) {
496         ret = nxt_file_info(&file, &fi);
497 
498         if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) {
499             conf->length = nxt_file_size(&fi);
500             conf->start = nxt_malloc(conf->length);
501 
502             if (nxt_slow_path(conf->start == NULL)) {
503                 nxt_file_close(task, &file);
504                 return NXT_ERROR;
505             }
506 
507             n = nxt_file_read(&file, conf->start, conf->length, 0);
508 
509             if (nxt_slow_path(n != (ssize_t) conf->length)) {
510                 nxt_free(conf->start);
511                 conf->start = NULL;
512 
513                 nxt_alert(task, "failed to restore previous configuration: "
514                           "cannot read the file");
515             }
516         }
517 
518         nxt_file_close(task, &file);
519     }
520 
521 #if (NXT_TLS)
522     ctrl_init.certs = nxt_cert_store_load(task);
523 #endif
524 
525     init->data = &ctrl_init;
526 
527     ret = nxt_main_create_worker_process(task, rt, init);
528 
529     if (ret == NXT_OK) {
530         if (conf->start != NULL) {
531             nxt_free(conf->start);
532         }
533 
534 #if (NXT_TLS)
535         if (ctrl_init.certs != NULL) {
536             nxt_cert_store_release(ctrl_init.certs);
537         }
538 #endif
539     }
540 
541     return ret;
542 }
543 
544 
545 static nxt_int_t
546 nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
547 {
548     nxt_process_init_t  *init;
549 
550     init = nxt_malloc(sizeof(nxt_process_init_t));
551     if (nxt_slow_path(init == NULL)) {
552         return NXT_ERROR;
553     }
554 
555     init->start = nxt_discovery_start;
556     init->name = "discovery";
557     init->user_cred = &rt->user_cred;
558     init->port_handlers = &nxt_discovery_process_port_handlers;
559     init->signals = nxt_worker_process_signals;
560     init->type = NXT_PROCESS_DISCOVERY;
561     init->data = rt;
562     init->stream = 0;
563     init->restart = NULL;
564 
565     return nxt_main_create_worker_process(task, rt, init);
566 }
567 
568 
569 static nxt_int_t
570 nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
571 {
572     nxt_process_init_t  *init;
573 
574     init = nxt_malloc(sizeof(nxt_process_init_t));
575     if (nxt_slow_path(init == NULL)) {
576         return NXT_ERROR;
577     }
578 
579     init->start = nxt_router_start;
580     init->name = "router";
581     init->user_cred = &rt->user_cred;
582     init->port_handlers = &nxt_router_process_port_handlers;
583     init->signals = nxt_worker_process_signals;
584     init->type = NXT_PROCESS_ROUTER;
585     init->data = rt;
586     init->stream = 0;
587     init->restart = &nxt_main_create_worker_process;
588 
589     return nxt_main_create_worker_process(task, rt, init);
590 }
591 
592 
593 static nxt_int_t
594 nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
595     nxt_common_app_conf_t *app_conf, uint32_t stream)
596 {
597     char                *user, *group;
598     u_char              *title, *last, *end;
599     size_t              size;
600     nxt_process_init_t  *init;
601 
602     size = sizeof(nxt_process_init_t)
603            + sizeof(nxt_user_cred_t)
604            + app_conf->user.length + 1
605            + app_conf->group.length + 1
606            + app_conf->name.length + sizeof("\"\" application");
607 
608     init = nxt_malloc(size);
609     if (nxt_slow_path(init == NULL)) {
610         return NXT_ERROR;
611     }
612 
613     init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t));
614     user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t));
615 
616     nxt_memcpy(user, app_conf->user.start, app_conf->user.length);
617     last = nxt_pointer_to(user, app_conf->user.length);
618     *last++ = '\0';
619 
620     init->user_cred->user = user;
621 
622     if (app_conf->group.start != NULL) {
623         group = (char *) last;
624 
625         nxt_memcpy(group, app_conf->group.start, app_conf->group.length);
626         last = nxt_pointer_to(group, app_conf->group.length);
627         *last++ = '\0';
628 
629     } else {
630         group = NULL;
631     }
632 
633     if (nxt_user_cred_get(task, init->user_cred, group) != NXT_OK) {
634         return NXT_ERROR;
635     }
636 
637     title = last;
638     end = title + app_conf->name.length + sizeof("\"\" application");
639 
640     nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name);
641 
642     init->start = nxt_app_start;
643     init->name = (char *) title;
644     init->port_handlers = &nxt_app_process_port_handlers;
645     init->signals = nxt_worker_process_signals;
646     init->type = NXT_PROCESS_WORKER;
647     init->data = app_conf;
648     init->stream = stream;
649     init->restart = NULL;
650 
651     return nxt_main_create_worker_process(task, rt, init);
652 }
653 
654 
655 static nxt_int_t
656 nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
657     nxt_process_init_t *init)
658 {
659     nxt_int_t      ret;
660     nxt_pid_t      pid;
661     nxt_port_t     *port;
662     nxt_process_t  *process;
663 
664     /*
665      * TODO: remove process, init, ports from array on memory and fork failures.
666      */
667 
668     process = nxt_runtime_process_new(rt);
669     if (nxt_slow_path(process == NULL)) {
670         return NXT_ERROR;
671     }
672 
673     process->init = init;
674 
675     port = nxt_port_new(task, 0, 0, init->type);
676     if (nxt_slow_path(port == NULL)) {
677         nxt_process_use(task, process, -1);
678         return NXT_ERROR;
679     }
680 
681     nxt_process_port_add(task, process, port);
682 
683     nxt_process_use(task, process, -1);
684 
685     ret = nxt_port_socket_init(task, port, 0);
686     if (nxt_slow_path(ret != NXT_OK)) {
687         nxt_port_use(task, port, -1);
688         return ret;
689     }
690 
691     pid = nxt_process_create(task, process);
692 
693     nxt_port_use(task, port, -1);
694 
695     switch (pid) {
696 
697     case -1:
698         return NXT_ERROR;
699 
700     case 0:
701         /* A worker process, return to the event engine work queue loop. */
702         return NXT_AGAIN;
703 
704     default:
705         /* The main process created a new process. */
706 
707         nxt_port_read_close(port);
708         nxt_port_write_enable(task, port);
709 
710         return NXT_OK;
711     }
712 }
713 
714 
715 void
716 nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
717 {
718     nxt_port_t     *port;
719     nxt_process_t  *process;
720 
721     nxt_runtime_process_each(rt, process) {
722 
723         if (nxt_pid != process->pid) {
724             process->init = NULL;
725 
726             nxt_process_port_each(process, port) {
727 
728                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
729                                              -1, 0, 0, NULL);
730 
731             } nxt_process_port_loop;
732         }
733 
734     } nxt_runtime_process_loop;
735 }
736 
737 
738 
/*
 * Handler of SIGINT and SIGTERM (see nxt_main_process_signals): marks
 * the main process as exiting and calls nxt_runtime_quit().
 *
 * "obj" carries the signal number.  "data" is logged as a string;
 * presumably it is the signal name - confirm against the signal engine.
 */
static void
nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_debug(task, "sigterm handler signo:%d (%s)",
              (int) (uintptr_t) obj, data);

    /* TODO: fast exit. */

    nxt_exiting = 1;

    nxt_runtime_quit(task, 0);
}
751 
752 
/*
 * Handler of SIGQUIT: currently identical to the SIGTERM handler except
 * for the debug message; it marks the main process as exiting and calls
 * nxt_runtime_quit().
 *
 * "obj" carries the signal number.  "data" is logged as a string;
 * presumably it is the signal name - confirm against the signal engine.
 */
static void
nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_debug(task, "sigquit handler signo:%d (%s)",
              (int) (uintptr_t) obj, data);

    /* TODO: graceful exit. */

    nxt_exiting = 1;

    nxt_runtime_quit(task, 0);
}
765 
766 
767 static void
768 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
769 {
770     nxt_mp_t        *mp;
771     nxt_int_t       ret;
772     nxt_uint_t      n;
773     nxt_port_t      *port;
774     nxt_file_t      *file, *new_file;
775     nxt_array_t     *new_files;
776     nxt_runtime_t   *rt;
777 
778     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
779             (int) (uintptr_t) obj, data, "log files rotation");
780 
781     rt = task->thread->runtime;
782 
783     port = rt->port_by_type[NXT_PROCESS_ROUTER];
784 
785     if (nxt_fast_path(port != NULL)) {
786         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
787                                      -1, 0, 0, NULL);
788     }
789 
790     mp = nxt_mp_create(1024, 128, 256, 32);
791     if (mp == NULL) {
792         return;
793     }
794 
795     n = nxt_list_nelts(rt->log_files);
796 
797     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
798     if (new_files == NULL) {
799         nxt_mp_destroy(mp);
800         return;
801     }
802 
803     nxt_list_each(file, rt->log_files) {
804 
805         /* This allocation cannot fail. */
806         new_file = nxt_array_add(new_files);
807 
808         new_file->name = file->name;
809         new_file->fd = NXT_FILE_INVALID;
810         new_file->log_level = NXT_LOG_ALERT;
811 
812         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
813                             NXT_FILE_OWNER_ACCESS);
814 
815         if (ret != NXT_OK) {
816             goto fail;
817         }
818 
819     } nxt_list_loop;
820 
821     new_file = new_files->elts;
822 
823     ret = nxt_file_stderr(&new_file[0]);
824 
825     if (ret == NXT_OK) {
826         n = 0;
827 
828         nxt_list_each(file, rt->log_files) {
829 
830             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
831             /*
832              * The old log file descriptor must be closed at the moment
833              * when no other threads use it.  dup2() allows to use the
834              * old file descriptor for new log file.  This change is
835              * performed atomically in the kernel.
836              */
837             (void) nxt_file_redirect(file, new_file[n].fd);
838 
839             n++;
840 
841         } nxt_list_loop;
842 
843         nxt_mp_destroy(mp);
844         return;
845    }
846 
847 fail:
848 
849     new_file = new_files->elts;
850     n = new_files->nelts;
851 
852     while (n != 0) {
853         if (new_file->fd != NXT_FILE_INVALID) {
854             nxt_file_close(task, new_file);
855         }
856 
857         new_file++;
858         n--;
859     }
860 
861     nxt_mp_destroy(mp);
862 }
863 
864 
865 static void
866 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
867 {
868     int                    status;
869     nxt_err_t              err;
870     nxt_pid_t              pid;
871 
872     nxt_debug(task, "sigchld handler signo:%d (%s)",
873               (int) (uintptr_t) obj, data);
874 
875     for ( ;; ) {
876         pid = waitpid(-1, &status, WNOHANG);
877 
878         if (pid == -1) {
879 
880             switch (err = nxt_errno) {
881 
882             case NXT_ECHILD:
883                 return;
884 
885             case NXT_EINTR:
886                 continue;
887 
888             default:
889                 nxt_alert(task, "waitpid() failed: %E", err);
890                 return;
891             }
892         }
893 
894         nxt_debug(task, "waitpid(): %PI", pid);
895 
896         if (pid == 0) {
897             return;
898         }
899 
900         if (WTERMSIG(status)) {
901 #ifdef WCOREDUMP
902             nxt_alert(task, "process %PI exited on signal %d%s",
903                       pid, WTERMSIG(status),
904                       WCOREDUMP(status) ? " (core dumped)" : "");
905 #else
906             nxt_alert(task, "process %PI exited on signal %d",
907                       pid, WTERMSIG(status));
908 #endif
909 
910         } else {
911             nxt_trace(task, "process %PI exited with code %d",
912                       pid, WEXITSTATUS(status));
913         }
914 
915         nxt_main_cleanup_worker_process(task, pid);
916     }
917 }
918 
919 
/*
 * Generic handler for signals that are accepted but not acted upon
 * (SIGHUP in nxt_main_process_signals): the signal is only logged.
 */
static void
nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
              (int) (uintptr_t) obj, data);
}
926 
927 
928 static void
929 nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
930 {
931     nxt_buf_t           *buf;
932     nxt_port_t          *port;
933     nxt_runtime_t       *rt;
934     nxt_process_t       *process;
935     nxt_process_type_t  ptype;
936     nxt_process_init_t  *init;
937 
938     rt = task->thread->runtime;
939 
940     process = nxt_runtime_process_find(rt, pid);
941 
942     if (process) {
943         init = process->init;
944 
945         ptype = nxt_process_type(process);
946 
947         if (process->ready && init != NULL) {
948             init->stream = 0;
949         }
950 
951         nxt_process_close_ports(task, process);
952 
953         if (!nxt_exiting) {
954             nxt_runtime_process_each(rt, process) {
955 
956                 if (process->pid == nxt_pid
957                     || process->pid == pid
958                     || nxt_queue_is_empty(&process->ports))
959                 {
960                     continue;
961                 }
962 
963                 port = nxt_process_port_first(process);
964 
965                 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
966                     continue;
967                 }
968 
969                 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
970                                            sizeof(pid));
971                 if (nxt_slow_path(buf == NULL)) {
972                     continue;
973                 }
974 
975                 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
976 
977                 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID,
978                                       -1, init->stream, 0, buf);
979             } nxt_runtime_process_loop;
980         }
981 
982         if (nxt_exiting) {
983 
984             if (rt->nprocesses == 2) {
985                 nxt_runtime_quit(task, 0);
986             }
987 
988         } else if (init != NULL) {
989             if (init->restart != NULL) {
990                 if (init->type == NXT_PROCESS_ROUTER) {
991                     nxt_main_stop_worker_processes(task, rt);
992                 }
993 
994                 init->restart(task, rt, init);
995 
996             } else {
997                 nxt_free(init);
998             }
999         }
1000     }
1001 }
1002 
1003 
/*
 * Sends NXT_PORT_MSG_QUIT to every port of type NXT_PROCESS_WORKER of
 * every process known to the runtime.  Called before the router process
 * is restarted (see nxt_main_cleanup_worker_process()).
 */
static void
nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
    nxt_port_t     *port;
    nxt_process_t  *process;

    nxt_runtime_process_each(rt, process) {

        nxt_process_port_each(process, port) {

            /* Ports of other process types are left alone. */
            if (port->type == NXT_PROCESS_WORKER) {
                (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
                                             -1, 0, 0, NULL);
            }

        } nxt_process_port_loop;

    } nxt_runtime_process_loop;
}
1023 
1024 
1025 static void
1026 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1027 {
1028     size_t                  size;
1029     nxt_int_t               ret;
1030     nxt_buf_t               *b, *out;
1031     nxt_port_t              *port;
1032     nxt_sockaddr_t          *sa;
1033     nxt_port_msg_type_t     type;
1034     nxt_listening_socket_t  ls;
1035     u_char                  message[2048];
1036 
1037     b = msg->buf;
1038     sa = (nxt_sockaddr_t *) b->mem.pos;
1039 
1040     /* TODO check b size and make plain */
1041 
1042     out = NULL;
1043 
1044     ls.socket = -1;
1045     ls.error = NXT_SOCKET_ERROR_SYSTEM;
1046     ls.start = message;
1047     ls.end = message + sizeof(message);
1048 
1049     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1050                                  msg->port_msg.reply_port);
1051 
1052     nxt_debug(task, "listening socket \"%*s\"",
1053               (size_t) sa->length, nxt_sockaddr_start(sa));
1054 
1055     ret = nxt_main_listening_socket(sa, &ls);
1056 
1057     if (ret == NXT_OK) {
1058         nxt_debug(task, "socket(\"%*s\"): %d",
1059                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1060 
1061         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1062 
1063     } else {
1064         size = ls.end - ls.start;
1065 
1066         nxt_alert(task, "%*s", size, ls.start);
1067 
1068         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1069                                    size + 1);
1070         if (nxt_slow_path(out == NULL)) {
1071             return;
1072         }
1073 
1074         *out->mem.free++ = (uint8_t) ls.error;
1075 
1076         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1077 
1078         type = NXT_PORT_MSG_RPC_ERROR;
1079     }
1080 
1081     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1082                           0, out);
1083 }
1084 
1085 
1086 static nxt_int_t
1087 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1088 {
1089     nxt_err_t         err;
1090     nxt_socket_t      s;
1091 
1092     const socklen_t   length = sizeof(int);
1093     static const int  enable = 1;
1094 
1095     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1096 
1097     if (nxt_slow_path(s == -1)) {
1098         err = nxt_errno;
1099 
1100 #if (NXT_INET6)
1101 
1102         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1103             ls->error = NXT_SOCKET_ERROR_NOINET6;
1104         }
1105 
1106 #endif
1107 
1108         ls->end = nxt_sprintf(ls->start, ls->end,
1109                               "socket(\\\"%*s\\\") failed %E",
1110                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1111 
1112         return NXT_ERROR;
1113     }
1114 
1115     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1116         ls->end = nxt_sprintf(ls->start, ls->end,
1117                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1118                               (size_t) sa->length, nxt_sockaddr_start(sa),
1119                               nxt_errno);
1120         goto fail;
1121     }
1122 
1123 #if (NXT_INET6)
1124 
1125     if (sa->u.sockaddr.sa_family == AF_INET6) {
1126 
1127         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1128             ls->end = nxt_sprintf(ls->start, ls->end,
1129                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1130                                (size_t) sa->length, nxt_sockaddr_start(sa),
1131                                nxt_errno);
1132             goto fail;
1133         }
1134     }
1135 
1136 #endif
1137 
1138     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1139         err = nxt_errno;
1140 
1141 #if (NXT_HAVE_UNIX_DOMAIN)
1142 
1143         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1144             switch (err) {
1145 
1146             case EACCES:
1147                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1148                 break;
1149 
1150             case ENOENT:
1151             case ENOTDIR:
1152                 ls->error = NXT_SOCKET_ERROR_PATH;
1153                 break;
1154             }
1155 
1156         } else
1157 #endif
1158         {
1159             switch (err) {
1160 
1161             case EACCES:
1162                 ls->error = NXT_SOCKET_ERROR_PORT;
1163                 break;
1164 
1165             case EADDRINUSE:
1166                 ls->error = NXT_SOCKET_ERROR_INUSE;
1167                 break;
1168 
1169             case EADDRNOTAVAIL:
1170                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1171                 break;
1172             }
1173         }
1174 
1175         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1176                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1177         goto fail;
1178     }
1179 
1180 #if (NXT_HAVE_UNIX_DOMAIN)
1181 
1182     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1183         char     *filename;
1184         mode_t   access;
1185 
1186         filename = sa->u.sockaddr_un.sun_path;
1187         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1188 
1189         if (chmod(filename, access) != 0) {
1190             ls->end = nxt_sprintf(ls->start, ls->end,
1191                                   "chmod(\\\"%s\\\") failed %E",
1192                                   filename, nxt_errno);
1193             goto fail;
1194         }
1195     }
1196 
1197 #endif
1198 
1199     ls->socket = s;
1200 
1201     return NXT_OK;
1202 
1203 fail:
1204 
1205     (void) close(s);
1206 
1207     return NXT_ERROR;
1208 }
1209 
1210 
1211 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1212     {
1213         nxt_string("type"),
1214         NXT_CONF_MAP_INT,
1215         offsetof(nxt_app_lang_module_t, type),
1216     },
1217 
1218     {
1219         nxt_string("version"),
1220         NXT_CONF_MAP_CSTRZ,
1221         offsetof(nxt_app_lang_module_t, version),
1222     },
1223 
1224     {
1225         nxt_string("file"),
1226         NXT_CONF_MAP_CSTRZ,
1227         offsetof(nxt_app_lang_module_t, file),
1228     },
1229 };
1230 
1231 
1232 static void
1233 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1234 {
1235     uint32_t               index;
1236     nxt_mp_t               *mp;
1237     nxt_int_t              ret;
1238     nxt_buf_t              *b;
1239     nxt_port_t             *port;
1240     nxt_runtime_t          *rt;
1241     nxt_conf_value_t       *conf, *root, *value;
1242     nxt_app_lang_module_t  *lang;
1243 
1244     static nxt_str_t   root_path = nxt_string("/");
1245 
1246     rt = task->thread->runtime;
1247 
1248     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1249         return;
1250     }
1251 
1252     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1253                                  msg->port_msg.reply_port);
1254 
1255     if (nxt_fast_path(port != NULL)) {
1256         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1257                                      msg->port_msg.stream, 0, NULL);
1258     }
1259 
1260     b = msg->buf;
1261 
1262     if (b == NULL) {
1263         return;
1264     }
1265 
1266     mp = nxt_mp_create(1024, 128, 256, 32);
1267     if (mp == NULL) {
1268         return;
1269     }
1270 
1271     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1272 
1273     if (b == NULL) {
1274         return;
1275     }
1276 
1277     nxt_debug(task, "application languages: \"%*s\"",
1278               b->mem.free - b->mem.pos, b->mem.pos);
1279 
1280     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1281     if (conf == NULL) {
1282         goto fail;
1283     }
1284 
1285     root = nxt_conf_get_path(conf, &root_path);
1286     if (root == NULL) {
1287         goto fail;
1288     }
1289 
1290     for (index = 0; /* void */ ; index++) {
1291         value = nxt_conf_get_array_element(root, index);
1292         if (value == NULL) {
1293             break;
1294         }
1295 
1296         lang = nxt_array_add(rt->languages);
1297         if (lang == NULL) {
1298             goto fail;
1299         }
1300 
1301         lang->module = NULL;
1302 
1303         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1304                                   nxt_nitems(nxt_app_lang_module_map), lang);
1305 
1306         if (ret != NXT_OK) {
1307             goto fail;
1308         }
1309 
1310         nxt_debug(task, "lang %d %s \"%s\"",
1311                   lang->type, lang->version, lang->file);
1312     }
1313 
1314     qsort(rt->languages->elts, rt->languages->nelts,
1315           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1316 
1317 fail:
1318 
1319     nxt_mp_destroy(mp);
1320 
1321     ret = nxt_main_start_controller_process(task, rt);
1322 
1323     if (ret == NXT_OK) {
1324         (void) nxt_main_start_router_process(task, rt);
1325     }
1326 }
1327 
1328 
1329 static int nxt_cdecl
1330 nxt_app_lang_compare(const void *v1, const void *v2)
1331 {
1332     int                          n;
1333     const nxt_app_lang_module_t  *lang1, *lang2;
1334 
1335     lang1 = v1;
1336     lang2 = v2;
1337 
1338     n = lang1->type - lang2->type;
1339 
1340     if (n != 0) {
1341         return n;
1342     }
1343 
1344     n = nxt_strverscmp(lang1->version, lang2->version);
1345 
1346     /* Negate result to move higher versions to the beginning. */
1347 
1348     return -n;
1349 }
1350 
1351 
1352 static void
1353 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1354 {
1355     ssize_t        n, size, offset;
1356     nxt_buf_t      *b;
1357     nxt_int_t      ret;
1358     nxt_file_t     file;
1359     nxt_runtime_t  *rt;
1360 
1361     nxt_memzero(&file, sizeof(nxt_file_t));
1362 
1363     rt = task->thread->runtime;
1364 
1365     file.name = (nxt_file_name_t *) rt->conf_tmp;
1366 
1367     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1368                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1369                       != NXT_OK))
1370     {
1371         goto error;
1372     }
1373 
1374     offset = 0;
1375 
1376     for (b = msg->buf; b != NULL; b = b->next) {
1377         size = nxt_buf_mem_used_size(&b->mem);
1378 
1379         n = nxt_file_write(&file, b->mem.pos, size, offset);
1380 
1381         if (nxt_slow_path(n != size)) {
1382             nxt_file_close(task, &file);
1383             (void) nxt_file_delete(file.name);
1384             goto error;
1385         }
1386 
1387         offset += n;
1388     }
1389 
1390     nxt_file_close(task, &file);
1391 
1392     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1393 
1394     if (nxt_fast_path(ret == NXT_OK)) {
1395         return;
1396     }
1397 
1398 error:
1399 
1400     nxt_alert(task, "failed to store current configuration");
1401 }
1402 
1403 
1404 static void
1405 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1406 {
1407     u_char               *path;
1408     nxt_int_t            ret;
1409     nxt_file_t           file;
1410     nxt_port_t           *port;
1411     nxt_port_msg_type_t  type;
1412 
1413     nxt_debug(task, "opening access log file");
1414 
1415     path = msg->buf->mem.pos;
1416 
1417     nxt_memzero(&file, sizeof(nxt_file_t));
1418 
1419     file.name = (nxt_file_name_t *) path;
1420     file.log_level = NXT_LOG_ERR;
1421 
1422     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1423                         NXT_FILE_OWNER_ACCESS);
1424 
1425     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1426                            : NXT_PORT_MSG_RPC_ERROR;
1427 
1428     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1429                                  msg->port_msg.reply_port);
1430 
1431     if (nxt_fast_path(port != NULL)) {
1432         (void) nxt_port_socket_write(task, port, type, file.fd,
1433                                      msg->port_msg.stream, 0, NULL);
1434     }
1435 }
1436