1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #include <nxt_port_queue.h>
14 #if (NXT_TLS)
15 #include <nxt_cert.h>
16 #endif
17 #if (NXT_HAVE_NJS)
18 #include <nxt_script.h>
19 #endif
20
21 #include <sys/mount.h>
22
23
24 typedef struct {
25 nxt_socket_t socket;
26 nxt_socket_error_t error;
27 u_char *start;
28 u_char *end;
29 } nxt_listening_socket_t;
30
31
32 typedef struct {
33 nxt_uint_t size;
34 nxt_conf_map_t *map;
35 } nxt_conf_app_map_t;
36
37
38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
39 nxt_runtime_t *rt);
40 static void nxt_main_process_title(nxt_task_t *task);
41 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
42 void *data);
43 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
44 void *data);
45 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
46 void *data);
47 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
48 void *data);
49 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
50 void *data);
51 static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
52 static void nxt_main_port_socket_handler(nxt_task_t *task,
53 nxt_port_recv_msg_t *msg);
54 static void nxt_main_port_socket_unlink_handler(nxt_task_t *task,
55 nxt_port_recv_msg_t *msg);
56 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
57 nxt_listening_socket_t *ls);
58 static void nxt_main_port_modules_handler(nxt_task_t *task,
59 nxt_port_recv_msg_t *msg);
60 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
61 static void nxt_main_process_whoami_handler(nxt_task_t *task,
62 nxt_port_recv_msg_t *msg);
63 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
64 nxt_port_recv_msg_t *msg);
65 static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
66 const char *name, u_char *buf, size_t size);
67 static void nxt_main_port_access_log_handler(nxt_task_t *task,
68 nxt_port_recv_msg_t *msg);
69
70 const nxt_sig_event_t nxt_main_process_signals[] = {
71 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler),
72 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler),
73 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
74 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
75 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
76 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
77 nxt_event_signal_end,
78 };
79
80
81 nxt_uint_t nxt_conf_ver;
82
83 static nxt_bool_t nxt_exiting;
84
85
86 nxt_int_t
nxt_main_process_start(nxt_thread_t * thr,nxt_task_t * task,nxt_runtime_t * rt)87 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
88 nxt_runtime_t *rt)
89 {
90 rt->type = NXT_PROCESS_MAIN;
91
92 if (nxt_main_process_port_create(task, rt) != NXT_OK) {
93 return NXT_ERROR;
94 }
95
96 nxt_main_process_title(task);
97
98 /*
99 * The discovery process will send a message processed by
100 * nxt_main_port_modules_handler() which starts the controller
101 * and router processes.
102 */
103 return nxt_process_init_start(task, nxt_discovery_process);
104 }
105
106
107 static nxt_conf_map_t nxt_common_app_conf[] = {
108 {
109 nxt_string("type"),
110 NXT_CONF_MAP_STR,
111 offsetof(nxt_common_app_conf_t, type),
112 },
113
114 {
115 nxt_string("user"),
116 NXT_CONF_MAP_STR,
117 offsetof(nxt_common_app_conf_t, user),
118 },
119
120 {
121 nxt_string("group"),
122 NXT_CONF_MAP_STR,
123 offsetof(nxt_common_app_conf_t, group),
124 },
125
126 {
127 nxt_string("stdout"),
128 NXT_CONF_MAP_CSTRZ,
129 offsetof(nxt_common_app_conf_t, stdout_log),
130 },
131
132 {
133 nxt_string("stderr"),
134 NXT_CONF_MAP_CSTRZ,
135 offsetof(nxt_common_app_conf_t, stderr_log),
136 },
137
138 {
139 nxt_string("working_directory"),
140 NXT_CONF_MAP_CSTRZ,
141 offsetof(nxt_common_app_conf_t, working_directory),
142 },
143
144 {
145 nxt_string("environment"),
146 NXT_CONF_MAP_PTR,
147 offsetof(nxt_common_app_conf_t, environment),
148 },
149
150 {
151 nxt_string("isolation"),
152 NXT_CONF_MAP_PTR,
153 offsetof(nxt_common_app_conf_t, isolation),
154 },
155
156 {
157 nxt_string("limits"),
158 NXT_CONF_MAP_PTR,
159 offsetof(nxt_common_app_conf_t, limits),
160 },
161
162 };
163
164
165 static nxt_conf_map_t nxt_common_app_limits_conf[] = {
166 {
167 nxt_string("shm"),
168 NXT_CONF_MAP_SIZE,
169 offsetof(nxt_common_app_conf_t, shm_limit),
170 },
171
172 {
173 nxt_string("requests"),
174 NXT_CONF_MAP_INT32,
175 offsetof(nxt_common_app_conf_t, request_limit),
176 },
177
178 };
179
180
181 static nxt_conf_map_t nxt_external_app_conf[] = {
182 {
183 nxt_string("executable"),
184 NXT_CONF_MAP_CSTRZ,
185 offsetof(nxt_common_app_conf_t, u.external.executable),
186 },
187
188 {
189 nxt_string("arguments"),
190 NXT_CONF_MAP_PTR,
191 offsetof(nxt_common_app_conf_t, u.external.arguments),
192 },
193
194 };
195
196
197 static nxt_conf_map_t nxt_python_app_conf[] = {
198 {
199 nxt_string("home"),
200 NXT_CONF_MAP_CSTRZ,
201 offsetof(nxt_common_app_conf_t, u.python.home),
202 },
203
204 {
205 nxt_string("path"),
206 NXT_CONF_MAP_PTR,
207 offsetof(nxt_common_app_conf_t, u.python.path),
208 },
209
210 {
211 nxt_string("protocol"),
212 NXT_CONF_MAP_STR,
213 offsetof(nxt_common_app_conf_t, u.python.protocol),
214 },
215
216 {
217 nxt_string("threads"),
218 NXT_CONF_MAP_INT32,
219 offsetof(nxt_common_app_conf_t, u.python.threads),
220 },
221
222 {
223 nxt_string("targets"),
224 NXT_CONF_MAP_PTR,
225 offsetof(nxt_common_app_conf_t, u.python.targets),
226 },
227
228 {
229 nxt_string("thread_stack_size"),
230 NXT_CONF_MAP_INT32,
231 offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
232 },
233 };
234
235
236 static nxt_conf_map_t nxt_php_app_conf[] = {
237 {
238 nxt_string("targets"),
239 NXT_CONF_MAP_PTR,
240 offsetof(nxt_common_app_conf_t, u.php.targets),
241 },
242
243 {
244 nxt_string("options"),
245 NXT_CONF_MAP_PTR,
246 offsetof(nxt_common_app_conf_t, u.php.options),
247 },
248 };
249
250
251 static nxt_conf_map_t nxt_perl_app_conf[] = {
252 {
253 nxt_string("script"),
254 NXT_CONF_MAP_CSTRZ,
255 offsetof(nxt_common_app_conf_t, u.perl.script),
256 },
257
258 {
259 nxt_string("threads"),
260 NXT_CONF_MAP_INT32,
261 offsetof(nxt_common_app_conf_t, u.perl.threads),
262 },
263
264 {
265 nxt_string("thread_stack_size"),
266 NXT_CONF_MAP_INT32,
267 offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
268 },
269 };
270
271
272 static nxt_conf_map_t nxt_ruby_app_conf[] = {
273 {
274 nxt_string("script"),
275 NXT_CONF_MAP_STR,
276 offsetof(nxt_common_app_conf_t, u.ruby.script),
277 },
278 {
279 nxt_string("threads"),
280 NXT_CONF_MAP_INT32,
281 offsetof(nxt_common_app_conf_t, u.ruby.threads),
282 },
283 {
284 nxt_string("hooks"),
285 NXT_CONF_MAP_STR,
286 offsetof(nxt_common_app_conf_t, u.ruby.hooks),
287 }
288 };
289
290
291 static nxt_conf_map_t nxt_java_app_conf[] = {
292 {
293 nxt_string("classpath"),
294 NXT_CONF_MAP_PTR,
295 offsetof(nxt_common_app_conf_t, u.java.classpath),
296 },
297 {
298 nxt_string("webapp"),
299 NXT_CONF_MAP_CSTRZ,
300 offsetof(nxt_common_app_conf_t, u.java.webapp),
301 },
302 {
303 nxt_string("options"),
304 NXT_CONF_MAP_PTR,
305 offsetof(nxt_common_app_conf_t, u.java.options),
306 },
307 {
308 nxt_string("unit_jars"),
309 NXT_CONF_MAP_CSTRZ,
310 offsetof(nxt_common_app_conf_t, u.java.unit_jars),
311 },
312 {
313 nxt_string("threads"),
314 NXT_CONF_MAP_INT32,
315 offsetof(nxt_common_app_conf_t, u.java.threads),
316 },
317 {
318 nxt_string("thread_stack_size"),
319 NXT_CONF_MAP_INT32,
320 offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
321 },
322
323 };
324
325
326 static nxt_conf_map_t nxt_wasm_app_conf[] = {
327 {
328 nxt_string("module"),
329 NXT_CONF_MAP_CSTRZ,
330 offsetof(nxt_common_app_conf_t, u.wasm.module),
331 },
332 {
333 nxt_string("request_handler"),
334 NXT_CONF_MAP_CSTRZ,
335 offsetof(nxt_common_app_conf_t, u.wasm.request_handler),
336 },
337 {
338 nxt_string("malloc_handler"),
339 NXT_CONF_MAP_CSTRZ,
340 offsetof(nxt_common_app_conf_t, u.wasm.malloc_handler),
341 },
342 {
343 nxt_string("free_handler"),
344 NXT_CONF_MAP_CSTRZ,
345 offsetof(nxt_common_app_conf_t, u.wasm.free_handler),
346 },
347 {
348 nxt_string("module_init_handler"),
349 NXT_CONF_MAP_CSTRZ,
350 offsetof(nxt_common_app_conf_t, u.wasm.module_init_handler),
351 },
352 {
353 nxt_string("module_end_handler"),
354 NXT_CONF_MAP_CSTRZ,
355 offsetof(nxt_common_app_conf_t, u.wasm.module_end_handler),
356 },
357 {
358 nxt_string("request_init_handler"),
359 NXT_CONF_MAP_CSTRZ,
360 offsetof(nxt_common_app_conf_t, u.wasm.request_init_handler),
361 },
362 {
363 nxt_string("request_end_handler"),
364 NXT_CONF_MAP_CSTRZ,
365 offsetof(nxt_common_app_conf_t, u.wasm.request_end_handler),
366 },
367 {
368 nxt_string("response_end_handler"),
369 NXT_CONF_MAP_CSTRZ,
370 offsetof(nxt_common_app_conf_t, u.wasm.response_end_handler),
371 },
372 {
373 nxt_string("access"),
374 NXT_CONF_MAP_PTR,
375 offsetof(nxt_common_app_conf_t, u.wasm.access),
376 },
377 };
378
379
380 static nxt_conf_map_t nxt_wasm_wc_app_conf[] = {
381 {
382 nxt_string("component"),
383 NXT_CONF_MAP_CSTRZ,
384 offsetof(nxt_common_app_conf_t, u.wasm_wc.component),
385 },
386 {
387 nxt_string("access"),
388 NXT_CONF_MAP_PTR,
389 offsetof(nxt_common_app_conf_t, u.wasm_wc.access),
390 },
391 };
392
393
394 static nxt_conf_app_map_t nxt_app_maps[] = {
395 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf },
396 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf },
397 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf },
398 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf },
399 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf },
400 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf },
401 { nxt_nitems(nxt_wasm_app_conf), nxt_wasm_app_conf },
402 { nxt_nitems(nxt_wasm_wc_app_conf), nxt_wasm_wc_app_conf },
403 };
404
405
406 static void
nxt_main_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)407 nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
408 {
409 nxt_debug(task, "main data: %*s",
410 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
411 }
412
413
414 static void
nxt_main_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)415 nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
416 {
417 void *mem;
418 nxt_port_t *port;
419
420 nxt_port_new_port_handler(task, msg);
421
422 port = msg->u.new_port;
423
424 if (port != NULL
425 && port->type == NXT_PROCESS_APP
426 && msg->fd[1] != -1)
427 {
428 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
429 PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
430 if (nxt_fast_path(mem != MAP_FAILED)) {
431 port->queue = mem;
432 }
433
434 nxt_fd_close(msg->fd[1]);
435 msg->fd[1] = -1;
436 }
437 }
438
439
440 static void
nxt_main_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)441 nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
442 {
443 u_char *start, *p, ch;
444 size_t type_len;
445 nxt_int_t ret;
446 nxt_buf_t *b;
447 nxt_port_t *port;
448 nxt_runtime_t *rt;
449 nxt_process_t *process;
450 nxt_app_type_t idx;
451 nxt_conf_value_t *conf;
452 nxt_process_init_t *init;
453 nxt_common_app_conf_t *app_conf;
454
455 rt = task->thread->runtime;
456
457 port = rt->port_by_type[NXT_PROCESS_ROUTER];
458 if (nxt_slow_path(port == NULL)) {
459 nxt_alert(task, "router port not found");
460 goto close_fds;
461 }
462
463 if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
464 nxt_alert(task, "process %PI cannot start processes",
465 nxt_recv_msg_cmsg_pid(msg));
466
467 goto close_fds;
468 }
469
470 process = nxt_process_new(rt);
471 if (nxt_slow_path(process == NULL)) {
472 goto close_fds;
473 }
474
475 process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
476 if (process->mem_pool == NULL) {
477 nxt_process_use(task, process, -1);
478 goto close_fds;
479 }
480
481 process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
482
483 init = nxt_process_init(process);
484
485 *init = nxt_proto_process;
486
487 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
488 if (b == NULL) {
489 goto failed;
490 }
491
492 nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
493 b->mem.pos);
494
495 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
496 if (nxt_slow_path(app_conf == NULL)) {
497 goto failed;
498 }
499
500 app_conf->shared_port_fd = msg->fd[0];
501 app_conf->shared_queue_fd = msg->fd[1];
502
503 start = b->mem.pos;
504
505 app_conf->name.start = start;
506 app_conf->name.length = nxt_strlen(start);
507
508 init->name = (const char *) start;
509
510 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
511 + sizeof("\"\" prototype") + 1);
512
513 if (nxt_slow_path(process->name == NULL)) {
514 goto failed;
515 }
516
517 p = (u_char *) process->name;
518 *p++ = '"';
519 p = nxt_cpymem(p, init->name, app_conf->name.length);
520 p = nxt_cpymem(p, "\" prototype", 11);
521 *p = '\0';
522
523 app_conf->shm_limit = 100 * 1024 * 1024;
524 app_conf->request_limit = 0;
525
526 start += app_conf->name.length + 1;
527
528 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
529 if (conf == NULL) {
530 nxt_alert(task, "router app configuration parsing error");
531
532 goto failed;
533 }
534
535 rt = task->thread->runtime;
536
537 app_conf->user.start = (u_char*)rt->user_cred.user;
538 app_conf->user.length = nxt_strlen(rt->user_cred.user);
539
540 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
541 nxt_nitems(nxt_common_app_conf), app_conf);
542
543 if (ret != NXT_OK) {
544 nxt_alert(task, "failed to map common app conf received from router");
545 goto failed;
546 }
547
548 for (type_len = 0; type_len != app_conf->type.length; type_len++) {
549 ch = app_conf->type.start[type_len];
550
551 if (ch == ' ' || nxt_isdigit(ch)) {
552 break;
553 }
554 }
555
556 idx = nxt_app_parse_type(app_conf->type.start, type_len);
557
558 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
559 nxt_alert(task, "invalid app type %d received from router", (int) idx);
560 goto failed;
561 }
562
563 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
564 nxt_app_maps[idx].size, app_conf);
565
566 if (nxt_slow_path(ret != NXT_OK)) {
567 nxt_alert(task, "failed to map app conf received from router");
568 goto failed;
569 }
570
571 if (app_conf->limits != NULL) {
572 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
573 nxt_common_app_limits_conf,
574 nxt_nitems(nxt_common_app_limits_conf),
575 app_conf);
576
577 if (nxt_slow_path(ret != NXT_OK)) {
578 nxt_alert(task, "failed to map app limits received from router");
579 goto failed;
580 }
581 }
582
583 app_conf->self = conf;
584
585 process->stream = msg->port_msg.stream;
586 process->data.app = app_conf;
587
588 ret = nxt_process_start(task, process);
589 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
590
591 /* Close shared port fds only in main process. */
592 if (ret == NXT_OK) {
593 nxt_fd_close(app_conf->shared_port_fd);
594 nxt_fd_close(app_conf->shared_queue_fd);
595 }
596
597 /* Avoid fds close in caller. */
598 msg->fd[0] = -1;
599 msg->fd[1] = -1;
600
601 return;
602 }
603
604 failed:
605
606 nxt_process_use(task, process, -1);
607
608 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
609 msg->port_msg.reply_port);
610
611 if (nxt_fast_path(port != NULL)) {
612 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
613 -1, msg->port_msg.stream, 0, NULL);
614 }
615
616 close_fds:
617
618 nxt_fd_close(msg->fd[0]);
619 msg->fd[0] = -1;
620
621 nxt_fd_close(msg->fd[1]);
622 msg->fd[1] = -1;
623 }
624
625
626 static void
nxt_main_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)627 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
628 {
629 nxt_port_t *port;
630 nxt_process_t *process;
631 nxt_runtime_t *rt;
632
633 rt = task->thread->runtime;
634
635 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
636 msg->port_msg.reply_port);
637 if (nxt_slow_path(port == NULL)) {
638 return;
639 }
640
641 process = port->process;
642
643 nxt_assert(process != NULL);
644 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
645
646 #if (NXT_HAVE_LINUX_NS && NXT_HAVE_CLONE_NEWUSER)
647 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
648 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
649 process->user_cred,
650 &process->isolation.clone)
651 != NXT_OK))
652 {
653 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
654 -1, msg->port_msg.stream, 0, NULL);
655 return;
656 }
657 }
658
659 #endif
660
661 process->state = NXT_PROCESS_STATE_CREATED;
662
663 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
664 -1, msg->port_msg.stream, 0, NULL);
665 }
666
667
668 static nxt_port_handlers_t nxt_main_process_port_handlers = {
669 .data = nxt_main_data_handler,
670 .new_port = nxt_main_new_port_handler,
671 .process_created = nxt_main_process_created_handler,
672 .process_ready = nxt_port_process_ready_handler,
673 .whoami = nxt_main_process_whoami_handler,
674 .remove_pid = nxt_port_remove_pid_handler,
675 .start_process = nxt_main_start_process_handler,
676 .socket = nxt_main_port_socket_handler,
677 .socket_unlink = nxt_main_port_socket_unlink_handler,
678 .modules = nxt_main_port_modules_handler,
679 .conf_store = nxt_main_port_conf_store_handler,
680 #if (NXT_TLS)
681 .cert_get = nxt_cert_store_get_handler,
682 .cert_delete = nxt_cert_store_delete_handler,
683 #endif
684 #if (NXT_HAVE_NJS)
685 .script_get = nxt_script_store_get_handler,
686 .script_delete = nxt_script_store_delete_handler,
687 #endif
688 .access_log = nxt_main_port_access_log_handler,
689 .rpc_ready = nxt_port_rpc_handler,
690 .rpc_error = nxt_port_rpc_handler,
691 };
692
693
694 static void
nxt_main_process_whoami_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)695 nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
696 {
697 nxt_buf_t *buf;
698 nxt_pid_t pid, ppid;
699 nxt_port_t *port;
700 nxt_runtime_t *rt;
701 nxt_process_t *pprocess;
702
703 nxt_assert(msg->port_msg.reply_port == 0);
704
705 if (nxt_slow_path(msg->buf == NULL
706 || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
707 {
708 nxt_alert(task, "whoami: buffer is NULL or unexpected size");
709 goto fail;
710 }
711
712 nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
713
714 rt = task->thread->runtime;
715
716 pprocess = nxt_runtime_process_find(rt, ppid);
717 if (nxt_slow_path(pprocess == NULL)) {
718 nxt_alert(task, "whoami: parent process %PI not found", ppid);
719 goto fail;
720 }
721
722 pid = nxt_recv_msg_cmsg_pid(msg);
723
724 nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
725 msg->fd[0]);
726
727 if (msg->fd[0] != -1) {
728 port = nxt_runtime_process_port_create(task, rt, pid, 0,
729 NXT_PROCESS_APP);
730 if (nxt_slow_path(port == NULL)) {
731 goto fail;
732 }
733
734 nxt_fd_nonblocking(task, msg->fd[0]);
735
736 port->pair[0] = -1;
737 port->pair[1] = msg->fd[0];
738 msg->fd[0] = -1;
739
740 port->max_size = 16 * 1024;
741 port->max_share = 64 * 1024;
742 port->socket.task = task;
743
744 nxt_port_write_enable(task, port);
745
746 } else {
747 port = nxt_runtime_port_find(rt, pid, 0);
748 if (nxt_slow_path(port == NULL)) {
749 goto fail;
750 }
751 }
752
753 if (ppid != nxt_pid) {
754 nxt_queue_insert_tail(&pprocess->children, &port->process->link);
755 }
756
757 buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
758 sizeof(nxt_pid_t), 0);
759 if (nxt_slow_path(buf == NULL)) {
760 goto fail;
761 }
762
763 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
764
765 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
766 msg->port_msg.stream, 0, buf);
767
768 fail:
769
770 if (msg->fd[0] != -1) {
771 nxt_fd_close(msg->fd[0]);
772 }
773 }
774
775
776 static nxt_int_t
nxt_main_process_port_create(nxt_task_t * task,nxt_runtime_t * rt)777 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
778 {
779 nxt_int_t ret;
780 nxt_port_t *port;
781 nxt_process_t *process;
782
783 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
784 NXT_PROCESS_MAIN);
785 if (nxt_slow_path(port == NULL)) {
786 return NXT_ERROR;
787 }
788
789 process = port->process;
790
791 ret = nxt_port_socket_init(task, port, 0);
792 if (nxt_slow_path(ret != NXT_OK)) {
793 nxt_port_use(task, port, -1);
794 return ret;
795 }
796
797 /*
798 * A main process port. A write port is not closed
799 * since it should be inherited by processes.
800 */
801 nxt_port_enable(task, port, &nxt_main_process_port_handlers);
802
803 process->state = NXT_PROCESS_STATE_READY;
804
805 return NXT_OK;
806 }
807
808
809 static void
nxt_main_process_title(nxt_task_t * task)810 nxt_main_process_title(nxt_task_t *task)
811 {
812 u_char *p, *end;
813 nxt_uint_t i;
814 u_char title[2048];
815
816 end = title + sizeof(title) - 1;
817
818 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
819 nxt_process_argv[0]);
820
821 for (i = 1; nxt_process_argv[i] != NULL; i++) {
822 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
823 }
824
825 if (p < end) {
826 *p++ = ']';
827 }
828
829 *p = '\0';
830
831 nxt_process_title(task, "%s", title);
832 }
833
834
835 static void
nxt_main_process_sigterm_handler(nxt_task_t * task,void * obj,void * data)836 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
837 {
838 nxt_debug(task, "sigterm handler signo:%d (%s)",
839 (int) (uintptr_t) obj, data);
840
841 /* TODO: fast exit. */
842
843 nxt_exiting = 1;
844
845 nxt_runtime_quit(task, 0);
846 }
847
848
849 static void
nxt_main_process_sigquit_handler(nxt_task_t * task,void * obj,void * data)850 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
851 {
852 nxt_debug(task, "sigquit handler signo:%d (%s)",
853 (int) (uintptr_t) obj, data);
854
855 /* TODO: graceful exit. */
856
857 nxt_exiting = 1;
858
859 nxt_runtime_quit(task, 0);
860 }
861
862
863 static void
nxt_main_process_sigusr1_handler(nxt_task_t * task,void * obj,void * data)864 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
865 {
866 nxt_mp_t *mp;
867 nxt_int_t ret;
868 nxt_uint_t n;
869 nxt_port_t *port;
870 nxt_file_t *file, *new_file;
871 nxt_array_t *new_files;
872 nxt_runtime_t *rt;
873
874 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) received, %s",
875 (int) (uintptr_t) obj, data, "log files rotation");
876
877 rt = task->thread->runtime;
878
879 port = rt->port_by_type[NXT_PROCESS_ROUTER];
880
881 if (nxt_fast_path(port != NULL)) {
882 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
883 -1, 0, 0, NULL);
884 }
885
886 mp = nxt_mp_create(1024, 128, 256, 32);
887 if (mp == NULL) {
888 return;
889 }
890
891 n = nxt_list_nelts(rt->log_files);
892
893 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
894 if (new_files == NULL) {
895 nxt_mp_destroy(mp);
896 return;
897 }
898
899 nxt_list_each(file, rt->log_files) {
900
901 /* This allocation cannot fail. */
902 new_file = nxt_array_add(new_files);
903
904 new_file->name = file->name;
905 new_file->fd = NXT_FILE_INVALID;
906 new_file->log_level = NXT_LOG_ALERT;
907
908 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
909 NXT_FILE_OWNER_ACCESS);
910
911 if (ret != NXT_OK) {
912 goto fail;
913 }
914
915 } nxt_list_loop;
916
917 new_file = new_files->elts;
918
919 ret = nxt_file_stderr(&new_file[0]);
920
921 if (ret == NXT_OK) {
922 n = 0;
923
924 nxt_list_each(file, rt->log_files) {
925
926 nxt_port_change_log_file(task, rt, n, new_file[n].fd);
927 /*
928 * The old log file descriptor must be closed at the moment
929 * when no other threads use it. dup2() allows to use the
930 * old file descriptor for new log file. This change is
931 * performed atomically in the kernel.
932 */
933 (void) nxt_file_redirect(file, new_file[n].fd);
934
935 n++;
936
937 } nxt_list_loop;
938
939 nxt_mp_destroy(mp);
940 return;
941 }
942
943 fail:
944
945 new_file = new_files->elts;
946 n = new_files->nelts;
947
948 while (n != 0) {
949 if (new_file->fd != NXT_FILE_INVALID) {
950 nxt_file_close(task, new_file);
951 }
952
953 new_file++;
954 n--;
955 }
956
957 nxt_mp_destroy(mp);
958 }
959
960
961 static void
nxt_main_process_sigchld_handler(nxt_task_t * task,void * obj,void * data)962 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
963 {
964 int status;
965 nxt_int_t ret;
966 nxt_err_t err;
967 nxt_pid_t pid;
968 nxt_port_t *port;
969 nxt_queue_t children;
970 nxt_runtime_t *rt;
971 nxt_process_t *process, *child;
972 nxt_process_init_t init;
973
974 nxt_debug(task, "sigchld handler signo:%d (%s)",
975 (int) (uintptr_t) obj, data);
976
977 rt = task->thread->runtime;
978
979 for ( ;; ) {
980 pid = waitpid(-1, &status, WNOHANG);
981
982 if (pid == -1) {
983
984 switch (err = nxt_errno) {
985
986 case NXT_ECHILD:
987 return;
988
989 case NXT_EINTR:
990 continue;
991
992 default:
993 nxt_alert(task, "waitpid() failed: %E", err);
994 return;
995 }
996 }
997
998 nxt_debug(task, "waitpid(): %PI", pid);
999
1000 if (pid == 0) {
1001 return;
1002 }
1003
1004 if (WTERMSIG(status)) {
1005 #ifdef WCOREDUMP
1006 nxt_alert(task, "process %PI exited on signal %d%s",
1007 pid, WTERMSIG(status),
1008 WCOREDUMP(status) ? " (core dumped)" : "");
1009 #else
1010 nxt_alert(task, "process %PI exited on signal %d",
1011 pid, WTERMSIG(status));
1012 #endif
1013
1014 } else {
1015 nxt_trace(task, "process %PI exited with code %d",
1016 pid, WEXITSTATUS(status));
1017 }
1018
1019 process = nxt_runtime_process_find(rt, pid);
1020
1021 if (process != NULL) {
1022 nxt_main_process_cleanup(task, process);
1023
1024 if (process->state == NXT_PROCESS_STATE_READY) {
1025 process->stream = 0;
1026 }
1027
1028 nxt_queue_init(&children);
1029
1030 if (!nxt_queue_is_empty(&process->children)) {
1031 nxt_queue_add(&children, &process->children);
1032
1033 nxt_queue_init(&process->children);
1034
1035 nxt_queue_each(child, &children, nxt_process_t, link) {
1036 port = nxt_process_port_first(child);
1037
1038 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
1039 -1, 0, 0, NULL);
1040 } nxt_queue_loop;
1041 }
1042
1043 if (nxt_exiting) {
1044 nxt_process_close_ports(task, process);
1045
1046 nxt_queue_each(child, &children, nxt_process_t, link) {
1047 nxt_queue_remove(&child->link);
1048 child->link.next = NULL;
1049
1050 nxt_process_close_ports(task, child);
1051 } nxt_queue_loop;
1052
1053 if (rt->nprocesses <= 1) {
1054 nxt_runtime_quit(task, 0);
1055
1056 return;
1057 }
1058
1059 continue;
1060 }
1061
1062 nxt_port_remove_notify_others(task, process);
1063
1064 nxt_queue_each(child, &children, nxt_process_t, link) {
1065 nxt_port_remove_notify_others(task, child);
1066
1067 nxt_queue_remove(&child->link);
1068 child->link.next = NULL;
1069
1070 nxt_process_close_ports(task, child);
1071 } nxt_queue_loop;
1072
1073 init = *(nxt_process_init_t *) nxt_process_init(process);
1074
1075 nxt_process_close_ports(task, process);
1076
1077 if (init.restart) {
1078 ret = nxt_process_init_start(task, init);
1079 if (nxt_slow_path(ret == NXT_ERROR)) {
1080 nxt_alert(task, "failed to restart %s", init.name);
1081 }
1082 }
1083 }
1084 }
1085 }
1086
1087
1088 static void
nxt_main_process_signal_handler(nxt_task_t * task,void * obj,void * data)1089 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
1090 {
1091 nxt_trace(task, "signal signo:%d (%s) received, ignored",
1092 (int) (uintptr_t) obj, data);
1093 }
1094
1095
1096 static void
nxt_main_process_cleanup(nxt_task_t * task,nxt_process_t * process)1097 nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
1098 {
1099 if (process->isolation.cleanup != NULL) {
1100 process->isolation.cleanup(task, process);
1101 }
1102
1103 if (process->isolation.cgroup_cleanup != NULL) {
1104 process->isolation.cgroup_cleanup(task, process);
1105 }
1106 }
1107
1108
1109 static void
nxt_main_port_socket_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1110 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1111 {
1112 size_t size;
1113 nxt_int_t ret;
1114 nxt_buf_t *b, *out;
1115 nxt_port_t *port;
1116 nxt_sockaddr_t *sa;
1117 nxt_port_msg_type_t type;
1118 nxt_listening_socket_t ls;
1119 u_char message[2048];
1120
1121 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1122 msg->port_msg.reply_port);
1123 if (nxt_slow_path(port == NULL)) {
1124 return;
1125 }
1126
1127 if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) {
1128 nxt_alert(task, "process %PI cannot create listener sockets",
1129 msg->port_msg.pid);
1130
1131 return;
1132 }
1133
1134 b = msg->buf;
1135 sa = (nxt_sockaddr_t *) b->mem.pos;
1136
1137 /* TODO check b size and make plain */
1138
1139 ls.socket = -1;
1140 ls.error = NXT_SOCKET_ERROR_SYSTEM;
1141 ls.start = message;
1142 ls.end = message + sizeof(message);
1143
1144 nxt_debug(task, "listening socket \"%*s\"",
1145 (size_t) sa->length, nxt_sockaddr_start(sa));
1146
1147 ret = nxt_main_listening_socket(sa, &ls);
1148
1149 if (ret == NXT_OK) {
1150 nxt_debug(task, "socket(\"%*s\"): %d",
1151 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1152
1153 out = NULL;
1154
1155 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1156
1157 } else {
1158 size = ls.end - ls.start;
1159
1160 nxt_alert(task, "%*s", size, ls.start);
1161
1162 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1163 size + 1);
1164 if (nxt_fast_path(out != NULL)) {
1165 *out->mem.free++ = (uint8_t) ls.error;
1166
1167 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1168 }
1169
1170 type = NXT_PORT_MSG_RPC_ERROR;
1171 }
1172
1173 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1174 0, out);
1175 }
1176
1177
1178 static nxt_int_t
nxt_main_listening_socket(nxt_sockaddr_t * sa,nxt_listening_socket_t * ls)1179 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1180 {
1181 nxt_err_t err;
1182 nxt_socket_t s;
1183
1184 const socklen_t length = sizeof(int);
1185 static const int enable = 1;
1186
1187 s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1188
1189 if (nxt_slow_path(s == -1)) {
1190 err = nxt_errno;
1191
1192 #if (NXT_INET6)
1193
1194 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1195 ls->error = NXT_SOCKET_ERROR_NOINET6;
1196 }
1197
1198 #endif
1199
1200 ls->end = nxt_sprintf(ls->start, ls->end,
1201 "socket(\\\"%*s\\\") failed %E",
1202 (size_t) sa->length, nxt_sockaddr_start(sa), err);
1203
1204 return NXT_ERROR;
1205 }
1206
1207 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1208 ls->end = nxt_sprintf(ls->start, ls->end,
1209 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1210 (size_t) sa->length, nxt_sockaddr_start(sa),
1211 nxt_errno);
1212 goto fail;
1213 }
1214
1215 #if (NXT_INET6)
1216
1217 if (sa->u.sockaddr.sa_family == AF_INET6) {
1218
1219 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1220 ls->end = nxt_sprintf(ls->start, ls->end,
1221 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1222 (size_t) sa->length, nxt_sockaddr_start(sa),
1223 nxt_errno);
1224 goto fail;
1225 }
1226 }
1227
1228 #endif
1229
1230 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1231 err = nxt_errno;
1232
1233 #if (NXT_HAVE_UNIX_DOMAIN)
1234
1235 if (sa->u.sockaddr.sa_family == AF_UNIX) {
1236 switch (err) {
1237
1238 case EACCES:
1239 ls->error = NXT_SOCKET_ERROR_ACCESS;
1240 break;
1241
1242 case ENOENT:
1243 case ENOTDIR:
1244 ls->error = NXT_SOCKET_ERROR_PATH;
1245 break;
1246 }
1247
1248 } else
1249 #endif
1250 {
1251 switch (err) {
1252
1253 case EACCES:
1254 ls->error = NXT_SOCKET_ERROR_PORT;
1255 break;
1256
1257 case EADDRINUSE:
1258 ls->error = NXT_SOCKET_ERROR_INUSE;
1259 break;
1260
1261 case EADDRNOTAVAIL:
1262 ls->error = NXT_SOCKET_ERROR_NOADDR;
1263 break;
1264 }
1265 }
1266
1267 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1268 (size_t) sa->length, nxt_sockaddr_start(sa), err);
1269 goto fail;
1270 }
1271
1272 #if (NXT_HAVE_UNIX_DOMAIN)
1273
1274 if (sa->u.sockaddr.sa_family == AF_UNIX
1275 && sa->u.sockaddr_un.sun_path[0] != '\0')
1276 {
1277 char *filename;
1278 mode_t access;
1279 nxt_thread_t *thr;
1280
1281 filename = sa->u.sockaddr_un.sun_path;
1282 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1283
1284 if (chmod(filename, access) != 0) {
1285 ls->end = nxt_sprintf(ls->start, ls->end,
1286 "chmod(\\\"%s\\\") failed %E",
1287 filename, nxt_errno);
1288 goto fail;
1289 }
1290
1291 thr = nxt_thread();
1292 nxt_runtime_listen_socket_add(thr->runtime, sa);
1293 }
1294
1295 #endif
1296
1297 ls->socket = s;
1298
1299 return NXT_OK;
1300
1301 fail:
1302
1303 (void) close(s);
1304
1305 return NXT_ERROR;
1306 }
1307
1308
1309 static void
nxt_main_port_socket_unlink_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1310 nxt_main_port_socket_unlink_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1311 {
1312 #if (NXT_HAVE_UNIX_DOMAIN)
1313 size_t i;
1314 nxt_buf_t *b;
1315 const char *filename;
1316 nxt_runtime_t *rt;
1317 nxt_sockaddr_t *sa;
1318 nxt_listen_socket_t *ls;
1319
1320 b = msg->buf;
1321 sa = (nxt_sockaddr_t *) b->mem.pos;
1322
1323 filename = sa->u.sockaddr_un.sun_path;
1324 unlink(filename);
1325
1326 rt = task->thread->runtime;
1327
1328 for (i = 0; i < rt->listen_sockets->nelts; i++) {
1329 const char *name;
1330
1331 ls = (nxt_listen_socket_t *) rt->listen_sockets->elts + i;
1332 sa = ls->sockaddr;
1333
1334 if (sa->u.sockaddr.sa_family != AF_UNIX
1335 || sa->u.sockaddr_un.sun_path[0] == '\0')
1336 {
1337 continue;
1338 }
1339
1340 name = sa->u.sockaddr_un.sun_path;
1341 if (strcmp(name, filename) != 0) {
1342 continue;
1343 }
1344
1345 nxt_array_remove(rt->listen_sockets, ls);
1346 break;
1347 }
1348 #endif
1349 }
1350
1351
1352 static nxt_conf_map_t nxt_app_lang_module_map[] = {
1353 {
1354 nxt_string("type"),
1355 NXT_CONF_MAP_INT,
1356 offsetof(nxt_app_lang_module_t, type),
1357 },
1358
1359 {
1360 nxt_string("version"),
1361 NXT_CONF_MAP_CSTRZ,
1362 offsetof(nxt_app_lang_module_t, version),
1363 },
1364
1365 {
1366 nxt_string("file"),
1367 NXT_CONF_MAP_CSTRZ,
1368 offsetof(nxt_app_lang_module_t, file),
1369 },
1370 };
1371
1372
1373 static nxt_conf_map_t nxt_app_lang_mounts_map[] = {
1374 {
1375 nxt_string("src"),
1376 NXT_CONF_MAP_CSTRZ,
1377 offsetof(nxt_fs_mount_t, src),
1378 },
1379 {
1380 nxt_string("dst"),
1381 NXT_CONF_MAP_CSTRZ,
1382 offsetof(nxt_fs_mount_t, dst),
1383 },
1384 {
1385 nxt_string("name"),
1386 NXT_CONF_MAP_CSTRZ,
1387 offsetof(nxt_fs_mount_t, name),
1388 },
1389 {
1390 nxt_string("type"),
1391 NXT_CONF_MAP_INT,
1392 offsetof(nxt_fs_mount_t, type),
1393 },
1394 {
1395 nxt_string("flags"),
1396 NXT_CONF_MAP_INT,
1397 offsetof(nxt_fs_mount_t, flags),
1398 },
1399 {
1400 nxt_string("data"),
1401 NXT_CONF_MAP_CSTRZ,
1402 offsetof(nxt_fs_mount_t, data),
1403 },
1404 };
1405
1406
1407 static void
nxt_main_port_modules_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1408 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1409 {
1410 uint32_t index, jindex, nmounts;
1411 nxt_mp_t *mp;
1412 nxt_int_t ret;
1413 nxt_buf_t *b;
1414 nxt_port_t *port;
1415 nxt_runtime_t *rt;
1416 nxt_fs_mount_t *mnt;
1417 nxt_conf_value_t *conf, *root, *value, *mounts;
1418 nxt_app_lang_module_t *lang;
1419
1420 static nxt_str_t root_path = nxt_string("/");
1421 static nxt_str_t mounts_name = nxt_string("mounts");
1422
1423 rt = task->thread->runtime;
1424
1425 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1426 nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid);
1427 return;
1428 }
1429
1430 if (nxt_exiting) {
1431 nxt_debug(task, "ignoring discovered modules, exiting");
1432 return;
1433 }
1434
1435 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1436 msg->port_msg.reply_port);
1437
1438 if (nxt_fast_path(port != NULL)) {
1439 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1440 msg->port_msg.stream, 0, NULL);
1441 }
1442
1443 b = msg->buf;
1444
1445 if (b == NULL) {
1446 return;
1447 }
1448
1449 mp = nxt_mp_create(1024, 128, 256, 32);
1450 if (mp == NULL) {
1451 return;
1452 }
1453
1454 b = nxt_buf_chk_make_plain(mp, b, msg->size);
1455
1456 if (b == NULL) {
1457 return;
1458 }
1459
1460 nxt_debug(task, "application languages: \"%*s\"",
1461 b->mem.free - b->mem.pos, b->mem.pos);
1462
1463 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1464 if (conf == NULL) {
1465 goto fail;
1466 }
1467
1468 root = nxt_conf_get_path(conf, &root_path);
1469 if (root == NULL) {
1470 goto fail;
1471 }
1472
1473 for (index = 0; /* void */ ; index++) {
1474 value = nxt_conf_get_array_element(root, index);
1475 if (value == NULL) {
1476 break;
1477 }
1478
1479 lang = nxt_array_zero_add(rt->languages);
1480 if (lang == NULL) {
1481 goto fail;
1482 }
1483
1484 lang->module = NULL;
1485
1486 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1487 nxt_nitems(nxt_app_lang_module_map), lang);
1488
1489 if (ret != NXT_OK) {
1490 goto fail;
1491 }
1492
1493 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1494 if (mounts == NULL) {
1495 nxt_alert(task, "missing mounts from discovery message.");
1496 goto fail;
1497 }
1498
1499 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1500 nxt_alert(task, "invalid mounts type from discovery message.");
1501 goto fail;
1502 }
1503
1504 nmounts = nxt_conf_array_elements_count(mounts);
1505
1506 lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1507 sizeof(nxt_fs_mount_t));
1508
1509 if (lang->mounts == NULL) {
1510 goto fail;
1511 }
1512
1513 for (jindex = 0; /* */; jindex++) {
1514 value = nxt_conf_get_array_element(mounts, jindex);
1515 if (value == NULL) {
1516 break;
1517 }
1518
1519 mnt = nxt_array_zero_add(lang->mounts);
1520 if (mnt == NULL) {
1521 goto fail;
1522 }
1523
1524 mnt->builtin = 1;
1525 mnt->deps = 1;
1526
1527 ret = nxt_conf_map_object(rt->mem_pool, value,
1528 nxt_app_lang_mounts_map,
1529 nxt_nitems(nxt_app_lang_mounts_map), mnt);
1530
1531 if (ret != NXT_OK) {
1532 goto fail;
1533 }
1534 }
1535
1536 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1537 lang->type, lang->version, lang->file, lang->mounts->nelts);
1538 }
1539
1540 qsort(rt->languages->elts, rt->languages->nelts,
1541 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1542
1543 fail:
1544
1545 nxt_mp_destroy(mp);
1546
1547 ret = nxt_process_init_start(task, nxt_controller_process);
1548 if (ret == NXT_OK) {
1549 ret = nxt_process_init_start(task, nxt_router_process);
1550 }
1551
1552 if (nxt_slow_path(ret == NXT_ERROR)) {
1553 nxt_exiting = 1;
1554
1555 nxt_runtime_quit(task, 1);
1556 }
1557 }
1558
1559
1560 static int nxt_cdecl
nxt_app_lang_compare(const void * v1,const void * v2)1561 nxt_app_lang_compare(const void *v1, const void *v2)
1562 {
1563 int n;
1564 const nxt_app_lang_module_t *lang1, *lang2;
1565
1566 lang1 = v1;
1567 lang2 = v2;
1568
1569 n = lang1->type - lang2->type;
1570
1571 if (n != 0) {
1572 return n;
1573 }
1574
1575 n = nxt_strverscmp(lang1->version, lang2->version);
1576
1577 /* Negate result to move higher versions to the beginning. */
1578
1579 return -n;
1580 }
1581
1582
1583 static void
nxt_main_port_conf_store_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1584 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1585 {
1586 void *p;
1587 size_t n, size;
1588 nxt_int_t ret;
1589 nxt_port_t *ctl_port;
1590 nxt_runtime_t *rt;
1591 u_char ver[NXT_INT_T_LEN];
1592
1593 rt = task->thread->runtime;
1594
1595 ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1596
1597 if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) {
1598 nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid);
1599 return;
1600 }
1601
1602 p = MAP_FAILED;
1603
1604 /*
1605 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1606 * initialized in 'cleanup' section.
1607 */
1608 size = 0;
1609
1610 if (nxt_slow_path(msg->fd[0] == -1)) {
1611 nxt_alert(task, "conf_store_handler: invalid shm fd");
1612 goto error;
1613 }
1614
1615 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1616 nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1617 (int) nxt_buf_mem_used_size(&msg->buf->mem));
1618 goto error;
1619 }
1620
1621 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1622
1623 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1624
1625 nxt_fd_close(msg->fd[0]);
1626 msg->fd[0] = -1;
1627
1628 if (nxt_slow_path(p == MAP_FAILED)) {
1629 goto error;
1630 }
1631
1632 nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1633
1634 if (nxt_conf_ver != NXT_VERNUM) {
1635 n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver;
1636
1637 ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n);
1638 if (nxt_slow_path(ret != NXT_OK)) {
1639 goto error;
1640 }
1641
1642 nxt_conf_ver = NXT_VERNUM;
1643 }
1644
1645 ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size);
1646
1647 if (nxt_fast_path(ret == NXT_OK)) {
1648 goto cleanup;
1649 }
1650
1651 error:
1652
1653 nxt_alert(task, "failed to store current configuration");
1654
1655 cleanup:
1656
1657 if (p != MAP_FAILED) {
1658 nxt_mem_munmap(p, size);
1659 }
1660
1661 if (msg->fd[0] != -1) {
1662 nxt_fd_close(msg->fd[0]);
1663 msg->fd[0] = -1;
1664 }
1665 }
1666
1667
1668 static nxt_int_t
nxt_main_file_store(nxt_task_t * task,const char * tmp_name,const char * name,u_char * buf,size_t size)1669 nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name,
1670 u_char *buf, size_t size)
1671 {
1672 ssize_t n;
1673 nxt_int_t ret;
1674 nxt_file_t file;
1675
1676 nxt_memzero(&file, sizeof(nxt_file_t));
1677
1678 file.name = (nxt_file_name_t *) name;
1679
1680 ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE,
1681 NXT_FILE_OWNER_ACCESS);
1682 if (nxt_slow_path(ret != NXT_OK)) {
1683 return NXT_ERROR;
1684 }
1685
1686 n = nxt_file_write(&file, buf, size, 0);
1687
1688 nxt_file_close(task, &file);
1689
1690 if (nxt_slow_path(n != (ssize_t) size)) {
1691 (void) nxt_file_delete(file.name);
1692 return NXT_ERROR;
1693 }
1694
1695 return nxt_file_rename(file.name, (nxt_file_name_t *) name);
1696 }
1697
1698
1699 static void
nxt_main_port_access_log_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1700 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1701 {
1702 u_char *path;
1703 nxt_int_t ret;
1704 nxt_file_t file;
1705 nxt_port_t *port;
1706 nxt_port_msg_type_t type;
1707
1708 nxt_debug(task, "opening access log file");
1709
1710 path = msg->buf->mem.pos;
1711
1712 nxt_memzero(&file, sizeof(nxt_file_t));
1713
1714 file.name = (nxt_file_name_t *) path;
1715 file.log_level = NXT_LOG_ERR;
1716
1717 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1718 NXT_FILE_OWNER_ACCESS);
1719
1720 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1721 : NXT_PORT_MSG_RPC_ERROR;
1722
1723 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1724 msg->port_msg.reply_port);
1725
1726 if (nxt_fast_path(port != NULL)) {
1727 (void) nxt_port_socket_write(task, port, type, file.fd,
1728 msg->port_msg.stream, 0, NULL);
1729 }
1730 }
1731