xref: /unit/src/nxt_main_process.c (revision 2240:0c49318572c2)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #include <nxt_port_queue.h>
14 #if (NXT_TLS)
15 #include <nxt_cert.h>
16 #endif
17 
18 #include <sys/mount.h>
19 
20 
21 typedef struct {
22     nxt_socket_t        socket;
23     nxt_socket_error_t  error;
24     u_char              *start;
25     u_char              *end;
26 } nxt_listening_socket_t;
27 
28 
29 typedef struct {
30     nxt_uint_t          size;
31     nxt_conf_map_t      *map;
32 } nxt_conf_app_map_t;
33 
34 
35 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
36     nxt_runtime_t *rt);
37 static void nxt_main_process_title(nxt_task_t *task);
38 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
39     void *data);
40 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
41     void *data);
42 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
43     void *data);
44 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
45     void *data);
46 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
49 static void nxt_main_port_socket_handler(nxt_task_t *task,
50     nxt_port_recv_msg_t *msg);
51 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
52     nxt_listening_socket_t *ls);
53 static void nxt_main_port_modules_handler(nxt_task_t *task,
54     nxt_port_recv_msg_t *msg);
55 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
56 static void nxt_main_process_whoami_handler(nxt_task_t *task,
57     nxt_port_recv_msg_t *msg);
58 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
59     nxt_port_recv_msg_t *msg);
60 static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
61     const char *name, u_char *buf, size_t size);
62 static void nxt_main_port_access_log_handler(nxt_task_t *task,
63     nxt_port_recv_msg_t *msg);
64 
65 const nxt_sig_event_t  nxt_main_process_signals[] = {
66     nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
67     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
68     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
69     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
70     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
71     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
72     nxt_event_signal_end,
73 };
74 
75 
76 nxt_uint_t  nxt_conf_ver;
77 
78 static nxt_bool_t  nxt_exiting;
79 
80 
81 nxt_int_t
nxt_main_process_start(nxt_thread_t * thr,nxt_task_t * task,nxt_runtime_t * rt)82 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
83     nxt_runtime_t *rt)
84 {
85     rt->type = NXT_PROCESS_MAIN;
86 
87     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
88         return NXT_ERROR;
89     }
90 
91     nxt_main_process_title(task);
92 
93     /*
94      * The discovery process will send a message processed by
95      * nxt_main_port_modules_handler() which starts the controller
96      * and router processes.
97      */
98     return nxt_process_init_start(task, nxt_discovery_process);
99 }
100 
101 
102 static nxt_conf_map_t  nxt_common_app_conf[] = {
103     {
104         nxt_string("type"),
105         NXT_CONF_MAP_STR,
106         offsetof(nxt_common_app_conf_t, type),
107     },
108 
109     {
110         nxt_string("user"),
111         NXT_CONF_MAP_STR,
112         offsetof(nxt_common_app_conf_t, user),
113     },
114 
115     {
116         nxt_string("group"),
117         NXT_CONF_MAP_STR,
118         offsetof(nxt_common_app_conf_t, group),
119     },
120 
121     {
122         nxt_string("working_directory"),
123         NXT_CONF_MAP_CSTRZ,
124         offsetof(nxt_common_app_conf_t, working_directory),
125     },
126 
127     {
128         nxt_string("environment"),
129         NXT_CONF_MAP_PTR,
130         offsetof(nxt_common_app_conf_t, environment),
131     },
132 
133     {
134         nxt_string("isolation"),
135         NXT_CONF_MAP_PTR,
136         offsetof(nxt_common_app_conf_t, isolation),
137     },
138 
139     {
140         nxt_string("limits"),
141         NXT_CONF_MAP_PTR,
142         offsetof(nxt_common_app_conf_t, limits),
143     },
144 
145 };
146 
147 
148 static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
149     {
150         nxt_string("shm"),
151         NXT_CONF_MAP_SIZE,
152         offsetof(nxt_common_app_conf_t, shm_limit),
153     },
154 
155     {
156         nxt_string("requests"),
157         NXT_CONF_MAP_INT32,
158         offsetof(nxt_common_app_conf_t, request_limit),
159     },
160 
161 };
162 
163 
164 static nxt_conf_map_t  nxt_external_app_conf[] = {
165     {
166         nxt_string("executable"),
167         NXT_CONF_MAP_CSTRZ,
168         offsetof(nxt_common_app_conf_t, u.external.executable),
169     },
170 
171     {
172         nxt_string("arguments"),
173         NXT_CONF_MAP_PTR,
174         offsetof(nxt_common_app_conf_t, u.external.arguments),
175     },
176 
177 };
178 
179 
180 static nxt_conf_map_t  nxt_python_app_conf[] = {
181     {
182         nxt_string("home"),
183         NXT_CONF_MAP_CSTRZ,
184         offsetof(nxt_common_app_conf_t, u.python.home),
185     },
186 
187     {
188         nxt_string("path"),
189         NXT_CONF_MAP_PTR,
190         offsetof(nxt_common_app_conf_t, u.python.path),
191     },
192 
193     {
194         nxt_string("protocol"),
195         NXT_CONF_MAP_STR,
196         offsetof(nxt_common_app_conf_t, u.python.protocol),
197     },
198 
199     {
200         nxt_string("threads"),
201         NXT_CONF_MAP_INT32,
202         offsetof(nxt_common_app_conf_t, u.python.threads),
203     },
204 
205     {
206         nxt_string("targets"),
207         NXT_CONF_MAP_PTR,
208         offsetof(nxt_common_app_conf_t, u.python.targets),
209     },
210 
211     {
212         nxt_string("thread_stack_size"),
213         NXT_CONF_MAP_INT32,
214         offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
215     },
216 };
217 
218 
219 static nxt_conf_map_t  nxt_php_app_conf[] = {
220     {
221         nxt_string("targets"),
222         NXT_CONF_MAP_PTR,
223         offsetof(nxt_common_app_conf_t, u.php.targets),
224     },
225 
226     {
227         nxt_string("options"),
228         NXT_CONF_MAP_PTR,
229         offsetof(nxt_common_app_conf_t, u.php.options),
230     },
231 };
232 
233 
234 static nxt_conf_map_t  nxt_perl_app_conf[] = {
235     {
236         nxt_string("script"),
237         NXT_CONF_MAP_CSTRZ,
238         offsetof(nxt_common_app_conf_t, u.perl.script),
239     },
240 
241     {
242         nxt_string("threads"),
243         NXT_CONF_MAP_INT32,
244         offsetof(nxt_common_app_conf_t, u.perl.threads),
245     },
246 
247     {
248         nxt_string("thread_stack_size"),
249         NXT_CONF_MAP_INT32,
250         offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
251     },
252 };
253 
254 
255 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
256     {
257         nxt_string("script"),
258         NXT_CONF_MAP_STR,
259         offsetof(nxt_common_app_conf_t, u.ruby.script),
260     },
261     {
262         nxt_string("threads"),
263         NXT_CONF_MAP_INT32,
264         offsetof(nxt_common_app_conf_t, u.ruby.threads),
265     },
266     {
267         nxt_string("hooks"),
268         NXT_CONF_MAP_STR,
269         offsetof(nxt_common_app_conf_t, u.ruby.hooks),
270     }
271 };
272 
273 
274 static nxt_conf_map_t  nxt_java_app_conf[] = {
275     {
276         nxt_string("classpath"),
277         NXT_CONF_MAP_PTR,
278         offsetof(nxt_common_app_conf_t, u.java.classpath),
279     },
280     {
281         nxt_string("webapp"),
282         NXT_CONF_MAP_CSTRZ,
283         offsetof(nxt_common_app_conf_t, u.java.webapp),
284     },
285     {
286         nxt_string("options"),
287         NXT_CONF_MAP_PTR,
288         offsetof(nxt_common_app_conf_t, u.java.options),
289     },
290     {
291         nxt_string("unit_jars"),
292         NXT_CONF_MAP_CSTRZ,
293         offsetof(nxt_common_app_conf_t, u.java.unit_jars),
294     },
295     {
296         nxt_string("threads"),
297         NXT_CONF_MAP_INT32,
298         offsetof(nxt_common_app_conf_t, u.java.threads),
299     },
300     {
301         nxt_string("thread_stack_size"),
302         NXT_CONF_MAP_INT32,
303         offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
304     },
305 
306 };
307 
308 
309 static nxt_conf_app_map_t  nxt_app_maps[] = {
310     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
311     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
312     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
313     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
314     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
315     { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
316 };
317 
318 
319 static void
nxt_main_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)320 nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
321 {
322     nxt_debug(task, "main data: %*s",
323               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
324 }
325 
326 
327 static void
nxt_main_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)328 nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
329 {
330     void        *mem;
331     nxt_port_t  *port;
332 
333     nxt_port_new_port_handler(task, msg);
334 
335     port = msg->u.new_port;
336 
337     if (port != NULL
338         && port->type == NXT_PROCESS_APP
339         && msg->fd[1] != -1)
340     {
341         mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
342                            PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
343         if (nxt_fast_path(mem != MAP_FAILED)) {
344             port->queue = mem;
345         }
346 
347         nxt_fd_close(msg->fd[1]);
348         msg->fd[1] = -1;
349     }
350 }
351 
352 
353 static void
nxt_main_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)354 nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
355 {
356     u_char                 *start, *p, ch;
357     size_t                 type_len;
358     nxt_int_t              ret;
359     nxt_buf_t              *b;
360     nxt_port_t             *port;
361     nxt_runtime_t          *rt;
362     nxt_process_t          *process;
363     nxt_app_type_t         idx;
364     nxt_conf_value_t       *conf;
365     nxt_process_init_t     *init;
366     nxt_common_app_conf_t  *app_conf;
367 
368     rt = task->thread->runtime;
369 
370     port = rt->port_by_type[NXT_PROCESS_ROUTER];
371     if (nxt_slow_path(port == NULL)) {
372         nxt_alert(task, "router port not found");
373         goto close_fds;
374     }
375 
376     if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
377         nxt_alert(task, "process %PI cannot start processes",
378                   nxt_recv_msg_cmsg_pid(msg));
379 
380         goto close_fds;
381     }
382 
383     process = nxt_process_new(rt);
384     if (nxt_slow_path(process == NULL)) {
385         goto close_fds;
386     }
387 
388     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
389     if (process->mem_pool == NULL) {
390         nxt_process_use(task, process, -1);
391         goto close_fds;
392     }
393 
394     process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
395 
396     init = nxt_process_init(process);
397 
398     *init = nxt_proto_process;
399 
400     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
401     if (b == NULL) {
402         goto failed;
403     }
404 
405     nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
406               b->mem.pos);
407 
408     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
409     if (nxt_slow_path(app_conf == NULL)) {
410         goto failed;
411     }
412 
413     app_conf->shared_port_fd = msg->fd[0];
414     app_conf->shared_queue_fd = msg->fd[1];
415 
416     start = b->mem.pos;
417 
418     app_conf->name.start = start;
419     app_conf->name.length = nxt_strlen(start);
420 
421     init->name = (const char *) start;
422 
423     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
424                                  + sizeof("\"\" prototype") + 1);
425 
426     if (nxt_slow_path(process->name == NULL)) {
427         goto failed;
428     }
429 
430     p = (u_char *) process->name;
431     *p++ = '"';
432     p = nxt_cpymem(p, init->name, app_conf->name.length);
433     p = nxt_cpymem(p, "\" prototype", 11);
434     *p = '\0';
435 
436     app_conf->shm_limit = 100 * 1024 * 1024;
437     app_conf->request_limit = 0;
438 
439     start += app_conf->name.length + 1;
440 
441     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
442     if (conf == NULL) {
443         nxt_alert(task, "router app configuration parsing error");
444 
445         goto failed;
446     }
447 
448     rt = task->thread->runtime;
449 
450     app_conf->user.start  = (u_char*)rt->user_cred.user;
451     app_conf->user.length = nxt_strlen(rt->user_cred.user);
452 
453     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
454                               nxt_nitems(nxt_common_app_conf), app_conf);
455 
456     if (ret != NXT_OK) {
457         nxt_alert(task, "failed to map common app conf received from router");
458         goto failed;
459     }
460 
461     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
462         ch = app_conf->type.start[type_len];
463 
464         if (ch == ' ' || nxt_isdigit(ch)) {
465             break;
466         }
467     }
468 
469     idx = nxt_app_parse_type(app_conf->type.start, type_len);
470 
471     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
472         nxt_alert(task, "invalid app type %d received from router", (int) idx);
473         goto failed;
474     }
475 
476     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
477                               nxt_app_maps[idx].size, app_conf);
478 
479     if (nxt_slow_path(ret != NXT_OK)) {
480         nxt_alert(task, "failed to map app conf received from router");
481         goto failed;
482     }
483 
484     if (app_conf->limits != NULL) {
485         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
486                                   nxt_common_app_limits_conf,
487                                   nxt_nitems(nxt_common_app_limits_conf),
488                                   app_conf);
489 
490         if (nxt_slow_path(ret != NXT_OK)) {
491             nxt_alert(task, "failed to map app limits received from router");
492             goto failed;
493         }
494     }
495 
496     app_conf->self = conf;
497 
498     process->stream = msg->port_msg.stream;
499     process->data.app = app_conf;
500 
501     ret = nxt_process_start(task, process);
502     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
503 
504         /* Close shared port fds only in main process. */
505         if (ret == NXT_OK) {
506             nxt_fd_close(app_conf->shared_port_fd);
507             nxt_fd_close(app_conf->shared_queue_fd);
508         }
509 
510         /* Avoid fds close in caller. */
511         msg->fd[0] = -1;
512         msg->fd[1] = -1;
513 
514         return;
515     }
516 
517 failed:
518 
519     nxt_process_use(task, process, -1);
520 
521     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
522                                  msg->port_msg.reply_port);
523 
524     if (nxt_fast_path(port != NULL)) {
525         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
526                               -1, msg->port_msg.stream, 0, NULL);
527     }
528 
529 close_fds:
530 
531     nxt_fd_close(msg->fd[0]);
532     msg->fd[0] = -1;
533 
534     nxt_fd_close(msg->fd[1]);
535     msg->fd[1] = -1;
536 }
537 
538 
539 static void
nxt_main_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)540 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
541 {
542     nxt_port_t     *port;
543     nxt_process_t  *process;
544     nxt_runtime_t  *rt;
545 
546     rt = task->thread->runtime;
547 
548     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
549                                  msg->port_msg.reply_port);
550     if (nxt_slow_path(port == NULL)) {
551         return;
552     }
553 
554     process = port->process;
555 
556     nxt_assert(process != NULL);
557     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
558 
559 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
560     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
561         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
562                                                    process->user_cred,
563                                                    &process->isolation.clone)
564                           != NXT_OK))
565         {
566             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
567                                          -1, msg->port_msg.stream, 0, NULL);
568             return;
569         }
570     }
571 
572 #endif
573 
574     process->state = NXT_PROCESS_STATE_CREATED;
575 
576     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
577                                  -1, msg->port_msg.stream, 0, NULL);
578 }
579 
580 
581 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
582     .data             = nxt_main_data_handler,
583     .new_port         = nxt_main_new_port_handler,
584     .process_created  = nxt_main_process_created_handler,
585     .process_ready    = nxt_port_process_ready_handler,
586     .whoami           = nxt_main_process_whoami_handler,
587     .remove_pid       = nxt_port_remove_pid_handler,
588     .start_process    = nxt_main_start_process_handler,
589     .socket           = nxt_main_port_socket_handler,
590     .modules          = nxt_main_port_modules_handler,
591     .conf_store       = nxt_main_port_conf_store_handler,
592 #if (NXT_TLS)
593     .cert_get         = nxt_cert_store_get_handler,
594     .cert_delete      = nxt_cert_store_delete_handler,
595 #endif
596     .access_log       = nxt_main_port_access_log_handler,
597     .rpc_ready        = nxt_port_rpc_handler,
598     .rpc_error        = nxt_port_rpc_handler,
599 };
600 
601 
602 static void
nxt_main_process_whoami_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)603 nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
604 {
605     nxt_buf_t      *buf;
606     nxt_pid_t      pid, ppid;
607     nxt_port_t     *port;
608     nxt_runtime_t  *rt;
609     nxt_process_t  *pprocess;
610 
611     nxt_assert(msg->port_msg.reply_port == 0);
612 
613     if (nxt_slow_path(msg->buf == NULL
614         || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
615     {
616         nxt_alert(task, "whoami: buffer is NULL or unexpected size");
617         goto fail;
618     }
619 
620     nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
621 
622     rt = task->thread->runtime;
623 
624     pprocess = nxt_runtime_process_find(rt, ppid);
625     if (nxt_slow_path(pprocess == NULL)) {
626         nxt_alert(task, "whoami: parent process %PI not found", ppid);
627         goto fail;
628     }
629 
630     pid = nxt_recv_msg_cmsg_pid(msg);
631 
632     nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
633               msg->fd[0]);
634 
635     if (msg->fd[0] != -1) {
636         port = nxt_runtime_process_port_create(task, rt, pid, 0,
637                                                NXT_PROCESS_APP);
638         if (nxt_slow_path(port == NULL)) {
639             goto fail;
640         }
641 
642         nxt_fd_nonblocking(task, msg->fd[0]);
643 
644         port->pair[0] = -1;
645         port->pair[1] = msg->fd[0];
646         msg->fd[0] = -1;
647 
648         port->max_size = 16 * 1024;
649         port->max_share = 64 * 1024;
650         port->socket.task = task;
651 
652         nxt_port_write_enable(task, port);
653 
654     } else {
655         port = nxt_runtime_port_find(rt, pid, 0);
656         if (nxt_slow_path(port == NULL)) {
657             goto fail;
658         }
659     }
660 
661     if (ppid != nxt_pid) {
662         nxt_queue_insert_tail(&pprocess->children, &port->process->link);
663     }
664 
665     buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
666                             sizeof(nxt_pid_t), 0);
667     if (nxt_slow_path(buf == NULL)) {
668         goto fail;
669     }
670 
671     buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
672 
673     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
674                                  msg->port_msg.stream, 0, buf);
675 
676 fail:
677 
678     if (msg->fd[0] != -1) {
679         nxt_fd_close(msg->fd[0]);
680     }
681 }
682 
683 
684 static nxt_int_t
nxt_main_process_port_create(nxt_task_t * task,nxt_runtime_t * rt)685 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
686 {
687     nxt_int_t      ret;
688     nxt_port_t     *port;
689     nxt_process_t  *process;
690 
691     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
692                                            NXT_PROCESS_MAIN);
693     if (nxt_slow_path(port == NULL)) {
694         return NXT_ERROR;
695     }
696 
697     process = port->process;
698 
699     ret = nxt_port_socket_init(task, port, 0);
700     if (nxt_slow_path(ret != NXT_OK)) {
701         nxt_port_use(task, port, -1);
702         return ret;
703     }
704 
705     /*
706      * A main process port.  A write port is not closed
707      * since it should be inherited by processes.
708      */
709     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
710 
711     process->state = NXT_PROCESS_STATE_READY;
712 
713     return NXT_OK;
714 }
715 
716 
717 static void
nxt_main_process_title(nxt_task_t * task)718 nxt_main_process_title(nxt_task_t *task)
719 {
720     u_char      *p, *end;
721     nxt_uint_t  i;
722     u_char      title[2048];
723 
724     end = title + sizeof(title) - 1;
725 
726     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
727                     nxt_process_argv[0]);
728 
729     for (i = 1; nxt_process_argv[i] != NULL; i++) {
730         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
731     }
732 
733     if (p < end) {
734         *p++ = ']';
735     }
736 
737     *p = '\0';
738 
739     nxt_process_title(task, "%s", title);
740 }
741 
742 
743 static void
nxt_main_process_sigterm_handler(nxt_task_t * task,void * obj,void * data)744 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
745 {
746     nxt_debug(task, "sigterm handler signo:%d (%s)",
747               (int) (uintptr_t) obj, data);
748 
749     /* TODO: fast exit. */
750 
751     nxt_exiting = 1;
752 
753     nxt_runtime_quit(task, 0);
754 }
755 
756 
757 static void
nxt_main_process_sigquit_handler(nxt_task_t * task,void * obj,void * data)758 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
759 {
760     nxt_debug(task, "sigquit handler signo:%d (%s)",
761               (int) (uintptr_t) obj, data);
762 
763     /* TODO: graceful exit. */
764 
765     nxt_exiting = 1;
766 
767     nxt_runtime_quit(task, 0);
768 }
769 
770 
771 static void
nxt_main_process_sigusr1_handler(nxt_task_t * task,void * obj,void * data)772 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
773 {
774     nxt_mp_t        *mp;
775     nxt_int_t       ret;
776     nxt_uint_t      n;
777     nxt_port_t      *port;
778     nxt_file_t      *file, *new_file;
779     nxt_array_t     *new_files;
780     nxt_runtime_t   *rt;
781 
782     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
783             (int) (uintptr_t) obj, data, "log files rotation");
784 
785     rt = task->thread->runtime;
786 
787     port = rt->port_by_type[NXT_PROCESS_ROUTER];
788 
789     if (nxt_fast_path(port != NULL)) {
790         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
791                                      -1, 0, 0, NULL);
792     }
793 
794     mp = nxt_mp_create(1024, 128, 256, 32);
795     if (mp == NULL) {
796         return;
797     }
798 
799     n = nxt_list_nelts(rt->log_files);
800 
801     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
802     if (new_files == NULL) {
803         nxt_mp_destroy(mp);
804         return;
805     }
806 
807     nxt_list_each(file, rt->log_files) {
808 
809         /* This allocation cannot fail. */
810         new_file = nxt_array_add(new_files);
811 
812         new_file->name = file->name;
813         new_file->fd = NXT_FILE_INVALID;
814         new_file->log_level = NXT_LOG_ALERT;
815 
816         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
817                             NXT_FILE_OWNER_ACCESS);
818 
819         if (ret != NXT_OK) {
820             goto fail;
821         }
822 
823     } nxt_list_loop;
824 
825     new_file = new_files->elts;
826 
827     ret = nxt_file_stderr(&new_file[0]);
828 
829     if (ret == NXT_OK) {
830         n = 0;
831 
832         nxt_list_each(file, rt->log_files) {
833 
834             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
835             /*
836              * The old log file descriptor must be closed at the moment
837              * when no other threads use it.  dup2() allows to use the
838              * old file descriptor for new log file.  This change is
839              * performed atomically in the kernel.
840              */
841             (void) nxt_file_redirect(file, new_file[n].fd);
842 
843             n++;
844 
845         } nxt_list_loop;
846 
847         nxt_mp_destroy(mp);
848         return;
849     }
850 
851 fail:
852 
853     new_file = new_files->elts;
854     n = new_files->nelts;
855 
856     while (n != 0) {
857         if (new_file->fd != NXT_FILE_INVALID) {
858             nxt_file_close(task, new_file);
859         }
860 
861         new_file++;
862         n--;
863     }
864 
865     nxt_mp_destroy(mp);
866 }
867 
868 
869 static void
nxt_main_process_sigchld_handler(nxt_task_t * task,void * obj,void * data)870 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
871 {
872     int                 status;
873     nxt_int_t           ret;
874     nxt_err_t           err;
875     nxt_pid_t           pid;
876     nxt_port_t          *port;
877     nxt_queue_t         children;
878     nxt_runtime_t       *rt;
879     nxt_process_t       *process, *child;
880     nxt_process_init_t  init;
881 
882     nxt_debug(task, "sigchld handler signo:%d (%s)",
883               (int) (uintptr_t) obj, data);
884 
885     rt = task->thread->runtime;
886 
887     for ( ;; ) {
888         pid = waitpid(-1, &status, WNOHANG);
889 
890         if (pid == -1) {
891 
892             switch (err = nxt_errno) {
893 
894             case NXT_ECHILD:
895                 return;
896 
897             case NXT_EINTR:
898                 continue;
899 
900             default:
901                 nxt_alert(task, "waitpid() failed: %E", err);
902                 return;
903             }
904         }
905 
906         nxt_debug(task, "waitpid(): %PI", pid);
907 
908         if (pid == 0) {
909             return;
910         }
911 
912         if (WTERMSIG(status)) {
913 #ifdef WCOREDUMP
914             nxt_alert(task, "process %PI exited on signal %d%s",
915                       pid, WTERMSIG(status),
916                       WCOREDUMP(status) ? " (core dumped)" : "");
917 #else
918             nxt_alert(task, "process %PI exited on signal %d",
919                       pid, WTERMSIG(status));
920 #endif
921 
922         } else {
923             nxt_trace(task, "process %PI exited with code %d",
924                       pid, WEXITSTATUS(status));
925         }
926 
927         process = nxt_runtime_process_find(rt, pid);
928 
929         if (process != NULL) {
930             nxt_main_process_cleanup(task, process);
931 
932             if (process->state == NXT_PROCESS_STATE_READY) {
933                 process->stream = 0;
934             }
935 
936             nxt_queue_init(&children);
937 
938             if (!nxt_queue_is_empty(&process->children)) {
939                 nxt_queue_add(&children, &process->children);
940 
941                 nxt_queue_init(&process->children);
942 
943                 nxt_queue_each(child, &children, nxt_process_t, link) {
944                     port = nxt_process_port_first(child);
945 
946                     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
947                                                  -1, 0, 0, NULL);
948                 } nxt_queue_loop;
949             }
950 
951             if (nxt_exiting) {
952                 nxt_process_close_ports(task, process);
953 
954                 nxt_queue_each(child, &children, nxt_process_t, link) {
955                     nxt_queue_remove(&child->link);
956                     child->link.next = NULL;
957 
958                     nxt_process_close_ports(task, child);
959                 } nxt_queue_loop;
960 
961                 if (rt->nprocesses <= 1) {
962                     nxt_runtime_quit(task, 0);
963 
964                     return;
965                 }
966 
967                 continue;
968             }
969 
970             nxt_port_remove_notify_others(task, process);
971 
972             nxt_queue_each(child, &children, nxt_process_t, link) {
973                 nxt_port_remove_notify_others(task, child);
974 
975                 nxt_queue_remove(&child->link);
976                 child->link.next = NULL;
977 
978                 nxt_process_close_ports(task, child);
979             } nxt_queue_loop;
980 
981             init = *(nxt_process_init_t *) nxt_process_init(process);
982 
983             nxt_process_close_ports(task, process);
984 
985             if (init.restart) {
986                 ret = nxt_process_init_start(task, init);
987                 if (nxt_slow_path(ret == NXT_ERROR)) {
988                     nxt_alert(task, "failed to restart %s", init.name);
989                 }
990             }
991         }
992     }
993 }
994 
995 
996 static void
nxt_main_process_signal_handler(nxt_task_t * task,void * obj,void * data)997 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
998 {
999     nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
1000               (int) (uintptr_t) obj, data);
1001 }
1002 
1003 
1004 static void
nxt_main_process_cleanup(nxt_task_t * task,nxt_process_t * process)1005 nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
1006 {
1007     if (process->isolation.cleanup != NULL) {
1008         process->isolation.cleanup(task, process);
1009     }
1010 }
1011 
1012 
1013 static void
nxt_main_port_socket_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1014 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1015 {
1016     size_t                  size;
1017     nxt_int_t               ret;
1018     nxt_buf_t               *b, *out;
1019     nxt_port_t              *port;
1020     nxt_sockaddr_t          *sa;
1021     nxt_port_msg_type_t     type;
1022     nxt_listening_socket_t  ls;
1023     u_char                  message[2048];
1024 
1025     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1026                                  msg->port_msg.reply_port);
1027     if (nxt_slow_path(port == NULL)) {
1028         return;
1029     }
1030 
1031     if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) {
1032         nxt_alert(task, "process %PI cannot create listener sockets",
1033                   msg->port_msg.pid);
1034 
1035         return;
1036     }
1037 
1038     b = msg->buf;
1039     sa = (nxt_sockaddr_t *) b->mem.pos;
1040 
1041     /* TODO check b size and make plain */
1042 
1043     ls.socket = -1;
1044     ls.error = NXT_SOCKET_ERROR_SYSTEM;
1045     ls.start = message;
1046     ls.end = message + sizeof(message);
1047 
1048     nxt_debug(task, "listening socket \"%*s\"",
1049               (size_t) sa->length, nxt_sockaddr_start(sa));
1050 
1051     ret = nxt_main_listening_socket(sa, &ls);
1052 
1053     if (ret == NXT_OK) {
1054         nxt_debug(task, "socket(\"%*s\"): %d",
1055                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1056 
1057         out = NULL;
1058 
1059         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1060 
1061     } else {
1062         size = ls.end - ls.start;
1063 
1064         nxt_alert(task, "%*s", size, ls.start);
1065 
1066         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1067                                    size + 1);
1068         if (nxt_fast_path(out != NULL)) {
1069             *out->mem.free++ = (uint8_t) ls.error;
1070 
1071             out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1072         }
1073 
1074         type = NXT_PORT_MSG_RPC_ERROR;
1075     }
1076 
1077     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1078                           0, out);
1079 }
1080 
1081 
1082 static nxt_int_t
nxt_main_listening_socket(nxt_sockaddr_t * sa,nxt_listening_socket_t * ls)1083 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1084 {
1085     nxt_err_t         err;
1086     nxt_socket_t      s;
1087 
1088     const socklen_t   length = sizeof(int);
1089     static const int  enable = 1;
1090 
1091     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1092 
1093     if (nxt_slow_path(s == -1)) {
1094         err = nxt_errno;
1095 
1096 #if (NXT_INET6)
1097 
1098         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1099             ls->error = NXT_SOCKET_ERROR_NOINET6;
1100         }
1101 
1102 #endif
1103 
1104         ls->end = nxt_sprintf(ls->start, ls->end,
1105                               "socket(\\\"%*s\\\") failed %E",
1106                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1107 
1108         return NXT_ERROR;
1109     }
1110 
1111     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1112         ls->end = nxt_sprintf(ls->start, ls->end,
1113                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1114                               (size_t) sa->length, nxt_sockaddr_start(sa),
1115                               nxt_errno);
1116         goto fail;
1117     }
1118 
1119 #if (NXT_INET6)
1120 
1121     if (sa->u.sockaddr.sa_family == AF_INET6) {
1122 
1123         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1124             ls->end = nxt_sprintf(ls->start, ls->end,
1125                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1126                                (size_t) sa->length, nxt_sockaddr_start(sa),
1127                                nxt_errno);
1128             goto fail;
1129         }
1130     }
1131 
1132 #endif
1133 
1134     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1135         err = nxt_errno;
1136 
1137 #if (NXT_HAVE_UNIX_DOMAIN)
1138 
1139         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1140             switch (err) {
1141 
1142             case EACCES:
1143                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1144                 break;
1145 
1146             case ENOENT:
1147             case ENOTDIR:
1148                 ls->error = NXT_SOCKET_ERROR_PATH;
1149                 break;
1150             }
1151 
1152         } else
1153 #endif
1154         {
1155             switch (err) {
1156 
1157             case EACCES:
1158                 ls->error = NXT_SOCKET_ERROR_PORT;
1159                 break;
1160 
1161             case EADDRINUSE:
1162                 ls->error = NXT_SOCKET_ERROR_INUSE;
1163                 break;
1164 
1165             case EADDRNOTAVAIL:
1166                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1167                 break;
1168             }
1169         }
1170 
1171         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1172                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1173         goto fail;
1174     }
1175 
1176 #if (NXT_HAVE_UNIX_DOMAIN)
1177 
1178     if (sa->u.sockaddr.sa_family == AF_UNIX
1179         && sa->u.sockaddr_un.sun_path[0] != '\0')
1180     {
1181         char     *filename;
1182         mode_t   access;
1183 
1184         filename = sa->u.sockaddr_un.sun_path;
1185         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1186 
1187         if (chmod(filename, access) != 0) {
1188             ls->end = nxt_sprintf(ls->start, ls->end,
1189                                   "chmod(\\\"%s\\\") failed %E",
1190                                   filename, nxt_errno);
1191             goto fail;
1192         }
1193     }
1194 
1195 #endif
1196 
1197     ls->socket = s;
1198 
1199     return NXT_OK;
1200 
1201 fail:
1202 
1203     (void) close(s);
1204 
1205     return NXT_ERROR;
1206 }
1207 
1208 
1209 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1210     {
1211         nxt_string("type"),
1212         NXT_CONF_MAP_INT,
1213         offsetof(nxt_app_lang_module_t, type),
1214     },
1215 
1216     {
1217         nxt_string("version"),
1218         NXT_CONF_MAP_CSTRZ,
1219         offsetof(nxt_app_lang_module_t, version),
1220     },
1221 
1222     {
1223         nxt_string("file"),
1224         NXT_CONF_MAP_CSTRZ,
1225         offsetof(nxt_app_lang_module_t, file),
1226     },
1227 };
1228 
1229 
1230 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1231     {
1232         nxt_string("src"),
1233         NXT_CONF_MAP_CSTRZ,
1234         offsetof(nxt_fs_mount_t, src),
1235     },
1236     {
1237         nxt_string("dst"),
1238         NXT_CONF_MAP_CSTRZ,
1239         offsetof(nxt_fs_mount_t, dst),
1240     },
1241     {
1242         nxt_string("name"),
1243         NXT_CONF_MAP_CSTRZ,
1244         offsetof(nxt_fs_mount_t, name),
1245     },
1246     {
1247         nxt_string("type"),
1248         NXT_CONF_MAP_INT,
1249         offsetof(nxt_fs_mount_t, type),
1250     },
1251     {
1252         nxt_string("flags"),
1253         NXT_CONF_MAP_INT,
1254         offsetof(nxt_fs_mount_t, flags),
1255     },
1256     {
1257         nxt_string("data"),
1258         NXT_CONF_MAP_CSTRZ,
1259         offsetof(nxt_fs_mount_t, data),
1260     },
1261 };
1262 
1263 
1264 static void
nxt_main_port_modules_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1265 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1266 {
1267     uint32_t               index, jindex, nmounts;
1268     nxt_mp_t               *mp;
1269     nxt_int_t              ret;
1270     nxt_buf_t              *b;
1271     nxt_port_t             *port;
1272     nxt_runtime_t          *rt;
1273     nxt_fs_mount_t         *mnt;
1274     nxt_conf_value_t       *conf, *root, *value, *mounts;
1275     nxt_app_lang_module_t  *lang;
1276 
1277     static nxt_str_t root_path = nxt_string("/");
1278     static nxt_str_t mounts_name = nxt_string("mounts");
1279 
1280     rt = task->thread->runtime;
1281 
1282     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1283         nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid);
1284         return;
1285     }
1286 
1287     if (nxt_exiting) {
1288         nxt_debug(task, "ignoring discovered modules, exiting");
1289         return;
1290     }
1291 
1292     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1293                                  msg->port_msg.reply_port);
1294 
1295     if (nxt_fast_path(port != NULL)) {
1296         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1297                                      msg->port_msg.stream, 0, NULL);
1298     }
1299 
1300     b = msg->buf;
1301 
1302     if (b == NULL) {
1303         return;
1304     }
1305 
1306     mp = nxt_mp_create(1024, 128, 256, 32);
1307     if (mp == NULL) {
1308         return;
1309     }
1310 
1311     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1312 
1313     if (b == NULL) {
1314         return;
1315     }
1316 
1317     nxt_debug(task, "application languages: \"%*s\"",
1318               b->mem.free - b->mem.pos, b->mem.pos);
1319 
1320     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1321     if (conf == NULL) {
1322         goto fail;
1323     }
1324 
1325     root = nxt_conf_get_path(conf, &root_path);
1326     if (root == NULL) {
1327         goto fail;
1328     }
1329 
1330     for (index = 0; /* void */ ; index++) {
1331         value = nxt_conf_get_array_element(root, index);
1332         if (value == NULL) {
1333             break;
1334         }
1335 
1336         lang = nxt_array_zero_add(rt->languages);
1337         if (lang == NULL) {
1338             goto fail;
1339         }
1340 
1341         lang->module = NULL;
1342 
1343         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1344                                   nxt_nitems(nxt_app_lang_module_map), lang);
1345 
1346         if (ret != NXT_OK) {
1347             goto fail;
1348         }
1349 
1350         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1351         if (mounts == NULL) {
1352             nxt_alert(task, "missing mounts from discovery message.");
1353             goto fail;
1354         }
1355 
1356         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1357             nxt_alert(task, "invalid mounts type from discovery message.");
1358             goto fail;
1359         }
1360 
1361         nmounts = nxt_conf_array_elements_count(mounts);
1362 
1363         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1364                                         sizeof(nxt_fs_mount_t));
1365 
1366         if (lang->mounts == NULL) {
1367             goto fail;
1368         }
1369 
1370         for (jindex = 0; /* */; jindex++) {
1371             value = nxt_conf_get_array_element(mounts, jindex);
1372             if (value == NULL) {
1373                 break;
1374             }
1375 
1376             mnt = nxt_array_zero_add(lang->mounts);
1377             if (mnt == NULL) {
1378                 goto fail;
1379             }
1380 
1381             mnt->builtin = 1;
1382             mnt->deps = 1;
1383 
1384             ret = nxt_conf_map_object(rt->mem_pool, value,
1385                                       nxt_app_lang_mounts_map,
1386                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1387 
1388             if (ret != NXT_OK) {
1389                 goto fail;
1390             }
1391         }
1392 
1393         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1394                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1395     }
1396 
1397     qsort(rt->languages->elts, rt->languages->nelts,
1398           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1399 
1400 fail:
1401 
1402     nxt_mp_destroy(mp);
1403 
1404     ret = nxt_process_init_start(task, nxt_controller_process);
1405     if (ret == NXT_OK) {
1406         ret = nxt_process_init_start(task, nxt_router_process);
1407     }
1408 
1409     if (nxt_slow_path(ret == NXT_ERROR)) {
1410         nxt_exiting = 1;
1411 
1412         nxt_runtime_quit(task, 1);
1413     }
1414 }
1415 
1416 
1417 static int nxt_cdecl
nxt_app_lang_compare(const void * v1,const void * v2)1418 nxt_app_lang_compare(const void *v1, const void *v2)
1419 {
1420     int                          n;
1421     const nxt_app_lang_module_t  *lang1, *lang2;
1422 
1423     lang1 = v1;
1424     lang2 = v2;
1425 
1426     n = lang1->type - lang2->type;
1427 
1428     if (n != 0) {
1429         return n;
1430     }
1431 
1432     n = nxt_strverscmp(lang1->version, lang2->version);
1433 
1434     /* Negate result to move higher versions to the beginning. */
1435 
1436     return -n;
1437 }
1438 
1439 
1440 static void
nxt_main_port_conf_store_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1441 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1442 {
1443     void           *p;
1444     size_t         n, size;
1445     nxt_int_t      ret;
1446     nxt_port_t     *ctl_port;
1447     nxt_runtime_t  *rt;
1448     u_char         ver[NXT_INT_T_LEN];
1449 
1450     rt = task->thread->runtime;
1451 
1452     ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1453 
1454     if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) {
1455         nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid);
1456         return;
1457     }
1458 
1459     p = MAP_FAILED;
1460 
1461     /*
1462      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1463      * initialized in 'cleanup' section.
1464      */
1465     size = 0;
1466 
1467     if (nxt_slow_path(msg->fd[0] == -1)) {
1468         nxt_alert(task, "conf_store_handler: invalid shm fd");
1469         goto error;
1470     }
1471 
1472     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1473         nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1474                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
1475         goto error;
1476     }
1477 
1478     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1479 
1480     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1481 
1482     nxt_fd_close(msg->fd[0]);
1483     msg->fd[0] = -1;
1484 
1485     if (nxt_slow_path(p == MAP_FAILED)) {
1486         goto error;
1487     }
1488 
1489     nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1490 
1491     if (nxt_conf_ver != NXT_VERNUM) {
1492         n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver;
1493 
1494         ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n);
1495         if (nxt_slow_path(ret != NXT_OK)) {
1496             goto error;
1497         }
1498 
1499         nxt_conf_ver = NXT_VERNUM;
1500     }
1501 
1502     ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size);
1503 
1504     if (nxt_fast_path(ret == NXT_OK)) {
1505         goto cleanup;
1506     }
1507 
1508 error:
1509 
1510     nxt_alert(task, "failed to store current configuration");
1511 
1512 cleanup:
1513 
1514     if (p != MAP_FAILED) {
1515         nxt_mem_munmap(p, size);
1516     }
1517 
1518     if (msg->fd[0] != -1) {
1519         nxt_fd_close(msg->fd[0]);
1520         msg->fd[0] = -1;
1521     }
1522 }
1523 
1524 
1525 static nxt_int_t
nxt_main_file_store(nxt_task_t * task,const char * tmp_name,const char * name,u_char * buf,size_t size)1526 nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name,
1527     u_char *buf, size_t size)
1528 {
1529     ssize_t     n;
1530     nxt_int_t   ret;
1531     nxt_file_t  file;
1532 
1533     nxt_memzero(&file, sizeof(nxt_file_t));
1534 
1535     file.name = (nxt_file_name_t *) name;
1536 
1537     ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE,
1538                         NXT_FILE_OWNER_ACCESS);
1539     if (nxt_slow_path(ret != NXT_OK)) {
1540         return NXT_ERROR;
1541     }
1542 
1543     n = nxt_file_write(&file, buf, size, 0);
1544 
1545     nxt_file_close(task, &file);
1546 
1547     if (nxt_slow_path(n != (ssize_t) size)) {
1548         (void) nxt_file_delete(file.name);
1549         return NXT_ERROR;
1550     }
1551 
1552     return nxt_file_rename(file.name, (nxt_file_name_t *) name);
1553 }
1554 
1555 
1556 static void
nxt_main_port_access_log_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1557 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1558 {
1559     u_char               *path;
1560     nxt_int_t            ret;
1561     nxt_file_t           file;
1562     nxt_port_t           *port;
1563     nxt_port_msg_type_t  type;
1564 
1565     nxt_debug(task, "opening access log file");
1566 
1567     path = msg->buf->mem.pos;
1568 
1569     nxt_memzero(&file, sizeof(nxt_file_t));
1570 
1571     file.name = (nxt_file_name_t *) path;
1572     file.log_level = NXT_LOG_ERR;
1573 
1574     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1575                         NXT_FILE_OWNER_ACCESS);
1576 
1577     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1578                            : NXT_PORT_MSG_RPC_ERROR;
1579 
1580     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1581                                  msg->port_msg.reply_port);
1582 
1583     if (nxt_fast_path(port != NULL)) {
1584         (void) nxt_port_socket_write(task, port, type, file.fd,
1585                                      msg->port_msg.stream, 0, NULL);
1586     }
1587 }
1588