xref: /unit/src/nxt_main_process.c (revision 2686:87259ed41698)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #include <nxt_port_queue.h>
14 #if (NXT_TLS)
15 #include <nxt_cert.h>
16 #endif
17 #if (NXT_HAVE_NJS)
18 #include <nxt_script.h>
19 #endif
20 
21 #include <sys/mount.h>
22 
23 
24 typedef struct {
25     nxt_socket_t        socket;
26     nxt_socket_error_t  error;
27     u_char              *start;
28     u_char              *end;
29 } nxt_listening_socket_t;
30 
31 
32 typedef struct {
33     nxt_uint_t          size;
34     nxt_conf_map_t      *map;
35 } nxt_conf_app_map_t;
36 
37 
38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
39     nxt_runtime_t *rt);
40 static void nxt_main_process_title(nxt_task_t *task);
41 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
42     void *data);
43 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
44     void *data);
45 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
46     void *data);
47 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
48     void *data);
49 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
50     void *data);
51 static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
52 static void nxt_main_port_socket_handler(nxt_task_t *task,
53     nxt_port_recv_msg_t *msg);
54 static void nxt_main_port_socket_unlink_handler(nxt_task_t *task,
55     nxt_port_recv_msg_t *msg);
56 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
57     nxt_listening_socket_t *ls);
58 static void nxt_main_port_modules_handler(nxt_task_t *task,
59     nxt_port_recv_msg_t *msg);
60 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
61 static void nxt_main_process_whoami_handler(nxt_task_t *task,
62     nxt_port_recv_msg_t *msg);
63 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
64     nxt_port_recv_msg_t *msg);
65 static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
66     const char *name, u_char *buf, size_t size);
67 static void nxt_main_port_access_log_handler(nxt_task_t *task,
68     nxt_port_recv_msg_t *msg);
69 
70 const nxt_sig_event_t  nxt_main_process_signals[] = {
71     nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
72     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
73     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
74     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
75     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
76     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
77     nxt_event_signal_end,
78 };
79 
80 
81 nxt_uint_t  nxt_conf_ver;
82 
83 static nxt_bool_t  nxt_exiting;
84 
85 
86 nxt_int_t
nxt_main_process_start(nxt_thread_t * thr,nxt_task_t * task,nxt_runtime_t * rt)87 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
88     nxt_runtime_t *rt)
89 {
90     rt->type = NXT_PROCESS_MAIN;
91 
92     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
93         return NXT_ERROR;
94     }
95 
96     nxt_main_process_title(task);
97 
98     /*
99      * The discovery process will send a message processed by
100      * nxt_main_port_modules_handler() which starts the controller
101      * and router processes.
102      */
103     return nxt_process_init_start(task, nxt_discovery_process);
104 }
105 
106 
107 static nxt_conf_map_t  nxt_common_app_conf[] = {
108     {
109         nxt_string("type"),
110         NXT_CONF_MAP_STR,
111         offsetof(nxt_common_app_conf_t, type),
112     },
113 
114     {
115         nxt_string("user"),
116         NXT_CONF_MAP_STR,
117         offsetof(nxt_common_app_conf_t, user),
118     },
119 
120     {
121         nxt_string("group"),
122         NXT_CONF_MAP_STR,
123         offsetof(nxt_common_app_conf_t, group),
124     },
125 
126     {
127         nxt_string("stdout"),
128         NXT_CONF_MAP_CSTRZ,
129         offsetof(nxt_common_app_conf_t, stdout_log),
130     },
131 
132     {
133         nxt_string("stderr"),
134         NXT_CONF_MAP_CSTRZ,
135         offsetof(nxt_common_app_conf_t, stderr_log),
136     },
137 
138     {
139         nxt_string("working_directory"),
140         NXT_CONF_MAP_CSTRZ,
141         offsetof(nxt_common_app_conf_t, working_directory),
142     },
143 
144     {
145         nxt_string("environment"),
146         NXT_CONF_MAP_PTR,
147         offsetof(nxt_common_app_conf_t, environment),
148     },
149 
150     {
151         nxt_string("isolation"),
152         NXT_CONF_MAP_PTR,
153         offsetof(nxt_common_app_conf_t, isolation),
154     },
155 
156     {
157         nxt_string("limits"),
158         NXT_CONF_MAP_PTR,
159         offsetof(nxt_common_app_conf_t, limits),
160     },
161 
162 };
163 
164 
165 static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
166     {
167         nxt_string("shm"),
168         NXT_CONF_MAP_SIZE,
169         offsetof(nxt_common_app_conf_t, shm_limit),
170     },
171 
172     {
173         nxt_string("requests"),
174         NXT_CONF_MAP_INT32,
175         offsetof(nxt_common_app_conf_t, request_limit),
176     },
177 
178 };
179 
180 
181 static nxt_conf_map_t  nxt_external_app_conf[] = {
182     {
183         nxt_string("executable"),
184         NXT_CONF_MAP_CSTRZ,
185         offsetof(nxt_common_app_conf_t, u.external.executable),
186     },
187 
188     {
189         nxt_string("arguments"),
190         NXT_CONF_MAP_PTR,
191         offsetof(nxt_common_app_conf_t, u.external.arguments),
192     },
193 
194 };
195 
196 
197 static nxt_conf_map_t  nxt_python_app_conf[] = {
198     {
199         nxt_string("home"),
200         NXT_CONF_MAP_CSTRZ,
201         offsetof(nxt_common_app_conf_t, u.python.home),
202     },
203 
204     {
205         nxt_string("path"),
206         NXT_CONF_MAP_PTR,
207         offsetof(nxt_common_app_conf_t, u.python.path),
208     },
209 
210     {
211         nxt_string("protocol"),
212         NXT_CONF_MAP_STR,
213         offsetof(nxt_common_app_conf_t, u.python.protocol),
214     },
215 
216     {
217         nxt_string("threads"),
218         NXT_CONF_MAP_INT32,
219         offsetof(nxt_common_app_conf_t, u.python.threads),
220     },
221 
222     {
223         nxt_string("targets"),
224         NXT_CONF_MAP_PTR,
225         offsetof(nxt_common_app_conf_t, u.python.targets),
226     },
227 
228     {
229         nxt_string("thread_stack_size"),
230         NXT_CONF_MAP_INT32,
231         offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
232     },
233 };
234 
235 
236 static nxt_conf_map_t  nxt_php_app_conf[] = {
237     {
238         nxt_string("targets"),
239         NXT_CONF_MAP_PTR,
240         offsetof(nxt_common_app_conf_t, u.php.targets),
241     },
242 
243     {
244         nxt_string("options"),
245         NXT_CONF_MAP_PTR,
246         offsetof(nxt_common_app_conf_t, u.php.options),
247     },
248 };
249 
250 
251 static nxt_conf_map_t  nxt_perl_app_conf[] = {
252     {
253         nxt_string("script"),
254         NXT_CONF_MAP_CSTRZ,
255         offsetof(nxt_common_app_conf_t, u.perl.script),
256     },
257 
258     {
259         nxt_string("threads"),
260         NXT_CONF_MAP_INT32,
261         offsetof(nxt_common_app_conf_t, u.perl.threads),
262     },
263 
264     {
265         nxt_string("thread_stack_size"),
266         NXT_CONF_MAP_INT32,
267         offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
268     },
269 };
270 
271 
272 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
273     {
274         nxt_string("script"),
275         NXT_CONF_MAP_STR,
276         offsetof(nxt_common_app_conf_t, u.ruby.script),
277     },
278     {
279         nxt_string("threads"),
280         NXT_CONF_MAP_INT32,
281         offsetof(nxt_common_app_conf_t, u.ruby.threads),
282     },
283     {
284         nxt_string("hooks"),
285         NXT_CONF_MAP_STR,
286         offsetof(nxt_common_app_conf_t, u.ruby.hooks),
287     }
288 };
289 
290 
291 static nxt_conf_map_t  nxt_java_app_conf[] = {
292     {
293         nxt_string("classpath"),
294         NXT_CONF_MAP_PTR,
295         offsetof(nxt_common_app_conf_t, u.java.classpath),
296     },
297     {
298         nxt_string("webapp"),
299         NXT_CONF_MAP_CSTRZ,
300         offsetof(nxt_common_app_conf_t, u.java.webapp),
301     },
302     {
303         nxt_string("options"),
304         NXT_CONF_MAP_PTR,
305         offsetof(nxt_common_app_conf_t, u.java.options),
306     },
307     {
308         nxt_string("unit_jars"),
309         NXT_CONF_MAP_CSTRZ,
310         offsetof(nxt_common_app_conf_t, u.java.unit_jars),
311     },
312     {
313         nxt_string("threads"),
314         NXT_CONF_MAP_INT32,
315         offsetof(nxt_common_app_conf_t, u.java.threads),
316     },
317     {
318         nxt_string("thread_stack_size"),
319         NXT_CONF_MAP_INT32,
320         offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
321     },
322 
323 };
324 
325 
326 static nxt_conf_map_t  nxt_wasm_app_conf[] = {
327     {
328         nxt_string("module"),
329         NXT_CONF_MAP_CSTRZ,
330         offsetof(nxt_common_app_conf_t, u.wasm.module),
331     },
332     {
333         nxt_string("request_handler"),
334         NXT_CONF_MAP_CSTRZ,
335         offsetof(nxt_common_app_conf_t, u.wasm.request_handler),
336     },
337     {
338         nxt_string("malloc_handler"),
339         NXT_CONF_MAP_CSTRZ,
340         offsetof(nxt_common_app_conf_t, u.wasm.malloc_handler),
341     },
342     {
343         nxt_string("free_handler"),
344         NXT_CONF_MAP_CSTRZ,
345         offsetof(nxt_common_app_conf_t, u.wasm.free_handler),
346     },
347     {
348         nxt_string("module_init_handler"),
349         NXT_CONF_MAP_CSTRZ,
350         offsetof(nxt_common_app_conf_t, u.wasm.module_init_handler),
351     },
352     {
353         nxt_string("module_end_handler"),
354         NXT_CONF_MAP_CSTRZ,
355         offsetof(nxt_common_app_conf_t, u.wasm.module_end_handler),
356     },
357     {
358         nxt_string("request_init_handler"),
359         NXT_CONF_MAP_CSTRZ,
360         offsetof(nxt_common_app_conf_t, u.wasm.request_init_handler),
361     },
362     {
363         nxt_string("request_end_handler"),
364         NXT_CONF_MAP_CSTRZ,
365         offsetof(nxt_common_app_conf_t, u.wasm.request_end_handler),
366     },
367     {
368         nxt_string("response_end_handler"),
369         NXT_CONF_MAP_CSTRZ,
370         offsetof(nxt_common_app_conf_t, u.wasm.response_end_handler),
371     },
372     {
373         nxt_string("access"),
374         NXT_CONF_MAP_PTR,
375         offsetof(nxt_common_app_conf_t, u.wasm.access),
376     },
377 };
378 
379 
380 static nxt_conf_map_t  nxt_wasm_wc_app_conf[] = {
381     {
382         nxt_string("component"),
383         NXT_CONF_MAP_CSTRZ,
384         offsetof(nxt_common_app_conf_t, u.wasm_wc.component),
385     },
386     {
387         nxt_string("access"),
388         NXT_CONF_MAP_PTR,
389         offsetof(nxt_common_app_conf_t, u.wasm_wc.access),
390     },
391 };
392 
393 
394 static nxt_conf_app_map_t  nxt_app_maps[] = {
395     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
396     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
397     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
398     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
399     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
400     { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
401     { nxt_nitems(nxt_wasm_app_conf),      nxt_wasm_app_conf },
402     { nxt_nitems(nxt_wasm_wc_app_conf),   nxt_wasm_wc_app_conf },
403 };
404 
405 
406 static void
nxt_main_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)407 nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
408 {
409     nxt_debug(task, "main data: %*s",
410               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
411 }
412 
413 
414 static void
nxt_main_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)415 nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
416 {
417     void        *mem;
418     nxt_port_t  *port;
419 
420     nxt_port_new_port_handler(task, msg);
421 
422     port = msg->u.new_port;
423 
424     if (port != NULL
425         && port->type == NXT_PROCESS_APP
426         && msg->fd[1] != -1)
427     {
428         mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
429                            PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
430         if (nxt_fast_path(mem != MAP_FAILED)) {
431             port->queue = mem;
432         }
433 
434         nxt_fd_close(msg->fd[1]);
435         msg->fd[1] = -1;
436     }
437 }
438 
439 
440 static void
nxt_main_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)441 nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
442 {
443     u_char                 *start, *p, ch;
444     size_t                 type_len;
445     nxt_int_t              ret;
446     nxt_buf_t              *b;
447     nxt_port_t             *port;
448     nxt_runtime_t          *rt;
449     nxt_process_t          *process;
450     nxt_app_type_t         idx;
451     nxt_conf_value_t       *conf;
452     nxt_process_init_t     *init;
453     nxt_common_app_conf_t  *app_conf;
454 
455     rt = task->thread->runtime;
456 
457     port = rt->port_by_type[NXT_PROCESS_ROUTER];
458     if (nxt_slow_path(port == NULL)) {
459         nxt_alert(task, "router port not found");
460         goto close_fds;
461     }
462 
463     if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
464         nxt_alert(task, "process %PI cannot start processes",
465                   nxt_recv_msg_cmsg_pid(msg));
466 
467         goto close_fds;
468     }
469 
470     process = nxt_process_new(rt);
471     if (nxt_slow_path(process == NULL)) {
472         goto close_fds;
473     }
474 
475     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
476     if (process->mem_pool == NULL) {
477         nxt_process_use(task, process, -1);
478         goto close_fds;
479     }
480 
481     process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
482 
483     init = nxt_process_init(process);
484 
485     *init = nxt_proto_process;
486 
487     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
488     if (b == NULL) {
489         goto failed;
490     }
491 
492     nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
493               b->mem.pos);
494 
495     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
496     if (nxt_slow_path(app_conf == NULL)) {
497         goto failed;
498     }
499 
500     app_conf->shared_port_fd = msg->fd[0];
501     app_conf->shared_queue_fd = msg->fd[1];
502 
503     start = b->mem.pos;
504 
505     app_conf->name.start = start;
506     app_conf->name.length = nxt_strlen(start);
507 
508     init->name = (const char *) start;
509 
510     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
511                                  + sizeof("\"\" prototype") + 1);
512 
513     if (nxt_slow_path(process->name == NULL)) {
514         goto failed;
515     }
516 
517     p = (u_char *) process->name;
518     *p++ = '"';
519     p = nxt_cpymem(p, init->name, app_conf->name.length);
520     p = nxt_cpymem(p, "\" prototype", 11);
521     *p = '\0';
522 
523     app_conf->shm_limit = 100 * 1024 * 1024;
524     app_conf->request_limit = 0;
525 
526     start += app_conf->name.length + 1;
527 
528     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
529     if (conf == NULL) {
530         nxt_alert(task, "router app configuration parsing error");
531 
532         goto failed;
533     }
534 
535     rt = task->thread->runtime;
536 
537     app_conf->user.start  = (u_char*)rt->user_cred.user;
538     app_conf->user.length = nxt_strlen(rt->user_cred.user);
539 
540     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
541                               nxt_nitems(nxt_common_app_conf), app_conf);
542 
543     if (ret != NXT_OK) {
544         nxt_alert(task, "failed to map common app conf received from router");
545         goto failed;
546     }
547 
548     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
549         ch = app_conf->type.start[type_len];
550 
551         if (ch == ' ' || nxt_isdigit(ch)) {
552             break;
553         }
554     }
555 
556     idx = nxt_app_parse_type(app_conf->type.start, type_len);
557 
558     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
559         nxt_alert(task, "invalid app type %d received from router", (int) idx);
560         goto failed;
561     }
562 
563     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
564                               nxt_app_maps[idx].size, app_conf);
565 
566     if (nxt_slow_path(ret != NXT_OK)) {
567         nxt_alert(task, "failed to map app conf received from router");
568         goto failed;
569     }
570 
571     if (app_conf->limits != NULL) {
572         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
573                                   nxt_common_app_limits_conf,
574                                   nxt_nitems(nxt_common_app_limits_conf),
575                                   app_conf);
576 
577         if (nxt_slow_path(ret != NXT_OK)) {
578             nxt_alert(task, "failed to map app limits received from router");
579             goto failed;
580         }
581     }
582 
583     app_conf->self = conf;
584 
585     process->stream = msg->port_msg.stream;
586     process->data.app = app_conf;
587 
588     ret = nxt_process_start(task, process);
589     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
590 
591         /* Close shared port fds only in main process. */
592         if (ret == NXT_OK) {
593             nxt_fd_close(app_conf->shared_port_fd);
594             nxt_fd_close(app_conf->shared_queue_fd);
595         }
596 
597         /* Avoid fds close in caller. */
598         msg->fd[0] = -1;
599         msg->fd[1] = -1;
600 
601         return;
602     }
603 
604 failed:
605 
606     nxt_process_use(task, process, -1);
607 
608     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
609                                  msg->port_msg.reply_port);
610 
611     if (nxt_fast_path(port != NULL)) {
612         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
613                               -1, msg->port_msg.stream, 0, NULL);
614     }
615 
616 close_fds:
617 
618     nxt_fd_close(msg->fd[0]);
619     msg->fd[0] = -1;
620 
621     nxt_fd_close(msg->fd[1]);
622     msg->fd[1] = -1;
623 }
624 
625 
626 static void
nxt_main_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)627 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
628 {
629     nxt_port_t     *port;
630     nxt_process_t  *process;
631     nxt_runtime_t  *rt;
632 
633     rt = task->thread->runtime;
634 
635     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
636                                  msg->port_msg.reply_port);
637     if (nxt_slow_path(port == NULL)) {
638         return;
639     }
640 
641     process = port->process;
642 
643     nxt_assert(process != NULL);
644     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
645 
646 #if (NXT_HAVE_LINUX_NS && NXT_HAVE_CLONE_NEWUSER)
647     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
648         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
649                                                    process->user_cred,
650                                                    &process->isolation.clone)
651                           != NXT_OK))
652         {
653             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
654                                          -1, msg->port_msg.stream, 0, NULL);
655             return;
656         }
657     }
658 
659 #endif
660 
661     process->state = NXT_PROCESS_STATE_CREATED;
662 
663     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
664                                  -1, msg->port_msg.stream, 0, NULL);
665 }
666 
667 
668 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
669     .data             = nxt_main_data_handler,
670     .new_port         = nxt_main_new_port_handler,
671     .process_created  = nxt_main_process_created_handler,
672     .process_ready    = nxt_port_process_ready_handler,
673     .whoami           = nxt_main_process_whoami_handler,
674     .remove_pid       = nxt_port_remove_pid_handler,
675     .start_process    = nxt_main_start_process_handler,
676     .socket           = nxt_main_port_socket_handler,
677     .socket_unlink    = nxt_main_port_socket_unlink_handler,
678     .modules          = nxt_main_port_modules_handler,
679     .conf_store       = nxt_main_port_conf_store_handler,
680 #if (NXT_TLS)
681     .cert_get         = nxt_cert_store_get_handler,
682     .cert_delete      = nxt_cert_store_delete_handler,
683 #endif
684 #if (NXT_HAVE_NJS)
685     .script_get       = nxt_script_store_get_handler,
686     .script_delete    = nxt_script_store_delete_handler,
687 #endif
688     .access_log       = nxt_main_port_access_log_handler,
689     .rpc_ready        = nxt_port_rpc_handler,
690     .rpc_error        = nxt_port_rpc_handler,
691 };
692 
693 
694 static void
nxt_main_process_whoami_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)695 nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
696 {
697     nxt_buf_t      *buf;
698     nxt_pid_t      pid, ppid;
699     nxt_port_t     *port;
700     nxt_runtime_t  *rt;
701     nxt_process_t  *pprocess;
702 
703     nxt_assert(msg->port_msg.reply_port == 0);
704 
705     if (nxt_slow_path(msg->buf == NULL
706         || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
707     {
708         nxt_alert(task, "whoami: buffer is NULL or unexpected size");
709         goto fail;
710     }
711 
712     nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
713 
714     rt = task->thread->runtime;
715 
716     pprocess = nxt_runtime_process_find(rt, ppid);
717     if (nxt_slow_path(pprocess == NULL)) {
718         nxt_alert(task, "whoami: parent process %PI not found", ppid);
719         goto fail;
720     }
721 
722     pid = nxt_recv_msg_cmsg_pid(msg);
723 
724     nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
725               msg->fd[0]);
726 
727     if (msg->fd[0] != -1) {
728         port = nxt_runtime_process_port_create(task, rt, pid, 0,
729                                                NXT_PROCESS_APP);
730         if (nxt_slow_path(port == NULL)) {
731             goto fail;
732         }
733 
734         nxt_fd_nonblocking(task, msg->fd[0]);
735 
736         port->pair[0] = -1;
737         port->pair[1] = msg->fd[0];
738         msg->fd[0] = -1;
739 
740         port->max_size = 16 * 1024;
741         port->max_share = 64 * 1024;
742         port->socket.task = task;
743 
744         nxt_port_write_enable(task, port);
745 
746     } else {
747         port = nxt_runtime_port_find(rt, pid, 0);
748         if (nxt_slow_path(port == NULL)) {
749             goto fail;
750         }
751     }
752 
753     if (ppid != nxt_pid) {
754         nxt_queue_insert_tail(&pprocess->children, &port->process->link);
755     }
756 
757     buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
758                             sizeof(nxt_pid_t), 0);
759     if (nxt_slow_path(buf == NULL)) {
760         goto fail;
761     }
762 
763     buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
764 
765     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
766                                  msg->port_msg.stream, 0, buf);
767 
768 fail:
769 
770     if (msg->fd[0] != -1) {
771         nxt_fd_close(msg->fd[0]);
772     }
773 }
774 
775 
776 static nxt_int_t
nxt_main_process_port_create(nxt_task_t * task,nxt_runtime_t * rt)777 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
778 {
779     nxt_int_t      ret;
780     nxt_port_t     *port;
781     nxt_process_t  *process;
782 
783     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
784                                            NXT_PROCESS_MAIN);
785     if (nxt_slow_path(port == NULL)) {
786         return NXT_ERROR;
787     }
788 
789     process = port->process;
790 
791     ret = nxt_port_socket_init(task, port, 0);
792     if (nxt_slow_path(ret != NXT_OK)) {
793         nxt_port_use(task, port, -1);
794         return ret;
795     }
796 
797     /*
798      * A main process port.  A write port is not closed
799      * since it should be inherited by processes.
800      */
801     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
802 
803     process->state = NXT_PROCESS_STATE_READY;
804 
805     return NXT_OK;
806 }
807 
808 
809 static void
nxt_main_process_title(nxt_task_t * task)810 nxt_main_process_title(nxt_task_t *task)
811 {
812     u_char      *p, *end;
813     nxt_uint_t  i;
814     u_char      title[2048];
815 
816     end = title + sizeof(title) - 1;
817 
818     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
819                     nxt_process_argv[0]);
820 
821     for (i = 1; nxt_process_argv[i] != NULL; i++) {
822         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
823     }
824 
825     if (p < end) {
826         *p++ = ']';
827     }
828 
829     *p = '\0';
830 
831     nxt_process_title(task, "%s", title);
832 }
833 
834 
835 static void
nxt_main_process_sigterm_handler(nxt_task_t * task,void * obj,void * data)836 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
837 {
838     nxt_debug(task, "sigterm handler signo:%d (%s)",
839               (int) (uintptr_t) obj, data);
840 
841     /* TODO: fast exit. */
842 
843     nxt_exiting = 1;
844 
845     nxt_runtime_quit(task, 0);
846 }
847 
848 
849 static void
nxt_main_process_sigquit_handler(nxt_task_t * task,void * obj,void * data)850 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
851 {
852     nxt_debug(task, "sigquit handler signo:%d (%s)",
853               (int) (uintptr_t) obj, data);
854 
855     /* TODO: graceful exit. */
856 
857     nxt_exiting = 1;
858 
859     nxt_runtime_quit(task, 0);
860 }
861 
862 
863 static void
nxt_main_process_sigusr1_handler(nxt_task_t * task,void * obj,void * data)864 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
865 {
866     nxt_mp_t        *mp;
867     nxt_int_t       ret;
868     nxt_uint_t      n;
869     nxt_port_t      *port;
870     nxt_file_t      *file, *new_file;
871     nxt_array_t     *new_files;
872     nxt_runtime_t   *rt;
873 
874     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) received, %s",
875             (int) (uintptr_t) obj, data, "log files rotation");
876 
877     rt = task->thread->runtime;
878 
879     port = rt->port_by_type[NXT_PROCESS_ROUTER];
880 
881     if (nxt_fast_path(port != NULL)) {
882         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
883                                      -1, 0, 0, NULL);
884     }
885 
886     mp = nxt_mp_create(1024, 128, 256, 32);
887     if (mp == NULL) {
888         return;
889     }
890 
891     n = nxt_list_nelts(rt->log_files);
892 
893     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
894     if (new_files == NULL) {
895         nxt_mp_destroy(mp);
896         return;
897     }
898 
899     nxt_list_each(file, rt->log_files) {
900 
901         /* This allocation cannot fail. */
902         new_file = nxt_array_add(new_files);
903 
904         new_file->name = file->name;
905         new_file->fd = NXT_FILE_INVALID;
906         new_file->log_level = NXT_LOG_ALERT;
907 
908         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
909                             NXT_FILE_OWNER_ACCESS);
910 
911         if (ret != NXT_OK) {
912             goto fail;
913         }
914 
915     } nxt_list_loop;
916 
917     new_file = new_files->elts;
918 
919     ret = nxt_file_stderr(&new_file[0]);
920 
921     if (ret == NXT_OK) {
922         n = 0;
923 
924         nxt_list_each(file, rt->log_files) {
925 
926             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
927             /*
928              * The old log file descriptor must be closed at the moment
929              * when no other threads use it.  dup2() allows to use the
930              * old file descriptor for new log file.  This change is
931              * performed atomically in the kernel.
932              */
933             (void) nxt_file_redirect(file, new_file[n].fd);
934 
935             n++;
936 
937         } nxt_list_loop;
938 
939         nxt_mp_destroy(mp);
940         return;
941     }
942 
943 fail:
944 
945     new_file = new_files->elts;
946     n = new_files->nelts;
947 
948     while (n != 0) {
949         if (new_file->fd != NXT_FILE_INVALID) {
950             nxt_file_close(task, new_file);
951         }
952 
953         new_file++;
954         n--;
955     }
956 
957     nxt_mp_destroy(mp);
958 }
959 
960 
961 static void
nxt_main_process_sigchld_handler(nxt_task_t * task,void * obj,void * data)962 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
963 {
964     int                 status;
965     nxt_int_t           ret;
966     nxt_err_t           err;
967     nxt_pid_t           pid;
968     nxt_port_t          *port;
969     nxt_queue_t         children;
970     nxt_runtime_t       *rt;
971     nxt_process_t       *process, *child;
972     nxt_process_init_t  init;
973 
974     nxt_debug(task, "sigchld handler signo:%d (%s)",
975               (int) (uintptr_t) obj, data);
976 
977     rt = task->thread->runtime;
978 
979     for ( ;; ) {
980         pid = waitpid(-1, &status, WNOHANG);
981 
982         if (pid == -1) {
983 
984             switch (err = nxt_errno) {
985 
986             case NXT_ECHILD:
987                 return;
988 
989             case NXT_EINTR:
990                 continue;
991 
992             default:
993                 nxt_alert(task, "waitpid() failed: %E", err);
994                 return;
995             }
996         }
997 
998         nxt_debug(task, "waitpid(): %PI", pid);
999 
1000         if (pid == 0) {
1001             return;
1002         }
1003 
1004         if (WTERMSIG(status)) {
1005 #ifdef WCOREDUMP
1006             nxt_alert(task, "process %PI exited on signal %d%s",
1007                       pid, WTERMSIG(status),
1008                       WCOREDUMP(status) ? " (core dumped)" : "");
1009 #else
1010             nxt_alert(task, "process %PI exited on signal %d",
1011                       pid, WTERMSIG(status));
1012 #endif
1013 
1014         } else {
1015             nxt_trace(task, "process %PI exited with code %d",
1016                       pid, WEXITSTATUS(status));
1017         }
1018 
1019         process = nxt_runtime_process_find(rt, pid);
1020 
1021         if (process != NULL) {
1022             nxt_main_process_cleanup(task, process);
1023 
1024             if (process->state == NXT_PROCESS_STATE_READY) {
1025                 process->stream = 0;
1026             }
1027 
1028             nxt_queue_init(&children);
1029 
1030             if (!nxt_queue_is_empty(&process->children)) {
1031                 nxt_queue_add(&children, &process->children);
1032 
1033                 nxt_queue_init(&process->children);
1034 
1035                 nxt_queue_each(child, &children, nxt_process_t, link) {
1036                     port = nxt_process_port_first(child);
1037 
1038                     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
1039                                                  -1, 0, 0, NULL);
1040                 } nxt_queue_loop;
1041             }
1042 
1043             if (nxt_exiting) {
1044                 nxt_process_close_ports(task, process);
1045 
1046                 nxt_queue_each(child, &children, nxt_process_t, link) {
1047                     nxt_queue_remove(&child->link);
1048                     child->link.next = NULL;
1049 
1050                     nxt_process_close_ports(task, child);
1051                 } nxt_queue_loop;
1052 
1053                 if (rt->nprocesses <= 1) {
1054                     nxt_runtime_quit(task, 0);
1055 
1056                     return;
1057                 }
1058 
1059                 continue;
1060             }
1061 
1062             nxt_port_remove_notify_others(task, process);
1063 
1064             nxt_queue_each(child, &children, nxt_process_t, link) {
1065                 nxt_port_remove_notify_others(task, child);
1066 
1067                 nxt_queue_remove(&child->link);
1068                 child->link.next = NULL;
1069 
1070                 nxt_process_close_ports(task, child);
1071             } nxt_queue_loop;
1072 
1073             init = *(nxt_process_init_t *) nxt_process_init(process);
1074 
1075             nxt_process_close_ports(task, process);
1076 
1077             if (init.restart) {
1078                 ret = nxt_process_init_start(task, init);
1079                 if (nxt_slow_path(ret == NXT_ERROR)) {
1080                     nxt_alert(task, "failed to restart %s", init.name);
1081                 }
1082             }
1083         }
1084     }
1085 }
1086 
1087 
1088 static void
nxt_main_process_signal_handler(nxt_task_t * task,void * obj,void * data)1089 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
1090 {
1091     nxt_trace(task, "signal signo:%d (%s) received, ignored",
1092               (int) (uintptr_t) obj, data);
1093 }
1094 
1095 
1096 static void
nxt_main_process_cleanup(nxt_task_t * task,nxt_process_t * process)1097 nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
1098 {
1099     if (process->isolation.cleanup != NULL) {
1100         process->isolation.cleanup(task, process);
1101     }
1102 
1103     if (process->isolation.cgroup_cleanup != NULL) {
1104         process->isolation.cgroup_cleanup(task, process);
1105     }
1106 }
1107 
1108 
1109 static void
nxt_main_port_socket_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1110 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1111 {
1112     size_t                  size;
1113     nxt_int_t               ret;
1114     nxt_buf_t               *b, *out;
1115     nxt_port_t              *port;
1116     nxt_sockaddr_t          *sa;
1117     nxt_port_msg_type_t     type;
1118     nxt_listening_socket_t  ls;
1119     u_char                  message[2048];
1120 
1121     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1122                                  msg->port_msg.reply_port);
1123     if (nxt_slow_path(port == NULL)) {
1124         return;
1125     }
1126 
1127     if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) {
1128         nxt_alert(task, "process %PI cannot create listener sockets",
1129                   msg->port_msg.pid);
1130 
1131         return;
1132     }
1133 
1134     b = msg->buf;
1135     sa = (nxt_sockaddr_t *) b->mem.pos;
1136 
1137     /* TODO check b size and make plain */
1138 
1139     ls.socket = -1;
1140     ls.error = NXT_SOCKET_ERROR_SYSTEM;
1141     ls.start = message;
1142     ls.end = message + sizeof(message);
1143 
1144     nxt_debug(task, "listening socket \"%*s\"",
1145               (size_t) sa->length, nxt_sockaddr_start(sa));
1146 
1147     ret = nxt_main_listening_socket(sa, &ls);
1148 
1149     if (ret == NXT_OK) {
1150         nxt_debug(task, "socket(\"%*s\"): %d",
1151                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1152 
1153         out = NULL;
1154 
1155         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1156 
1157     } else {
1158         size = ls.end - ls.start;
1159 
1160         nxt_alert(task, "%*s", size, ls.start);
1161 
1162         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1163                                    size + 1);
1164         if (nxt_fast_path(out != NULL)) {
1165             *out->mem.free++ = (uint8_t) ls.error;
1166 
1167             out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1168         }
1169 
1170         type = NXT_PORT_MSG_RPC_ERROR;
1171     }
1172 
1173     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1174                           0, out);
1175 }
1176 
1177 
1178 static nxt_int_t
nxt_main_listening_socket(nxt_sockaddr_t * sa,nxt_listening_socket_t * ls)1179 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1180 {
1181     nxt_err_t         err;
1182     nxt_socket_t      s;
1183 
1184     const socklen_t   length = sizeof(int);
1185     static const int  enable = 1;
1186 
1187     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1188 
1189     if (nxt_slow_path(s == -1)) {
1190         err = nxt_errno;
1191 
1192 #if (NXT_INET6)
1193 
1194         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1195             ls->error = NXT_SOCKET_ERROR_NOINET6;
1196         }
1197 
1198 #endif
1199 
1200         ls->end = nxt_sprintf(ls->start, ls->end,
1201                               "socket(\\\"%*s\\\") failed %E",
1202                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1203 
1204         return NXT_ERROR;
1205     }
1206 
1207     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1208         ls->end = nxt_sprintf(ls->start, ls->end,
1209                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1210                               (size_t) sa->length, nxt_sockaddr_start(sa),
1211                               nxt_errno);
1212         goto fail;
1213     }
1214 
1215 #if (NXT_INET6)
1216 
1217     if (sa->u.sockaddr.sa_family == AF_INET6) {
1218 
1219         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1220             ls->end = nxt_sprintf(ls->start, ls->end,
1221                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1222                                (size_t) sa->length, nxt_sockaddr_start(sa),
1223                                nxt_errno);
1224             goto fail;
1225         }
1226     }
1227 
1228 #endif
1229 
1230     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1231         err = nxt_errno;
1232 
1233 #if (NXT_HAVE_UNIX_DOMAIN)
1234 
1235         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1236             switch (err) {
1237 
1238             case EACCES:
1239                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1240                 break;
1241 
1242             case ENOENT:
1243             case ENOTDIR:
1244                 ls->error = NXT_SOCKET_ERROR_PATH;
1245                 break;
1246             }
1247 
1248         } else
1249 #endif
1250         {
1251             switch (err) {
1252 
1253             case EACCES:
1254                 ls->error = NXT_SOCKET_ERROR_PORT;
1255                 break;
1256 
1257             case EADDRINUSE:
1258                 ls->error = NXT_SOCKET_ERROR_INUSE;
1259                 break;
1260 
1261             case EADDRNOTAVAIL:
1262                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1263                 break;
1264             }
1265         }
1266 
1267         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1268                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1269         goto fail;
1270     }
1271 
1272 #if (NXT_HAVE_UNIX_DOMAIN)
1273 
1274     if (sa->u.sockaddr.sa_family == AF_UNIX
1275         && sa->u.sockaddr_un.sun_path[0] != '\0')
1276     {
1277         char          *filename;
1278         mode_t        access;
1279         nxt_thread_t  *thr;
1280 
1281         filename = sa->u.sockaddr_un.sun_path;
1282         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1283 
1284         if (chmod(filename, access) != 0) {
1285             ls->end = nxt_sprintf(ls->start, ls->end,
1286                                   "chmod(\\\"%s\\\") failed %E",
1287                                   filename, nxt_errno);
1288             goto fail;
1289         }
1290 
1291         thr = nxt_thread();
1292         nxt_runtime_listen_socket_add(thr->runtime, sa);
1293     }
1294 
1295 #endif
1296 
1297     ls->socket = s;
1298 
1299     return NXT_OK;
1300 
1301 fail:
1302 
1303     (void) close(s);
1304 
1305     return NXT_ERROR;
1306 }
1307 
1308 
1309 static void
nxt_main_port_socket_unlink_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1310 nxt_main_port_socket_unlink_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1311 {
1312 #if (NXT_HAVE_UNIX_DOMAIN)
1313     size_t               i;
1314     nxt_buf_t            *b;
1315     const char           *filename;
1316     nxt_runtime_t        *rt;
1317     nxt_sockaddr_t       *sa;
1318     nxt_listen_socket_t  *ls;
1319 
1320     b = msg->buf;
1321     sa = (nxt_sockaddr_t *) b->mem.pos;
1322 
1323     filename = sa->u.sockaddr_un.sun_path;
1324     unlink(filename);
1325 
1326     rt = task->thread->runtime;
1327 
1328     for (i = 0; i < rt->listen_sockets->nelts; i++) {
1329         const char  *name;
1330 
1331         ls = (nxt_listen_socket_t *) rt->listen_sockets->elts + i;
1332         sa = ls->sockaddr;
1333 
1334         if (sa->u.sockaddr.sa_family != AF_UNIX
1335             || sa->u.sockaddr_un.sun_path[0] == '\0')
1336         {
1337             continue;
1338         }
1339 
1340         name = sa->u.sockaddr_un.sun_path;
1341         if (strcmp(name, filename) != 0) {
1342             continue;
1343         }
1344 
1345         nxt_array_remove(rt->listen_sockets, ls);
1346         break;
1347     }
1348 #endif
1349 }
1350 
1351 
1352 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1353     {
1354         nxt_string("type"),
1355         NXT_CONF_MAP_INT,
1356         offsetof(nxt_app_lang_module_t, type),
1357     },
1358 
1359     {
1360         nxt_string("version"),
1361         NXT_CONF_MAP_CSTRZ,
1362         offsetof(nxt_app_lang_module_t, version),
1363     },
1364 
1365     {
1366         nxt_string("file"),
1367         NXT_CONF_MAP_CSTRZ,
1368         offsetof(nxt_app_lang_module_t, file),
1369     },
1370 };
1371 
1372 
1373 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1374     {
1375         nxt_string("src"),
1376         NXT_CONF_MAP_CSTRZ,
1377         offsetof(nxt_fs_mount_t, src),
1378     },
1379     {
1380         nxt_string("dst"),
1381         NXT_CONF_MAP_CSTRZ,
1382         offsetof(nxt_fs_mount_t, dst),
1383     },
1384     {
1385         nxt_string("name"),
1386         NXT_CONF_MAP_CSTRZ,
1387         offsetof(nxt_fs_mount_t, name),
1388     },
1389     {
1390         nxt_string("type"),
1391         NXT_CONF_MAP_INT,
1392         offsetof(nxt_fs_mount_t, type),
1393     },
1394     {
1395         nxt_string("flags"),
1396         NXT_CONF_MAP_INT,
1397         offsetof(nxt_fs_mount_t, flags),
1398     },
1399     {
1400         nxt_string("data"),
1401         NXT_CONF_MAP_CSTRZ,
1402         offsetof(nxt_fs_mount_t, data),
1403     },
1404 };
1405 
1406 
1407 static void
nxt_main_port_modules_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1408 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1409 {
1410     uint32_t               index, jindex, nmounts;
1411     nxt_mp_t               *mp;
1412     nxt_int_t              ret;
1413     nxt_buf_t              *b;
1414     nxt_port_t             *port;
1415     nxt_runtime_t          *rt;
1416     nxt_fs_mount_t         *mnt;
1417     nxt_conf_value_t       *conf, *root, *value, *mounts;
1418     nxt_app_lang_module_t  *lang;
1419 
1420     static nxt_str_t root_path = nxt_string("/");
1421     static nxt_str_t mounts_name = nxt_string("mounts");
1422 
1423     rt = task->thread->runtime;
1424 
1425     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1426         nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid);
1427         return;
1428     }
1429 
1430     if (nxt_exiting) {
1431         nxt_debug(task, "ignoring discovered modules, exiting");
1432         return;
1433     }
1434 
1435     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1436                                  msg->port_msg.reply_port);
1437 
1438     if (nxt_fast_path(port != NULL)) {
1439         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1440                                      msg->port_msg.stream, 0, NULL);
1441     }
1442 
1443     b = msg->buf;
1444 
1445     if (b == NULL) {
1446         return;
1447     }
1448 
1449     mp = nxt_mp_create(1024, 128, 256, 32);
1450     if (mp == NULL) {
1451         return;
1452     }
1453 
1454     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1455 
1456     if (b == NULL) {
1457         return;
1458     }
1459 
1460     nxt_debug(task, "application languages: \"%*s\"",
1461               b->mem.free - b->mem.pos, b->mem.pos);
1462 
1463     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1464     if (conf == NULL) {
1465         goto fail;
1466     }
1467 
1468     root = nxt_conf_get_path(conf, &root_path);
1469     if (root == NULL) {
1470         goto fail;
1471     }
1472 
1473     for (index = 0; /* void */ ; index++) {
1474         value = nxt_conf_get_array_element(root, index);
1475         if (value == NULL) {
1476             break;
1477         }
1478 
1479         lang = nxt_array_zero_add(rt->languages);
1480         if (lang == NULL) {
1481             goto fail;
1482         }
1483 
1484         lang->module = NULL;
1485 
1486         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1487                                   nxt_nitems(nxt_app_lang_module_map), lang);
1488 
1489         if (ret != NXT_OK) {
1490             goto fail;
1491         }
1492 
1493         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1494         if (mounts == NULL) {
1495             nxt_alert(task, "missing mounts from discovery message.");
1496             goto fail;
1497         }
1498 
1499         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1500             nxt_alert(task, "invalid mounts type from discovery message.");
1501             goto fail;
1502         }
1503 
1504         nmounts = nxt_conf_array_elements_count(mounts);
1505 
1506         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1507                                         sizeof(nxt_fs_mount_t));
1508 
1509         if (lang->mounts == NULL) {
1510             goto fail;
1511         }
1512 
1513         for (jindex = 0; /* */; jindex++) {
1514             value = nxt_conf_get_array_element(mounts, jindex);
1515             if (value == NULL) {
1516                 break;
1517             }
1518 
1519             mnt = nxt_array_zero_add(lang->mounts);
1520             if (mnt == NULL) {
1521                 goto fail;
1522             }
1523 
1524             mnt->builtin = 1;
1525             mnt->deps = 1;
1526 
1527             ret = nxt_conf_map_object(rt->mem_pool, value,
1528                                       nxt_app_lang_mounts_map,
1529                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1530 
1531             if (ret != NXT_OK) {
1532                 goto fail;
1533             }
1534         }
1535 
1536         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1537                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1538     }
1539 
1540     qsort(rt->languages->elts, rt->languages->nelts,
1541           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1542 
1543 fail:
1544 
1545     nxt_mp_destroy(mp);
1546 
1547     ret = nxt_process_init_start(task, nxt_controller_process);
1548     if (ret == NXT_OK) {
1549         ret = nxt_process_init_start(task, nxt_router_process);
1550     }
1551 
1552     if (nxt_slow_path(ret == NXT_ERROR)) {
1553         nxt_exiting = 1;
1554 
1555         nxt_runtime_quit(task, 1);
1556     }
1557 }
1558 
1559 
1560 static int nxt_cdecl
nxt_app_lang_compare(const void * v1,const void * v2)1561 nxt_app_lang_compare(const void *v1, const void *v2)
1562 {
1563     int                          n;
1564     const nxt_app_lang_module_t  *lang1, *lang2;
1565 
1566     lang1 = v1;
1567     lang2 = v2;
1568 
1569     n = lang1->type - lang2->type;
1570 
1571     if (n != 0) {
1572         return n;
1573     }
1574 
1575     n = nxt_strverscmp(lang1->version, lang2->version);
1576 
1577     /* Negate result to move higher versions to the beginning. */
1578 
1579     return -n;
1580 }
1581 
1582 
1583 static void
nxt_main_port_conf_store_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1584 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1585 {
1586     void           *p;
1587     size_t         n, size;
1588     nxt_int_t      ret;
1589     nxt_port_t     *ctl_port;
1590     nxt_runtime_t  *rt;
1591     u_char         ver[NXT_INT_T_LEN];
1592 
1593     rt = task->thread->runtime;
1594 
1595     ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1596 
1597     if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) {
1598         nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid);
1599         return;
1600     }
1601 
1602     p = MAP_FAILED;
1603 
1604     /*
1605      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1606      * initialized in 'cleanup' section.
1607      */
1608     size = 0;
1609 
1610     if (nxt_slow_path(msg->fd[0] == -1)) {
1611         nxt_alert(task, "conf_store_handler: invalid shm fd");
1612         goto error;
1613     }
1614 
1615     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1616         nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1617                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
1618         goto error;
1619     }
1620 
1621     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1622 
1623     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1624 
1625     nxt_fd_close(msg->fd[0]);
1626     msg->fd[0] = -1;
1627 
1628     if (nxt_slow_path(p == MAP_FAILED)) {
1629         goto error;
1630     }
1631 
1632     nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1633 
1634     if (nxt_conf_ver != NXT_VERNUM) {
1635         n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver;
1636 
1637         ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n);
1638         if (nxt_slow_path(ret != NXT_OK)) {
1639             goto error;
1640         }
1641 
1642         nxt_conf_ver = NXT_VERNUM;
1643     }
1644 
1645     ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size);
1646 
1647     if (nxt_fast_path(ret == NXT_OK)) {
1648         goto cleanup;
1649     }
1650 
1651 error:
1652 
1653     nxt_alert(task, "failed to store current configuration");
1654 
1655 cleanup:
1656 
1657     if (p != MAP_FAILED) {
1658         nxt_mem_munmap(p, size);
1659     }
1660 
1661     if (msg->fd[0] != -1) {
1662         nxt_fd_close(msg->fd[0]);
1663         msg->fd[0] = -1;
1664     }
1665 }
1666 
1667 
1668 static nxt_int_t
nxt_main_file_store(nxt_task_t * task,const char * tmp_name,const char * name,u_char * buf,size_t size)1669 nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name,
1670     u_char *buf, size_t size)
1671 {
1672     ssize_t     n;
1673     nxt_int_t   ret;
1674     nxt_file_t  file;
1675 
1676     nxt_memzero(&file, sizeof(nxt_file_t));
1677 
1678     file.name = (nxt_file_name_t *) name;
1679 
1680     ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE,
1681                         NXT_FILE_OWNER_ACCESS);
1682     if (nxt_slow_path(ret != NXT_OK)) {
1683         return NXT_ERROR;
1684     }
1685 
1686     n = nxt_file_write(&file, buf, size, 0);
1687 
1688     nxt_file_close(task, &file);
1689 
1690     if (nxt_slow_path(n != (ssize_t) size)) {
1691         (void) nxt_file_delete(file.name);
1692         return NXT_ERROR;
1693     }
1694 
1695     return nxt_file_rename(file.name, (nxt_file_name_t *) name);
1696 }
1697 
1698 
1699 static void
nxt_main_port_access_log_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1700 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1701 {
1702     u_char               *path;
1703     nxt_int_t            ret;
1704     nxt_file_t           file;
1705     nxt_port_t           *port;
1706     nxt_port_msg_type_t  type;
1707 
1708     nxt_debug(task, "opening access log file");
1709 
1710     path = msg->buf->mem.pos;
1711 
1712     nxt_memzero(&file, sizeof(nxt_file_t));
1713 
1714     file.name = (nxt_file_name_t *) path;
1715     file.log_level = NXT_LOG_ERR;
1716 
1717     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1718                         NXT_FILE_OWNER_ACCESS);
1719 
1720     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1721                            : NXT_PORT_MSG_RPC_ERROR;
1722 
1723     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1724                                  msg->port_msg.reply_port);
1725 
1726     if (nxt_fast_path(port != NULL)) {
1727         (void) nxt_port_socket_write(task, port, type, file.fd,
1728                                      msg->port_msg.stream, 0, NULL);
1729     }
1730 }
1731