1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_router.h> 13 #if (NXT_TLS) 14 #include <nxt_cert.h> 15 #endif 16 17 #include <sys/mount.h> 18 19 20 typedef struct { 21 nxt_socket_t socket; 22 nxt_socket_error_t error; 23 u_char *start; 24 u_char *end; 25 } nxt_listening_socket_t; 26 27 28 typedef struct { 29 nxt_uint_t size; 30 nxt_conf_map_t *map; 31 } nxt_conf_app_map_t; 32 33 34 extern nxt_port_handlers_t nxt_controller_process_port_handlers; 35 extern nxt_port_handlers_t nxt_router_process_port_handlers; 36 37 38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, 39 nxt_runtime_t *rt); 40 static void nxt_main_process_title(nxt_task_t *task); 41 static nxt_int_t nxt_main_process_create(nxt_task_t *task, 42 const nxt_process_init_t init); 43 static nxt_int_t nxt_main_start_process(nxt_task_t *task, 44 nxt_process_t *process); 45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); 46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, 47 void *data); 48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, 49 void *data); 50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, 51 void *data); 52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, 53 void *data); 54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, 55 void *data); 56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); 57 static void nxt_main_port_socket_handler(nxt_task_t *task, 58 nxt_port_recv_msg_t *msg); 59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, 60 nxt_listening_socket_t *ls); 61 static void nxt_main_port_modules_handler(nxt_task_t *task, 62 nxt_port_recv_msg_t *msg); 63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); 64 static void nxt_main_port_conf_store_handler(nxt_task_t *task, 65 nxt_port_recv_msg_t *msg); 66 static void nxt_main_port_access_log_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg); 68 69 const nxt_sig_event_t nxt_main_process_signals[] = { 70 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), 71 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler), 72 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler), 73 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler), 74 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler), 75 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler), 76 nxt_event_signal_end, 77 }; 78 79 80 static nxt_bool_t nxt_exiting; 81 82 83 nxt_int_t 84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, 85 nxt_runtime_t *rt) 86 { 87 rt->type = NXT_PROCESS_MAIN; 88 89 if (nxt_main_process_port_create(task, rt) != NXT_OK) { 90 return NXT_ERROR; 91 } 92 93 nxt_main_process_title(task); 94 95 /* 96 * The discovery process will send a message processed by 97 * nxt_main_port_modules_handler() which starts the controller 98 * and router processes. 99 */ 100 return nxt_main_process_create(task, nxt_discovery_process); 101 } 102 103 104 static nxt_conf_map_t nxt_common_app_conf[] = { 105 { 106 nxt_string("type"), 107 NXT_CONF_MAP_STR, 108 offsetof(nxt_common_app_conf_t, type), 109 }, 110 111 { 112 nxt_string("user"), 113 NXT_CONF_MAP_STR, 114 offsetof(nxt_common_app_conf_t, user), 115 }, 116 117 { 118 nxt_string("group"), 119 NXT_CONF_MAP_STR, 120 offsetof(nxt_common_app_conf_t, group), 121 }, 122 123 { 124 nxt_string("working_directory"), 125 NXT_CONF_MAP_CSTRZ, 126 offsetof(nxt_common_app_conf_t, working_directory), 127 }, 128 129 { 130 nxt_string("environment"), 131 NXT_CONF_MAP_PTR, 132 offsetof(nxt_common_app_conf_t, environment), 133 }, 134 135 { 136 nxt_string("isolation"), 137 NXT_CONF_MAP_PTR, 138 offsetof(nxt_common_app_conf_t, isolation), 139 }, 140 141 { 142 nxt_string("limits"), 143 NXT_CONF_MAP_PTR, 144 offsetof(nxt_common_app_conf_t, limits), 145 }, 146 147 }; 148 149 150 static nxt_conf_map_t nxt_common_app_limits_conf[] = { 151 { 152 nxt_string("shm"), 153 NXT_CONF_MAP_SIZE, 154 offsetof(nxt_common_app_conf_t, shm_limit), 155 }, 156 157 }; 158 159 160 static nxt_conf_map_t nxt_external_app_conf[] = { 161 { 162 nxt_string("executable"), 163 NXT_CONF_MAP_CSTRZ, 164 offsetof(nxt_common_app_conf_t, u.external.executable), 165 }, 166 167 { 168 nxt_string("arguments"), 169 NXT_CONF_MAP_PTR, 170 offsetof(nxt_common_app_conf_t, u.external.arguments), 171 }, 172 173 }; 174 175 176 static nxt_conf_map_t nxt_python_app_conf[] = { 177 { 178 nxt_string("home"), 179 NXT_CONF_MAP_CSTRZ, 180 offsetof(nxt_common_app_conf_t, u.python.home), 181 }, 182 183 { 184 nxt_string("path"), 185 NXT_CONF_MAP_PTR, 186 offsetof(nxt_common_app_conf_t, u.python.path), 187 }, 188 189 { 190 nxt_string("module"), 191 NXT_CONF_MAP_STR, 192 offsetof(nxt_common_app_conf_t, u.python.module), 193 }, 194 195 { 196 nxt_string("callable"), 197 NXT_CONF_MAP_CSTRZ, 198 offsetof(nxt_common_app_conf_t, u.python.callable), 199 }, 200 201 { 202 nxt_string("protocol"), 203 NXT_CONF_MAP_STR, 204 offsetof(nxt_common_app_conf_t, u.python.protocol), 205 }, 206 207 { 208 nxt_string("threads"), 209 NXT_CONF_MAP_INT32, 210 offsetof(nxt_common_app_conf_t, u.python.threads), 211 }, 212 213 { 214 nxt_string("thread_stack_size"), 215 NXT_CONF_MAP_INT32, 216 offsetof(nxt_common_app_conf_t, u.python.thread_stack_size), 217 }, 218 }; 219 220 221 static nxt_conf_map_t nxt_php_app_conf[] = { 222 { 223 nxt_string("targets"), 224 NXT_CONF_MAP_PTR, 225 offsetof(nxt_common_app_conf_t, u.php.targets), 226 }, 227 228 { 229 nxt_string("options"), 230 NXT_CONF_MAP_PTR, 231 offsetof(nxt_common_app_conf_t, u.php.options), 232 }, 233 }; 234 235 236 static nxt_conf_map_t nxt_perl_app_conf[] = { 237 { 238 nxt_string("script"), 239 NXT_CONF_MAP_CSTRZ, 240 offsetof(nxt_common_app_conf_t, u.perl.script), 241 }, 242 243 { 244 nxt_string("threads"), 245 NXT_CONF_MAP_INT32, 246 offsetof(nxt_common_app_conf_t, u.perl.threads), 247 }, 248 249 { 250 nxt_string("thread_stack_size"), 251 NXT_CONF_MAP_INT32, 252 offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size), 253 }, 254 }; 255 256 257 static nxt_conf_map_t nxt_ruby_app_conf[] = { 258 { 259 nxt_string("script"), 260 NXT_CONF_MAP_STR, 261 offsetof(nxt_common_app_conf_t, u.ruby.script), 262 }, 263 { 264 nxt_string("threads"), 265 NXT_CONF_MAP_INT32, 266 offsetof(nxt_common_app_conf_t, u.ruby.threads), 267 }, 268 }; 269 270 271 static nxt_conf_map_t nxt_java_app_conf[] = { 272 { 273 nxt_string("classpath"), 274 NXT_CONF_MAP_PTR, 275 offsetof(nxt_common_app_conf_t, u.java.classpath), 276 }, 277 { 278 nxt_string("webapp"), 279 NXT_CONF_MAP_CSTRZ, 280 offsetof(nxt_common_app_conf_t, u.java.webapp), 281 }, 282 { 283 nxt_string("options"), 284 NXT_CONF_MAP_PTR, 285 offsetof(nxt_common_app_conf_t, u.java.options), 286 }, 287 { 288 nxt_string("unit_jars"), 289 NXT_CONF_MAP_CSTRZ, 290 offsetof(nxt_common_app_conf_t, u.java.unit_jars), 291 }, 292 { 293 nxt_string("threads"), 294 NXT_CONF_MAP_INT32, 295 offsetof(nxt_common_app_conf_t, u.java.threads), 296 }, 297 { 298 nxt_string("thread_stack_size"), 299 NXT_CONF_MAP_INT32, 300 offsetof(nxt_common_app_conf_t, u.java.thread_stack_size), 301 }, 302 303 }; 304 305 306 static nxt_conf_app_map_t nxt_app_maps[] = { 307 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, 308 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, 309 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, 310 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, 311 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, 312 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf }, 313 }; 314 315 316 static void 317 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 318 { 319 nxt_debug(task, "main data: %*s", 320 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 321 } 322 323 324 static void 325 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 326 { 327 u_char *start, *p, ch; 328 size_t type_len; 329 nxt_int_t ret; 330 nxt_buf_t *b; 331 nxt_port_t *port; 332 nxt_runtime_t *rt; 333 nxt_process_t *process; 334 nxt_app_type_t idx; 335 nxt_conf_value_t *conf; 336 nxt_process_init_t *init; 337 nxt_common_app_conf_t *app_conf; 338 339 ret = NXT_ERROR; 340 341 rt = task->thread->runtime; 342 343 process = nxt_main_process_new(task, rt); 344 if (nxt_slow_path(process == NULL)) { 345 return; 346 } 347 348 init = nxt_process_init(process); 349 350 *init = nxt_app_process; 351 352 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); 353 if (b == NULL) { 354 goto failed; 355 } 356 357 nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, 358 b->mem.pos); 359 360 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); 361 if (nxt_slow_path(app_conf == NULL)) { 362 goto failed; 363 } 364 365 start = b->mem.pos; 366 367 app_conf->name.start = start; 368 app_conf->name.length = nxt_strlen(start); 369 370 init->name = (const char *) start; 371 372 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length 373 + sizeof("\"\" application") + 1); 374 375 if (nxt_slow_path(process->name == NULL)) { 376 goto failed; 377 } 378 379 p = (u_char *) process->name; 380 *p++ = '"'; 381 p = nxt_cpymem(p, init->name, app_conf->name.length); 382 p = nxt_cpymem(p, "\" application", 13); 383 *p = '\0'; 384 385 app_conf->shm_limit = 100 * 1024 * 1024; 386 387 start += app_conf->name.length + 1; 388 389 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL); 390 if (conf == NULL) { 391 nxt_alert(task, "router app configuration parsing error"); 392 393 goto failed; 394 } 395 396 rt = task->thread->runtime; 397 398 app_conf->user.start = (u_char*)rt->user_cred.user; 399 app_conf->user.length = nxt_strlen(rt->user_cred.user); 400 401 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf, 402 nxt_nitems(nxt_common_app_conf), app_conf); 403 404 if (ret != NXT_OK) { 405 nxt_alert(task, "failed to map common app conf received from router"); 406 goto failed; 407 } 408 409 for (type_len = 0; type_len != app_conf->type.length; type_len++) { 410 ch = app_conf->type.start[type_len]; 411 412 if (ch == ' ' || nxt_isdigit(ch)) { 413 break; 414 } 415 } 416 417 idx = nxt_app_parse_type(app_conf->type.start, type_len); 418 419 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { 420 nxt_alert(task, "invalid app type %d received from router", (int) idx); 421 goto failed; 422 } 423 424 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map, 425 nxt_app_maps[idx].size, app_conf); 426 427 if (nxt_slow_path(ret != NXT_OK)) { 428 nxt_alert(task, "failed to map app conf received from router"); 429 goto failed; 430 } 431 432 if (app_conf->limits != NULL) { 433 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits, 434 nxt_common_app_limits_conf, 435 nxt_nitems(nxt_common_app_limits_conf), 436 app_conf); 437 438 if (nxt_slow_path(ret != NXT_OK)) { 439 nxt_alert(task, "failed to map app limits received from router"); 440 goto failed; 441 } 442 } 443 444 app_conf->self = conf; 445 446 process->stream = msg->port_msg.stream; 447 process->data.app = app_conf; 448 449 ret = nxt_main_start_process(task, process); 450 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { 451 return; 452 } 453 454 failed: 455 456 nxt_process_use(task, process, -1); 457 458 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 459 msg->port_msg.reply_port); 460 461 if (nxt_fast_path(port != NULL)) { 462 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 463 -1, msg->port_msg.stream, 0, NULL); 464 } 465 } 466 467 468 static void 469 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 470 { 471 nxt_port_t *port; 472 nxt_process_t *process; 473 nxt_runtime_t *rt; 474 475 rt = task->thread->runtime; 476 477 process = nxt_runtime_process_find(rt, msg->port_msg.pid); 478 if (nxt_slow_path(process == NULL)) { 479 return; 480 } 481 482 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); 483 484 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 485 msg->port_msg.reply_port); 486 487 488 if (nxt_slow_path(port == NULL)) { 489 return; 490 } 491 492 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) 493 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { 494 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, 495 process->user_cred, 496 &process->isolation.clone) 497 != NXT_OK)) 498 { 499 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 500 -1, msg->port_msg.stream, 0, NULL); 501 return; 502 } 503 } 504 505 #endif 506 507 process->state = NXT_PROCESS_STATE_CREATED; 508 509 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, 510 -1, msg->port_msg.stream, 0, NULL); 511 } 512 513 514 static nxt_port_handlers_t nxt_main_process_port_handlers = { 515 .data = nxt_port_main_data_handler, 516 .process_created = nxt_main_process_created_handler, 517 .process_ready = nxt_port_process_ready_handler, 518 .start_process = nxt_port_main_start_process_handler, 519 .socket = nxt_main_port_socket_handler, 520 .modules = nxt_main_port_modules_handler, 521 .conf_store = nxt_main_port_conf_store_handler, 522 #if (NXT_TLS) 523 .cert_get = nxt_cert_store_get_handler, 524 .cert_delete = nxt_cert_store_delete_handler, 525 #endif 526 .access_log = nxt_main_port_access_log_handler, 527 .rpc_ready = nxt_port_rpc_handler, 528 .rpc_error = nxt_port_rpc_handler, 529 }; 530 531 532 static nxt_int_t 533 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) 534 { 535 nxt_int_t ret; 536 nxt_port_t *port; 537 nxt_process_t *process; 538 539 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, 540 NXT_PROCESS_MAIN); 541 if (nxt_slow_path(port == NULL)) { 542 return NXT_ERROR; 543 } 544 545 process = port->process; 546 547 ret = nxt_port_socket_init(task, port, 0); 548 if (nxt_slow_path(ret != NXT_OK)) { 549 nxt_port_use(task, port, -1); 550 return ret; 551 } 552 553 /* 554 * A main process port. A write port is not closed 555 * since it should be inherited by processes. 556 */ 557 nxt_port_enable(task, port, &nxt_main_process_port_handlers); 558 559 process->state = NXT_PROCESS_STATE_READY; 560 561 return NXT_OK; 562 } 563 564 565 static void 566 nxt_main_process_title(nxt_task_t *task) 567 { 568 u_char *p, *end; 569 nxt_uint_t i; 570 u_char title[2048]; 571 572 end = title + sizeof(title) - 1; 573 574 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s", 575 nxt_process_argv[0]); 576 577 for (i = 1; nxt_process_argv[i] != NULL; i++) { 578 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); 579 } 580 581 if (p < end) { 582 *p++ = ']'; 583 } 584 585 *p = '\0'; 586 587 nxt_process_title(task, "%s", title); 588 } 589 590 591 static nxt_int_t 592 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) 593 { 594 nxt_int_t ret; 595 nxt_runtime_t *rt; 596 nxt_process_t *process; 597 nxt_process_init_t *pinit; 598 599 rt = task->thread->runtime; 600 601 process = nxt_main_process_new(task, rt); 602 if (nxt_slow_path(process == NULL)) { 603 return NXT_ERROR; 604 } 605 606 process->name = init.name; 607 process->user_cred = &rt->user_cred; 608 609 pinit = nxt_process_init(process); 610 *pinit = init; 611 612 ret = nxt_main_start_process(task, process); 613 if (nxt_slow_path(ret == NXT_ERROR)) { 614 nxt_process_use(task, process, -1); 615 } 616 617 return ret; 618 } 619 620 621 static nxt_process_t * 622 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) 623 { 624 nxt_process_t *process; 625 626 process = nxt_runtime_process_new(rt); 627 if (nxt_slow_path(process == NULL)) { 628 return NULL; 629 } 630 631 process->mem_pool = nxt_mp_create(1024, 128, 256, 32); 632 if (process->mem_pool == NULL) { 633 nxt_process_use(task, process, -1); 634 return NULL; 635 } 636 637 return process; 638 } 639 640 641 static nxt_int_t 642 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) 643 { 644 nxt_mp_t *tmp_mp; 645 nxt_int_t ret; 646 nxt_pid_t pid; 647 nxt_port_t *port; 648 nxt_process_init_t *init; 649 650 init = nxt_process_init(process); 651 652 port = nxt_port_new(task, 0, 0, init->type); 653 if (nxt_slow_path(port == NULL)) { 654 return NXT_ERROR; 655 } 656 657 nxt_process_port_add(task, process, port); 658 659 ret = nxt_port_socket_init(task, port, 0); 660 if (nxt_slow_path(ret != NXT_OK)) { 661 goto free_port; 662 } 663 664 tmp_mp = nxt_mp_create(1024, 128, 256, 32); 665 if (nxt_slow_path(tmp_mp == NULL)) { 666 ret = NXT_ERROR; 667 668 goto close_port; 669 } 670 671 if (init->prefork) { 672 ret = init->prefork(task, process, tmp_mp); 673 if (nxt_slow_path(ret != NXT_OK)) { 674 goto free_mempool; 675 } 676 } 677 678 pid = nxt_process_create(task, process); 679 680 switch (pid) { 681 682 case -1: 683 ret = NXT_ERROR; 684 break; 685 686 case 0: 687 /* The child process: return to the event engine work queue loop. */ 688 689 nxt_process_use(task, process, -1); 690 691 ret = NXT_AGAIN; 692 break; 693 694 default: 695 /* The main process created a new process. */ 696 697 nxt_process_use(task, process, -1); 698 699 nxt_port_read_close(port); 700 nxt_port_write_enable(task, port); 701 702 ret = NXT_OK; 703 break; 704 } 705 706 free_mempool: 707 708 nxt_mp_destroy(tmp_mp); 709 710 close_port: 711 712 if (nxt_slow_path(ret == NXT_ERROR)) { 713 nxt_port_close(task, port); 714 } 715 716 free_port: 717 718 nxt_port_use(task, port, -1); 719 720 return ret; 721 } 722 723 724 static void 725 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) 726 { 727 nxt_debug(task, "sigterm handler signo:%d (%s)", 728 (int) (uintptr_t) obj, data); 729 730 /* TODO: fast exit. */ 731 732 nxt_exiting = 1; 733 734 nxt_runtime_quit(task, 0); 735 } 736 737 738 static void 739 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) 740 { 741 nxt_debug(task, "sigquit handler signo:%d (%s)", 742 (int) (uintptr_t) obj, data); 743 744 /* TODO: graceful exit. */ 745 746 nxt_exiting = 1; 747 748 nxt_runtime_quit(task, 0); 749 } 750 751 752 static void 753 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) 754 { 755 nxt_mp_t *mp; 756 nxt_int_t ret; 757 nxt_uint_t n; 758 nxt_port_t *port; 759 nxt_file_t *file, *new_file; 760 nxt_array_t *new_files; 761 nxt_runtime_t *rt; 762 763 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", 764 (int) (uintptr_t) obj, data, "log files rotation"); 765 766 rt = task->thread->runtime; 767 768 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 769 770 if (nxt_fast_path(port != NULL)) { 771 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG, 772 -1, 0, 0, NULL); 773 } 774 775 mp = nxt_mp_create(1024, 128, 256, 32); 776 if (mp == NULL) { 777 return; 778 } 779 780 n = nxt_list_nelts(rt->log_files); 781 782 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); 783 if (new_files == NULL) { 784 nxt_mp_destroy(mp); 785 return; 786 } 787 788 nxt_list_each(file, rt->log_files) { 789 790 /* This allocation cannot fail. */ 791 new_file = nxt_array_add(new_files); 792 793 new_file->name = file->name; 794 new_file->fd = NXT_FILE_INVALID; 795 new_file->log_level = NXT_LOG_ALERT; 796 797 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, 798 NXT_FILE_OWNER_ACCESS); 799 800 if (ret != NXT_OK) { 801 goto fail; 802 } 803 804 } nxt_list_loop; 805 806 new_file = new_files->elts; 807 808 ret = nxt_file_stderr(&new_file[0]); 809 810 if (ret == NXT_OK) { 811 n = 0; 812 813 nxt_list_each(file, rt->log_files) { 814 815 nxt_port_change_log_file(task, rt, n, new_file[n].fd); 816 /* 817 * The old log file descriptor must be closed at the moment 818 * when no other threads use it. dup2() allows to use the 819 * old file descriptor for new log file. This change is 820 * performed atomically in the kernel. 821 */ 822 (void) nxt_file_redirect(file, new_file[n].fd); 823 824 n++; 825 826 } nxt_list_loop; 827 828 nxt_mp_destroy(mp); 829 return; 830 } 831 832 fail: 833 834 new_file = new_files->elts; 835 n = new_files->nelts; 836 837 while (n != 0) { 838 if (new_file->fd != NXT_FILE_INVALID) { 839 nxt_file_close(task, new_file); 840 } 841 842 new_file++; 843 n--; 844 } 845 846 nxt_mp_destroy(mp); 847 } 848 849 850 static void 851 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) 852 { 853 int status; 854 nxt_err_t err; 855 nxt_pid_t pid; 856 857 nxt_debug(task, "sigchld handler signo:%d (%s)", 858 (int) (uintptr_t) obj, data); 859 860 for ( ;; ) { 861 pid = waitpid(-1, &status, WNOHANG); 862 863 if (pid == -1) { 864 865 switch (err = nxt_errno) { 866 867 case NXT_ECHILD: 868 return; 869 870 case NXT_EINTR: 871 continue; 872 873 default: 874 nxt_alert(task, "waitpid() failed: %E", err); 875 return; 876 } 877 } 878 879 nxt_debug(task, "waitpid(): %PI", pid); 880 881 if (pid == 0) { 882 return; 883 } 884 885 if (WTERMSIG(status)) { 886 #ifdef WCOREDUMP 887 nxt_alert(task, "process %PI exited on signal %d%s", 888 pid, WTERMSIG(status), 889 WCOREDUMP(status) ? " (core dumped)" : ""); 890 #else 891 nxt_alert(task, "process %PI exited on signal %d", 892 pid, WTERMSIG(status)); 893 #endif 894 895 } else { 896 nxt_trace(task, "process %PI exited with code %d", 897 pid, WEXITSTATUS(status)); 898 } 899 900 nxt_main_cleanup_process(task, pid); 901 } 902 } 903 904 905 static void 906 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) 907 { 908 nxt_trace(task, "signal signo:%d (%s) recevied, ignored", 909 (int) (uintptr_t) obj, data); 910 } 911 912 913 static void 914 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) 915 { 916 int stream; 917 nxt_int_t ret; 918 nxt_buf_t *buf; 919 nxt_port_t *port; 920 const char *name; 921 nxt_runtime_t *rt; 922 nxt_process_t *process; 923 nxt_process_init_t init; 924 925 rt = task->thread->runtime; 926 927 process = nxt_runtime_process_find(rt, pid); 928 if (!process) { 929 return; 930 } 931 932 if (process->isolation.cleanup != NULL) { 933 process->isolation.cleanup(task, process); 934 } 935 936 name = process->name; 937 stream = process->stream; 938 init = *((nxt_process_init_t *) nxt_process_init(process)); 939 940 if (process->state == NXT_PROCESS_STATE_READY) { 941 process->stream = 0; 942 } 943 944 nxt_process_close_ports(task, process); 945 946 if (nxt_exiting) { 947 if (rt->nprocesses <= 1) { 948 nxt_runtime_quit(task, 0); 949 } 950 951 return; 952 } 953 954 nxt_runtime_process_each(rt, process) { 955 956 if (process->pid == nxt_pid 957 || process->pid == pid 958 || nxt_queue_is_empty(&process->ports)) 959 { 960 continue; 961 } 962 963 port = nxt_process_port_first(process); 964 965 if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { 966 continue; 967 } 968 969 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 970 sizeof(pid)); 971 972 if (nxt_slow_path(buf == NULL)) { 973 continue; 974 } 975 976 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); 977 978 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, 979 stream, 0, buf); 980 981 } nxt_runtime_process_loop; 982 983 if (init.restart) { 984 ret = nxt_main_process_create(task, init); 985 if (nxt_slow_path(ret == NXT_ERROR)) { 986 nxt_alert(task, "failed to restart %s", name); 987 } 988 } 989 } 990 991 992 static void 993 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 994 { 995 size_t size; 996 nxt_int_t ret; 997 nxt_buf_t *b, *out; 998 nxt_port_t *port; 999 nxt_sockaddr_t *sa; 1000 nxt_port_msg_type_t type; 1001 nxt_listening_socket_t ls; 1002 u_char message[2048]; 1003 1004 b = msg->buf; 1005 sa = (nxt_sockaddr_t *) b->mem.pos; 1006 1007 /* TODO check b size and make plain */ 1008 1009 out = NULL; 1010 1011 ls.socket = -1; 1012 ls.error = NXT_SOCKET_ERROR_SYSTEM; 1013 ls.start = message; 1014 ls.end = message + sizeof(message); 1015 1016 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1017 msg->port_msg.reply_port); 1018 1019 nxt_debug(task, "listening socket \"%*s\"", 1020 (size_t) sa->length, nxt_sockaddr_start(sa)); 1021 1022 ret = nxt_main_listening_socket(sa, &ls); 1023 1024 if (ret == NXT_OK) { 1025 nxt_debug(task, "socket(\"%*s\"): %d", 1026 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); 1027 1028 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; 1029 1030 } else { 1031 size = ls.end - ls.start; 1032 1033 nxt_alert(task, "%*s", size, ls.start); 1034 1035 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 1036 size + 1); 1037 if (nxt_slow_path(out == NULL)) { 1038 return; 1039 } 1040 1041 *out->mem.free++ = (uint8_t) ls.error; 1042 1043 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); 1044 1045 type = NXT_PORT_MSG_RPC_ERROR; 1046 } 1047 1048 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream, 1049 0, out); 1050 } 1051 1052 1053 static nxt_int_t 1054 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls) 1055 { 1056 nxt_err_t err; 1057 nxt_socket_t s; 1058 1059 const socklen_t length = sizeof(int); 1060 static const int enable = 1; 1061 1062 s = socket(sa->u.sockaddr.sa_family, sa->type, 0); 1063 1064 if (nxt_slow_path(s == -1)) { 1065 err = nxt_errno; 1066 1067 #if (NXT_INET6) 1068 1069 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) { 1070 ls->error = NXT_SOCKET_ERROR_NOINET6; 1071 } 1072 1073 #endif 1074 1075 ls->end = nxt_sprintf(ls->start, ls->end, 1076 "socket(\\\"%*s\\\") failed %E", 1077 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1078 1079 return NXT_ERROR; 1080 } 1081 1082 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) { 1083 ls->end = nxt_sprintf(ls->start, ls->end, 1084 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E", 1085 (size_t) sa->length, nxt_sockaddr_start(sa), 1086 nxt_errno); 1087 goto fail; 1088 } 1089 1090 #if (NXT_INET6) 1091 1092 if (sa->u.sockaddr.sa_family == AF_INET6) { 1093 1094 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) { 1095 ls->end = nxt_sprintf(ls->start, ls->end, 1096 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E", 1097 (size_t) sa->length, nxt_sockaddr_start(sa), 1098 nxt_errno); 1099 goto fail; 1100 } 1101 } 1102 1103 #endif 1104 1105 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) { 1106 err = nxt_errno; 1107 1108 #if (NXT_HAVE_UNIX_DOMAIN) 1109 1110 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1111 switch (err) { 1112 1113 case EACCES: 1114 ls->error = NXT_SOCKET_ERROR_ACCESS; 1115 break; 1116 1117 case ENOENT: 1118 case ENOTDIR: 1119 ls->error = NXT_SOCKET_ERROR_PATH; 1120 break; 1121 } 1122 1123 } else 1124 #endif 1125 { 1126 switch (err) { 1127 1128 case EACCES: 1129 ls->error = NXT_SOCKET_ERROR_PORT; 1130 break; 1131 1132 case EADDRINUSE: 1133 ls->error = NXT_SOCKET_ERROR_INUSE; 1134 break; 1135 1136 case EADDRNOTAVAIL: 1137 ls->error = NXT_SOCKET_ERROR_NOADDR; 1138 break; 1139 } 1140 } 1141 1142 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E", 1143 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1144 goto fail; 1145 } 1146 1147 #if (NXT_HAVE_UNIX_DOMAIN) 1148 1149 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1150 char *filename; 1151 mode_t access; 1152 1153 filename = sa->u.sockaddr_un.sun_path; 1154 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); 1155 1156 if (chmod(filename, access) != 0) { 1157 ls->end = nxt_sprintf(ls->start, ls->end, 1158 "chmod(\\\"%s\\\") failed %E", 1159 filename, nxt_errno); 1160 goto fail; 1161 } 1162 } 1163 1164 #endif 1165 1166 ls->socket = s; 1167 1168 return NXT_OK; 1169 1170 fail: 1171 1172 (void) close(s); 1173 1174 return NXT_ERROR; 1175 } 1176 1177 1178 static nxt_conf_map_t nxt_app_lang_module_map[] = { 1179 { 1180 nxt_string("type"), 1181 NXT_CONF_MAP_INT, 1182 offsetof(nxt_app_lang_module_t, type), 1183 }, 1184 1185 { 1186 nxt_string("version"), 1187 NXT_CONF_MAP_CSTRZ, 1188 offsetof(nxt_app_lang_module_t, version), 1189 }, 1190 1191 { 1192 nxt_string("file"), 1193 NXT_CONF_MAP_CSTRZ, 1194 offsetof(nxt_app_lang_module_t, file), 1195 }, 1196 }; 1197 1198 1199 static nxt_conf_map_t nxt_app_lang_mounts_map[] = { 1200 { 1201 nxt_string("src"), 1202 NXT_CONF_MAP_CSTRZ, 1203 offsetof(nxt_fs_mount_t, src), 1204 }, 1205 { 1206 nxt_string("dst"), 1207 NXT_CONF_MAP_CSTRZ, 1208 offsetof(nxt_fs_mount_t, dst), 1209 }, 1210 { 1211 nxt_string("name"), 1212 NXT_CONF_MAP_CSTRZ, 1213 offsetof(nxt_fs_mount_t, name), 1214 }, 1215 { 1216 nxt_string("type"), 1217 NXT_CONF_MAP_INT, 1218 offsetof(nxt_fs_mount_t, type), 1219 }, 1220 { 1221 nxt_string("flags"), 1222 NXT_CONF_MAP_INT, 1223 offsetof(nxt_fs_mount_t, flags), 1224 }, 1225 { 1226 nxt_string("data"), 1227 NXT_CONF_MAP_CSTRZ, 1228 offsetof(nxt_fs_mount_t, data), 1229 }, 1230 }; 1231 1232 1233 static void 1234 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1235 { 1236 uint32_t index, jindex, nmounts; 1237 nxt_mp_t *mp; 1238 nxt_int_t ret; 1239 nxt_buf_t *b; 1240 nxt_port_t *port; 1241 nxt_runtime_t *rt; 1242 nxt_fs_mount_t *mnt; 1243 nxt_conf_value_t *conf, *root, *value, *mounts; 1244 nxt_app_lang_module_t *lang; 1245 1246 static nxt_str_t root_path = nxt_string("/"); 1247 static nxt_str_t mounts_name = nxt_string("mounts"); 1248 1249 rt = task->thread->runtime; 1250 1251 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { 1252 return; 1253 } 1254 1255 if (nxt_exiting) { 1256 nxt_debug(task, "ignoring discovered modules, exiting"); 1257 return; 1258 } 1259 1260 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1261 msg->port_msg.reply_port); 1262 1263 if (nxt_fast_path(port != NULL)) { 1264 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 1265 msg->port_msg.stream, 0, NULL); 1266 } 1267 1268 b = msg->buf; 1269 1270 if (b == NULL) { 1271 return; 1272 } 1273 1274 mp = nxt_mp_create(1024, 128, 256, 32); 1275 if (mp == NULL) { 1276 return; 1277 } 1278 1279 b = nxt_buf_chk_make_plain(mp, b, msg->size); 1280 1281 if (b == NULL) { 1282 return; 1283 } 1284 1285 nxt_debug(task, "application languages: \"%*s\"", 1286 b->mem.free - b->mem.pos, b->mem.pos); 1287 1288 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); 1289 if (conf == NULL) { 1290 goto fail; 1291 } 1292 1293 root = nxt_conf_get_path(conf, &root_path); 1294 if (root == NULL) { 1295 goto fail; 1296 } 1297 1298 for (index = 0; /* void */ ; index++) { 1299 value = nxt_conf_get_array_element(root, index); 1300 if (value == NULL) { 1301 break; 1302 } 1303 1304 lang = nxt_array_zero_add(rt->languages); 1305 if (lang == NULL) { 1306 goto fail; 1307 } 1308 1309 lang->module = NULL; 1310 1311 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, 1312 nxt_nitems(nxt_app_lang_module_map), lang); 1313 1314 if (ret != NXT_OK) { 1315 goto fail; 1316 } 1317 1318 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL); 1319 if (mounts == NULL) { 1320 nxt_alert(task, "missing mounts from discovery message."); 1321 goto fail; 1322 } 1323 1324 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) { 1325 nxt_alert(task, "invalid mounts type from discovery message."); 1326 goto fail; 1327 } 1328 1329 nmounts = nxt_conf_array_elements_count(mounts); 1330 1331 lang->mounts = nxt_array_create(rt->mem_pool, nmounts, 1332 sizeof(nxt_fs_mount_t)); 1333 1334 if (lang->mounts == NULL) { 1335 goto fail; 1336 } 1337 1338 for (jindex = 0; /* */; jindex++) { 1339 value = nxt_conf_get_array_element(mounts, jindex); 1340 if (value == NULL) { 1341 break; 1342 } 1343 1344 mnt = nxt_array_zero_add(lang->mounts); 1345 if (mnt == NULL) { 1346 goto fail; 1347 } 1348 1349 mnt->builtin = 1; 1350 mnt->deps = 1; 1351 1352 ret = nxt_conf_map_object(rt->mem_pool, value, 1353 nxt_app_lang_mounts_map, 1354 nxt_nitems(nxt_app_lang_mounts_map), mnt); 1355 1356 if (ret != NXT_OK) { 1357 goto fail; 1358 } 1359 } 1360 1361 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)", 1362 lang->type, lang->version, lang->file, lang->mounts->nelts); 1363 } 1364 1365 qsort(rt->languages->elts, rt->languages->nelts, 1366 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); 1367 1368 fail: 1369 1370 nxt_mp_destroy(mp); 1371 1372 ret = nxt_main_process_create(task, nxt_controller_process); 1373 if (ret == NXT_OK) { 1374 ret = nxt_main_process_create(task, nxt_router_process); 1375 } 1376 1377 if (nxt_slow_path(ret == NXT_ERROR)) { 1378 nxt_exiting = 1; 1379 1380 nxt_runtime_quit(task, 1); 1381 } 1382 } 1383 1384 1385 static int nxt_cdecl 1386 nxt_app_lang_compare(const void *v1, const void *v2) 1387 { 1388 int n; 1389 const nxt_app_lang_module_t *lang1, *lang2; 1390 1391 lang1 = v1; 1392 lang2 = v2; 1393 1394 n = lang1->type - lang2->type; 1395 1396 if (n != 0) { 1397 return n; 1398 } 1399 1400 n = nxt_strverscmp(lang1->version, lang2->version); 1401 1402 /* Negate result to move higher versions to the beginning. */ 1403 1404 return -n; 1405 } 1406 1407 1408 static void 1409 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1410 { 1411 void *p; 1412 size_t size; 1413 ssize_t n; 1414 nxt_int_t ret; 1415 nxt_file_t file; 1416 nxt_runtime_t *rt; 1417 1418 p = MAP_FAILED; 1419 1420 /* 1421 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 1422 * initialized in 'cleanup' section. 1423 */ 1424 size = 0; 1425 1426 if (nxt_slow_path(msg->fd[0] == -1)) { 1427 nxt_alert(task, "conf_store_handler: invalid shm fd"); 1428 goto error; 1429 } 1430 1431 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 1432 nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)", 1433 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 1434 goto error; 1435 } 1436 1437 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 1438 1439 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 1440 1441 nxt_fd_close(msg->fd[0]); 1442 msg->fd[0] = -1; 1443 1444 if (nxt_slow_path(p == MAP_FAILED)) { 1445 goto error; 1446 } 1447 1448 nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); 1449 1450 nxt_memzero(&file, sizeof(nxt_file_t)); 1451 1452 rt = task->thread->runtime; 1453 1454 file.name = (nxt_file_name_t *) rt->conf_tmp; 1455 1456 if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, 1457 NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) 1458 != NXT_OK)) 1459 { 1460 goto error; 1461 } 1462 1463 n = nxt_file_write(&file, p, size, 0); 1464 1465 nxt_file_close(task, &file); 1466 1467 if (nxt_slow_path(n != (ssize_t) size)) { 1468 (void) nxt_file_delete(file.name); 1469 goto error; 1470 } 1471 1472 ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); 1473 1474 if (nxt_fast_path(ret == NXT_OK)) { 1475 goto cleanup; 1476 } 1477 1478 error: 1479 1480 nxt_alert(task, "failed to store current configuration"); 1481 1482 cleanup: 1483 1484 if (p != MAP_FAILED) { 1485 nxt_mem_munmap(p, size); 1486 } 1487 1488 if (msg->fd[0] != -1) { 1489 nxt_fd_close(msg->fd[0]); 1490 msg->fd[0] = -1; 1491 } 1492 } 1493 1494 1495 static void 1496 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1497 { 1498 u_char *path; 1499 nxt_int_t ret; 1500 nxt_file_t file; 1501 nxt_port_t *port; 1502 nxt_port_msg_type_t type; 1503 1504 nxt_debug(task, "opening access log file"); 1505 1506 path = msg->buf->mem.pos; 1507 1508 nxt_memzero(&file, sizeof(nxt_file_t)); 1509 1510 file.name = (nxt_file_name_t *) path; 1511 file.log_level = NXT_LOG_ERR; 1512 1513 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 1514 NXT_FILE_OWNER_ACCESS); 1515 1516 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD 1517 : NXT_PORT_MSG_RPC_ERROR; 1518 1519 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1520 msg->port_msg.reply_port); 1521 1522 if (nxt_fast_path(port != NULL)) { 1523 (void) nxt_port_socket_write(task, port, type, file.fd, 1524 msg->port_msg.stream, 0, NULL); 1525 } 1526 } 1527