1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_router.h> 13 #if (NXT_TLS) 14 #include <nxt_cert.h> 15 #endif 16 17 #include <sys/mount.h> 18 19 20 typedef struct { 21 nxt_socket_t socket; 22 nxt_socket_error_t error; 23 u_char *start; 24 u_char *end; 25 } nxt_listening_socket_t; 26 27 28 typedef struct { 29 nxt_uint_t size; 30 nxt_conf_map_t *map; 31 } nxt_conf_app_map_t; 32 33 34 extern nxt_port_handlers_t nxt_controller_process_port_handlers; 35 extern nxt_port_handlers_t nxt_router_process_port_handlers; 36 37 38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, 39 nxt_runtime_t *rt); 40 static void nxt_main_process_title(nxt_task_t *task); 41 static nxt_int_t nxt_main_process_create(nxt_task_t *task, 42 const nxt_process_init_t init); 43 static nxt_int_t nxt_main_start_process(nxt_task_t *task, 44 nxt_process_t *process); 45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); 46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, 47 void *data); 48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, 49 void *data); 50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, 51 void *data); 52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, 53 void *data); 54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, 55 void *data); 56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); 57 static void nxt_main_port_socket_handler(nxt_task_t *task, 58 nxt_port_recv_msg_t *msg); 59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, 60 nxt_listening_socket_t *ls); 61 static void nxt_main_port_modules_handler(nxt_task_t *task, 62 nxt_port_recv_msg_t *msg); 63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); 64 static void nxt_main_port_conf_store_handler(nxt_task_t *task, 65 nxt_port_recv_msg_t *msg); 66 static void nxt_main_port_access_log_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg); 68 69 const nxt_sig_event_t nxt_main_process_signals[] = { 70 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), 71 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler), 72 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler), 73 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler), 74 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler), 75 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler), 76 nxt_event_signal_end, 77 }; 78 79 80 static nxt_bool_t nxt_exiting; 81 82 83 nxt_int_t 84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, 85 nxt_runtime_t *rt) 86 { 87 rt->type = NXT_PROCESS_MAIN; 88 89 if (nxt_main_process_port_create(task, rt) != NXT_OK) { 90 return NXT_ERROR; 91 } 92 93 nxt_main_process_title(task); 94 95 /* 96 * The discovery process will send a message processed by 97 * nxt_main_port_modules_handler() which starts the controller 98 * and router processes. 99 */ 100 return nxt_main_process_create(task, nxt_discovery_process); 101 } 102 103 104 static nxt_conf_map_t nxt_common_app_conf[] = { 105 { 106 nxt_string("type"), 107 NXT_CONF_MAP_STR, 108 offsetof(nxt_common_app_conf_t, type), 109 }, 110 111 { 112 nxt_string("user"), 113 NXT_CONF_MAP_STR, 114 offsetof(nxt_common_app_conf_t, user), 115 }, 116 117 { 118 nxt_string("group"), 119 NXT_CONF_MAP_STR, 120 offsetof(nxt_common_app_conf_t, group), 121 }, 122 123 { 124 nxt_string("working_directory"), 125 NXT_CONF_MAP_CSTRZ, 126 offsetof(nxt_common_app_conf_t, working_directory), 127 }, 128 129 { 130 nxt_string("environment"), 131 NXT_CONF_MAP_PTR, 132 offsetof(nxt_common_app_conf_t, environment), 133 }, 134 135 { 136 nxt_string("isolation"), 137 NXT_CONF_MAP_PTR, 138 offsetof(nxt_common_app_conf_t, isolation), 139 }, 140 141 { 142 nxt_string("limits"), 143 NXT_CONF_MAP_PTR, 144 offsetof(nxt_common_app_conf_t, limits), 145 }, 146 147 }; 148 149 150 static nxt_conf_map_t nxt_common_app_limits_conf[] = { 151 { 152 nxt_string("shm"), 153 NXT_CONF_MAP_SIZE, 154 offsetof(nxt_common_app_conf_t, shm_limit), 155 }, 156 157 }; 158 159 160 static nxt_conf_map_t nxt_external_app_conf[] = { 161 { 162 nxt_string("executable"), 163 NXT_CONF_MAP_CSTRZ, 164 offsetof(nxt_common_app_conf_t, u.external.executable), 165 }, 166 167 { 168 nxt_string("arguments"), 169 NXT_CONF_MAP_PTR, 170 offsetof(nxt_common_app_conf_t, u.external.arguments), 171 }, 172 173 }; 174 175 176 static nxt_conf_map_t nxt_python_app_conf[] = { 177 { 178 nxt_string("home"), 179 NXT_CONF_MAP_CSTRZ, 180 offsetof(nxt_common_app_conf_t, u.python.home), 181 }, 182 183 { 184 nxt_string("path"), 185 NXT_CONF_MAP_PTR, 186 offsetof(nxt_common_app_conf_t, u.python.path), 187 }, 188 189 { 190 nxt_string("module"), 191 NXT_CONF_MAP_STR, 192 offsetof(nxt_common_app_conf_t, u.python.module), 193 }, 194 195 { 196 nxt_string("callable"), 197 NXT_CONF_MAP_CSTRZ, 198 offsetof(nxt_common_app_conf_t, u.python.callable), 199 }, 200 201 { 202 nxt_string("protocol"), 203 NXT_CONF_MAP_STR, 204 offsetof(nxt_common_app_conf_t, u.python.protocol), 205 }, 206 207 { 208 nxt_string("threads"), 209 NXT_CONF_MAP_INT32, 210 offsetof(nxt_common_app_conf_t, u.python.threads), 211 }, 212 213 { 214 nxt_string("thread_stack_size"), 215 NXT_CONF_MAP_INT32, 216 offsetof(nxt_common_app_conf_t, u.python.thread_stack_size), 217 }, 218 }; 219 220 221 static nxt_conf_map_t nxt_php_app_conf[] = { 222 { 223 nxt_string("targets"), 224 NXT_CONF_MAP_PTR, 225 offsetof(nxt_common_app_conf_t, u.php.targets), 226 }, 227 228 { 229 nxt_string("options"), 230 NXT_CONF_MAP_PTR, 231 offsetof(nxt_common_app_conf_t, u.php.options), 232 }, 233 }; 234 235 236 static nxt_conf_map_t nxt_perl_app_conf[] = { 237 { 238 nxt_string("script"), 239 NXT_CONF_MAP_CSTRZ, 240 offsetof(nxt_common_app_conf_t, u.perl.script), 241 }, 242 243 { 244 nxt_string("threads"), 245 NXT_CONF_MAP_INT32, 246 offsetof(nxt_common_app_conf_t, u.perl.threads), 247 }, 248 249 { 250 nxt_string("thread_stack_size"), 251 NXT_CONF_MAP_INT32, 252 offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size), 253 }, 254 }; 255 256 257 static nxt_conf_map_t nxt_ruby_app_conf[] = { 258 { 259 nxt_string("script"), 260 NXT_CONF_MAP_STR, 261 offsetof(nxt_common_app_conf_t, u.ruby.script), 262 }, 263 { 264 nxt_string("threads"), 265 NXT_CONF_MAP_INT32, 266 offsetof(nxt_common_app_conf_t, u.ruby.threads), 267 }, 268 }; 269 270 271 static nxt_conf_map_t nxt_java_app_conf[] = { 272 { 273 nxt_string("classpath"), 274 NXT_CONF_MAP_PTR, 275 offsetof(nxt_common_app_conf_t, u.java.classpath), 276 }, 277 { 278 nxt_string("webapp"), 279 NXT_CONF_MAP_CSTRZ, 280 offsetof(nxt_common_app_conf_t, u.java.webapp), 281 }, 282 { 283 nxt_string("options"), 284 NXT_CONF_MAP_PTR, 285 offsetof(nxt_common_app_conf_t, u.java.options), 286 }, 287 { 288 nxt_string("unit_jars"), 289 NXT_CONF_MAP_CSTRZ, 290 offsetof(nxt_common_app_conf_t, u.java.unit_jars), 291 }, 292 { 293 nxt_string("threads"), 294 NXT_CONF_MAP_INT32, 295 offsetof(nxt_common_app_conf_t, u.java.threads), 296 }, 297 { 298 nxt_string("thread_stack_size"), 299 NXT_CONF_MAP_INT32, 300 offsetof(nxt_common_app_conf_t, u.java.thread_stack_size), 301 }, 302 303 }; 304 305 306 static nxt_conf_app_map_t nxt_app_maps[] = { 307 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, 308 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, 309 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, 310 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, 311 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, 312 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf }, 313 }; 314 315 316 static void 317 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 318 { 319 nxt_debug(task, "main data: %*s", 320 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 321 } 322 323 324 static void 325 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 326 { 327 u_char *start, *p, ch; 328 size_t type_len; 329 nxt_int_t ret; 330 nxt_buf_t *b; 331 nxt_port_t *port; 332 nxt_runtime_t *rt; 333 nxt_process_t *process; 334 nxt_app_type_t idx; 335 nxt_conf_value_t *conf; 336 nxt_process_init_t *init; 337 nxt_common_app_conf_t *app_conf; 338 339 ret = NXT_ERROR; 340 341 rt = task->thread->runtime; 342 343 process = nxt_main_process_new(task, rt); 344 if (nxt_slow_path(process == NULL)) { 345 return; 346 } 347 348 init = nxt_process_init(process); 349 350 *init = nxt_app_process; 351 352 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); 353 if (b == NULL) { 354 goto failed; 355 } 356 357 nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, 358 b->mem.pos); 359 360 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); 361 if (nxt_slow_path(app_conf == NULL)) { 362 goto failed; 363 } 364 365 start = b->mem.pos; 366 367 app_conf->name.start = start; 368 app_conf->name.length = nxt_strlen(start); 369 370 init->name = (const char *) start; 371 372 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length 373 + sizeof("\"\" application") + 1); 374 375 if (nxt_slow_path(process->name == NULL)) { 376 goto failed; 377 } 378 379 p = (u_char *) process->name; 380 *p++ = '"'; 381 p = nxt_cpymem(p, init->name, app_conf->name.length); 382 p = nxt_cpymem(p, "\" application", 13); 383 *p = '\0'; 384 385 app_conf->shm_limit = 100 * 1024 * 1024; 386 387 start += app_conf->name.length + 1; 388 389 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL); 390 if (conf == NULL) { 391 nxt_alert(task, "router app configuration parsing error"); 392 393 goto failed; 394 } 395 396 rt = task->thread->runtime; 397 398 app_conf->user.start = (u_char*)rt->user_cred.user; 399 app_conf->user.length = nxt_strlen(rt->user_cred.user); 400 401 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf, 402 nxt_nitems(nxt_common_app_conf), app_conf); 403 404 if (ret != NXT_OK) { 405 nxt_alert(task, "failed to map common app conf received from router"); 406 goto failed; 407 } 408 409 for (type_len = 0; type_len != app_conf->type.length; type_len++) { 410 ch = app_conf->type.start[type_len]; 411 412 if (ch == ' ' || nxt_isdigit(ch)) { 413 break; 414 } 415 } 416 417 idx = nxt_app_parse_type(app_conf->type.start, type_len); 418 419 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { 420 nxt_alert(task, "invalid app type %d received from router", (int) idx); 421 goto failed; 422 } 423 424 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map, 425 nxt_app_maps[idx].size, app_conf); 426 427 if (nxt_slow_path(ret != NXT_OK)) { 428 nxt_alert(task, "failed to map app conf received from router"); 429 goto failed; 430 } 431 432 if (app_conf->limits != NULL) { 433 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits, 434 nxt_common_app_limits_conf, 435 nxt_nitems(nxt_common_app_limits_conf), 436 app_conf); 437 438 if (nxt_slow_path(ret != NXT_OK)) { 439 nxt_alert(task, "failed to map app limits received from router"); 440 goto failed; 441 } 442 } 443 444 app_conf->self = conf; 445 446 process->stream = msg->port_msg.stream; 447 process->data.app = app_conf; 448 449 ret = nxt_main_start_process(task, process); 450 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { 451 return; 452 } 453 454 failed: 455 456 nxt_process_use(task, process, -1); 457 458 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 459 msg->port_msg.reply_port); 460 461 if (nxt_fast_path(port != NULL)) { 462 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 463 -1, msg->port_msg.stream, 0, NULL); 464 } 465 } 466 467 468 static void 469 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 470 { 471 nxt_port_t *port; 472 nxt_process_t *process; 473 nxt_runtime_t *rt; 474 475 rt = task->thread->runtime; 476 477 process = nxt_runtime_process_find(rt, msg->port_msg.pid); 478 if (nxt_slow_path(process == NULL)) { 479 return; 480 } 481 482 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); 483 484 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 485 msg->port_msg.reply_port); 486 487 488 if (nxt_slow_path(port == NULL)) { 489 return; 490 } 491 492 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) 493 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { 494 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, 495 process->user_cred, 496 &process->isolation.clone) 497 != NXT_OK)) 498 { 499 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 500 -1, msg->port_msg.stream, 0, NULL); 501 return; 502 } 503 } 504 505 #endif 506 507 process->state = NXT_PROCESS_STATE_CREATED; 508 509 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, 510 -1, msg->port_msg.stream, 0, NULL); 511 } 512 513 514 static nxt_port_handlers_t nxt_main_process_port_handlers = { 515 .data = nxt_port_main_data_handler, 516 .process_created = nxt_main_process_created_handler, 517 .process_ready = nxt_port_process_ready_handler, 518 .start_process = nxt_port_main_start_process_handler, 519 .socket = nxt_main_port_socket_handler, 520 .modules = nxt_main_port_modules_handler, 521 .conf_store = nxt_main_port_conf_store_handler, 522 #if (NXT_TLS) 523 .cert_get = nxt_cert_store_get_handler, 524 .cert_delete = nxt_cert_store_delete_handler, 525 #endif 526 .access_log = nxt_main_port_access_log_handler, 527 .rpc_ready = nxt_port_rpc_handler, 528 .rpc_error = nxt_port_rpc_handler, 529 }; 530 531 532 static nxt_int_t 533 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) 534 { 535 nxt_int_t ret; 536 nxt_port_t *port; 537 nxt_process_t *process; 538 539 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, 540 NXT_PROCESS_MAIN); 541 if (nxt_slow_path(port == NULL)) { 542 return NXT_ERROR; 543 } 544 545 process = port->process; 546 547 ret = nxt_port_socket_init(task, port, 0); 548 if (nxt_slow_path(ret != NXT_OK)) { 549 nxt_port_use(task, port, -1); 550 return ret; 551 } 552 553 /* 554 * A main process port. A write port is not closed 555 * since it should be inherited by processes. 556 */ 557 nxt_port_enable(task, port, &nxt_main_process_port_handlers); 558 559 process->state = NXT_PROCESS_STATE_READY; 560 561 return NXT_OK; 562 } 563 564 565 static void 566 nxt_main_process_title(nxt_task_t *task) 567 { 568 u_char *p, *end; 569 nxt_uint_t i; 570 u_char title[2048]; 571 572 end = title + sizeof(title) - 1; 573 574 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s", 575 nxt_process_argv[0]); 576 577 for (i = 1; nxt_process_argv[i] != NULL; i++) { 578 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); 579 } 580 581 if (p < end) { 582 *p++ = ']'; 583 } 584 585 *p = '\0'; 586 587 nxt_process_title(task, "%s", title); 588 } 589 590 591 static nxt_int_t 592 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) 593 { 594 nxt_int_t ret; 595 nxt_runtime_t *rt; 596 nxt_process_t *process; 597 nxt_process_init_t *pinit; 598 599 rt = task->thread->runtime; 600 601 process = nxt_main_process_new(task, rt); 602 if (nxt_slow_path(process == NULL)) { 603 return NXT_ERROR; 604 } 605 606 process->name = init.name; 607 process->user_cred = &rt->user_cred; 608 609 pinit = nxt_process_init(process); 610 *pinit = init; 611 612 ret = nxt_main_start_process(task, process); 613 if (nxt_slow_path(ret == NXT_ERROR)) { 614 nxt_process_use(task, process, -1); 615 } 616 617 return ret; 618 } 619 620 621 static nxt_process_t * 622 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) 623 { 624 nxt_process_t *process; 625 626 process = nxt_runtime_process_new(rt); 627 if (nxt_slow_path(process == NULL)) { 628 return NULL; 629 } 630 631 process->mem_pool = nxt_mp_create(1024, 128, 256, 32); 632 if (process->mem_pool == NULL) { 633 nxt_process_use(task, process, -1); 634 return NULL; 635 } 636 637 return process; 638 } 639 640 641 static nxt_int_t 642 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) 643 { 644 nxt_mp_t *tmp_mp; 645 nxt_int_t ret; 646 nxt_pid_t pid; 647 nxt_port_t *port; 648 nxt_process_init_t *init; 649 650 init = nxt_process_init(process); 651 652 port = nxt_port_new(task, 0, 0, init->type); 653 if (nxt_slow_path(port == NULL)) { 654 return NXT_ERROR; 655 } 656 657 nxt_process_port_add(task, process, port); 658 659 ret = nxt_port_socket_init(task, port, 0); 660 if (nxt_slow_path(ret != NXT_OK)) { 661 goto free_port; 662 } 663 664 tmp_mp = nxt_mp_create(1024, 128, 256, 32); 665 if (nxt_slow_path(tmp_mp == NULL)) { 666 ret = NXT_ERROR; 667 668 goto close_port; 669 } 670 671 if (init->prefork) { 672 ret = init->prefork(task, process, tmp_mp); 673 if (nxt_slow_path(ret != NXT_OK)) { 674 goto free_mempool; 675 } 676 } 677 678 pid = nxt_process_create(task, process); 679 680 switch (pid) { 681 682 case -1: 683 ret = NXT_ERROR; 684 break; 685 686 case 0: 687 /* The child process: return to the event engine work queue loop. */ 688 689 nxt_process_use(task, process, -1); 690 691 ret = NXT_AGAIN; 692 break; 693 694 default: 695 /* The main process created a new process. */ 696 697 nxt_process_use(task, process, -1); 698 699 nxt_port_read_close(port); 700 nxt_port_write_enable(task, port); 701 702 ret = NXT_OK; 703 break; 704 } 705 706 free_mempool: 707 708 nxt_mp_destroy(tmp_mp); 709 710 close_port: 711 712 if (nxt_slow_path(ret == NXT_ERROR)) { 713 nxt_port_close(task, port); 714 } 715 716 free_port: 717 718 nxt_port_use(task, port, -1); 719 720 return ret; 721 } 722 723 724 static void 725 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) 726 { 727 nxt_debug(task, "sigterm handler signo:%d (%s)", 728 (int) (uintptr_t) obj, data); 729 730 /* TODO: fast exit. */ 731 732 nxt_exiting = 1; 733 734 nxt_runtime_quit(task, 0); 735 } 736 737 738 static void 739 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) 740 { 741 nxt_debug(task, "sigquit handler signo:%d (%s)", 742 (int) (uintptr_t) obj, data); 743 744 /* TODO: graceful exit. */ 745 746 nxt_exiting = 1; 747 748 nxt_runtime_quit(task, 0); 749 } 750 751 752 static void 753 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) 754 { 755 nxt_mp_t *mp; 756 nxt_int_t ret; 757 nxt_uint_t n; 758 nxt_port_t *port; 759 nxt_file_t *file, *new_file; 760 nxt_array_t *new_files; 761 nxt_runtime_t *rt; 762 763 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", 764 (int) (uintptr_t) obj, data, "log files rotation"); 765 766 rt = task->thread->runtime; 767 768 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 769 770 if (nxt_fast_path(port != NULL)) { 771 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG, 772 -1, 0, 0, NULL); 773 } 774 775 mp = nxt_mp_create(1024, 128, 256, 32); 776 if (mp == NULL) { 777 return; 778 } 779 780 n = nxt_list_nelts(rt->log_files); 781 782 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); 783 if (new_files == NULL) { 784 nxt_mp_destroy(mp); 785 return; 786 } 787 788 nxt_list_each(file, rt->log_files) { 789 790 /* This allocation cannot fail. */ 791 new_file = nxt_array_add(new_files); 792 793 new_file->name = file->name; 794 new_file->fd = NXT_FILE_INVALID; 795 new_file->log_level = NXT_LOG_ALERT; 796 797 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, 798 NXT_FILE_OWNER_ACCESS); 799 800 if (ret != NXT_OK) { 801 goto fail; 802 } 803 804 } nxt_list_loop; 805 806 new_file = new_files->elts; 807 808 ret = nxt_file_stderr(&new_file[0]); 809 810 if (ret == NXT_OK) { 811 n = 0; 812 813 nxt_list_each(file, rt->log_files) { 814 815 nxt_port_change_log_file(task, rt, n, new_file[n].fd); 816 /* 817 * The old log file descriptor must be closed at the moment 818 * when no other threads use it. dup2() allows to use the 819 * old file descriptor for new log file. This change is 820 * performed atomically in the kernel. 821 */ 822 (void) nxt_file_redirect(file, new_file[n].fd); 823 824 n++; 825 826 } nxt_list_loop; 827 828 nxt_mp_destroy(mp); 829 return; 830 } 831 832 fail: 833 834 new_file = new_files->elts; 835 n = new_files->nelts; 836 837 while (n != 0) { 838 if (new_file->fd != NXT_FILE_INVALID) { 839 nxt_file_close(task, new_file); 840 } 841 842 new_file++; 843 n--; 844 } 845 846 nxt_mp_destroy(mp); 847 } 848 849 850 static void 851 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) 852 { 853 int status; 854 nxt_err_t err; 855 nxt_pid_t pid; 856 857 nxt_debug(task, "sigchld handler signo:%d (%s)", 858 (int) (uintptr_t) obj, data); 859 860 for ( ;; ) { 861 pid = waitpid(-1, &status, WNOHANG); 862 863 if (pid == -1) { 864 865 switch (err = nxt_errno) { 866 867 case NXT_ECHILD: 868 return; 869 870 case NXT_EINTR: 871 continue; 872 873 default: 874 nxt_alert(task, "waitpid() failed: %E", err); 875 return; 876 } 877 } 878 879 nxt_debug(task, "waitpid(): %PI", pid); 880 881 if (pid == 0) { 882 return; 883 } 884 885 if (WTERMSIG(status)) { 886 #ifdef WCOREDUMP 887 nxt_alert(task, "process %PI exited on signal %d%s", 888 pid, WTERMSIG(status), 889 WCOREDUMP(status) ? " (core dumped)" : ""); 890 #else 891 nxt_alert(task, "process %PI exited on signal %d", 892 pid, WTERMSIG(status)); 893 #endif 894 895 } else { 896 nxt_trace(task, "process %PI exited with code %d", 897 pid, WEXITSTATUS(status)); 898 } 899 900 nxt_main_cleanup_process(task, pid); 901 } 902 } 903 904 905 static void 906 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) 907 { 908 nxt_trace(task, "signal signo:%d (%s) recevied, ignored", 909 (int) (uintptr_t) obj, data); 910 } 911 912 913 static void 914 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) 915 { 916 int stream; 917 nxt_int_t ret; 918 nxt_buf_t *buf; 919 nxt_port_t *port; 920 const char *name; 921 nxt_runtime_t *rt; 922 nxt_process_t *process; 923 nxt_process_init_t init; 924 925 rt = task->thread->runtime; 926 927 process = nxt_runtime_process_find(rt, pid); 928 if (!process) { 929 return; 930 } 931 932 if (process->isolation.cleanup != NULL) { 933 process->isolation.cleanup(task, process); 934 } 935 936 name = process->name; 937 stream = process->stream; 938 init = *((nxt_process_init_t *) nxt_process_init(process)); 939 940 if (process->state == NXT_PROCESS_STATE_READY) { 941 process->stream = 0; 942 } 943 944 nxt_process_close_ports(task, process); 945 946 if (nxt_exiting) { 947 if (rt->nprocesses <= 1) { 948 nxt_runtime_quit(task, 0); 949 } 950 951 return; 952 } 953 954 nxt_runtime_process_each(rt, process) { 955 956 if (process->pid == nxt_pid 957 || process->pid == pid 958 || nxt_queue_is_empty(&process->ports)) 959 { 960 continue; 961 } 962 963 port = nxt_process_port_first(process); 964 965 if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { 966 continue; 967 } 968 969 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 970 sizeof(pid)); 971 972 if (nxt_slow_path(buf == NULL)) { 973 continue; 974 } 975 976 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); 977 978 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, 979 stream, 0, buf); 980 981 } nxt_runtime_process_loop; 982 983 if (init.restart) { 984 ret = nxt_main_process_create(task, init); 985 if (nxt_slow_path(ret == NXT_ERROR)) { 986 nxt_alert(task, "failed to restart %s", name); 987 } 988 } 989 } 990 991 992 static void 993 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 994 { 995 size_t size; 996 nxt_int_t ret; 997 nxt_buf_t *b, *out; 998 nxt_port_t *port; 999 nxt_sockaddr_t *sa; 1000 nxt_port_msg_type_t type; 1001 nxt_listening_socket_t ls; 1002 u_char message[2048]; 1003 1004 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1005 msg->port_msg.reply_port); 1006 if (nxt_slow_path(port == NULL)) { 1007 return; 1008 } 1009 1010 b = msg->buf; 1011 sa = (nxt_sockaddr_t *) b->mem.pos; 1012 1013 /* TODO check b size and make plain */ 1014 1015 ls.socket = -1; 1016 ls.error = NXT_SOCKET_ERROR_SYSTEM; 1017 ls.start = message; 1018 ls.end = message + sizeof(message); 1019 1020 nxt_debug(task, "listening socket \"%*s\"", 1021 (size_t) sa->length, nxt_sockaddr_start(sa)); 1022 1023 ret = nxt_main_listening_socket(sa, &ls); 1024 1025 if (ret == NXT_OK) { 1026 nxt_debug(task, "socket(\"%*s\"): %d", 1027 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); 1028 1029 out = NULL; 1030 1031 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; 1032 1033 } else { 1034 size = ls.end - ls.start; 1035 1036 nxt_alert(task, "%*s", size, ls.start); 1037 1038 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 1039 size + 1); 1040 if (nxt_fast_path(out != NULL)) { 1041 *out->mem.free++ = (uint8_t) ls.error; 1042 1043 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); 1044 } 1045 1046 type = NXT_PORT_MSG_RPC_ERROR; 1047 } 1048 1049 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream, 1050 0, out); 1051 } 1052 1053 1054 static nxt_int_t 1055 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls) 1056 { 1057 nxt_err_t err; 1058 nxt_socket_t s; 1059 1060 const socklen_t length = sizeof(int); 1061 static const int enable = 1; 1062 1063 s = socket(sa->u.sockaddr.sa_family, sa->type, 0); 1064 1065 if (nxt_slow_path(s == -1)) { 1066 err = nxt_errno; 1067 1068 #if (NXT_INET6) 1069 1070 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) { 1071 ls->error = NXT_SOCKET_ERROR_NOINET6; 1072 } 1073 1074 #endif 1075 1076 ls->end = nxt_sprintf(ls->start, ls->end, 1077 "socket(\\\"%*s\\\") failed %E", 1078 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1079 1080 return NXT_ERROR; 1081 } 1082 1083 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) { 1084 ls->end = nxt_sprintf(ls->start, ls->end, 1085 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E", 1086 (size_t) sa->length, nxt_sockaddr_start(sa), 1087 nxt_errno); 1088 goto fail; 1089 } 1090 1091 #if (NXT_INET6) 1092 1093 if (sa->u.sockaddr.sa_family == AF_INET6) { 1094 1095 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) { 1096 ls->end = nxt_sprintf(ls->start, ls->end, 1097 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E", 1098 (size_t) sa->length, nxt_sockaddr_start(sa), 1099 nxt_errno); 1100 goto fail; 1101 } 1102 } 1103 1104 #endif 1105 1106 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) { 1107 err = nxt_errno; 1108 1109 #if (NXT_HAVE_UNIX_DOMAIN) 1110 1111 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1112 switch (err) { 1113 1114 case EACCES: 1115 ls->error = NXT_SOCKET_ERROR_ACCESS; 1116 break; 1117 1118 case ENOENT: 1119 case ENOTDIR: 1120 ls->error = NXT_SOCKET_ERROR_PATH; 1121 break; 1122 } 1123 1124 } else 1125 #endif 1126 { 1127 switch (err) { 1128 1129 case EACCES: 1130 ls->error = NXT_SOCKET_ERROR_PORT; 1131 break; 1132 1133 case EADDRINUSE: 1134 ls->error = NXT_SOCKET_ERROR_INUSE; 1135 break; 1136 1137 case EADDRNOTAVAIL: 1138 ls->error = NXT_SOCKET_ERROR_NOADDR; 1139 break; 1140 } 1141 } 1142 1143 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E", 1144 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1145 goto fail; 1146 } 1147 1148 #if (NXT_HAVE_UNIX_DOMAIN) 1149 1150 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1151 char *filename; 1152 mode_t access; 1153 1154 filename = sa->u.sockaddr_un.sun_path; 1155 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); 1156 1157 if (chmod(filename, access) != 0) { 1158 ls->end = nxt_sprintf(ls->start, ls->end, 1159 "chmod(\\\"%s\\\") failed %E", 1160 filename, nxt_errno); 1161 goto fail; 1162 } 1163 } 1164 1165 #endif 1166 1167 ls->socket = s; 1168 1169 return NXT_OK; 1170 1171 fail: 1172 1173 (void) close(s); 1174 1175 return NXT_ERROR; 1176 } 1177 1178 1179 static nxt_conf_map_t nxt_app_lang_module_map[] = { 1180 { 1181 nxt_string("type"), 1182 NXT_CONF_MAP_INT, 1183 offsetof(nxt_app_lang_module_t, type), 1184 }, 1185 1186 { 1187 nxt_string("version"), 1188 NXT_CONF_MAP_CSTRZ, 1189 offsetof(nxt_app_lang_module_t, version), 1190 }, 1191 1192 { 1193 nxt_string("file"), 1194 NXT_CONF_MAP_CSTRZ, 1195 offsetof(nxt_app_lang_module_t, file), 1196 }, 1197 }; 1198 1199 1200 static nxt_conf_map_t nxt_app_lang_mounts_map[] = { 1201 { 1202 nxt_string("src"), 1203 NXT_CONF_MAP_CSTRZ, 1204 offsetof(nxt_fs_mount_t, src), 1205 }, 1206 { 1207 nxt_string("dst"), 1208 NXT_CONF_MAP_CSTRZ, 1209 offsetof(nxt_fs_mount_t, dst), 1210 }, 1211 { 1212 nxt_string("name"), 1213 NXT_CONF_MAP_CSTRZ, 1214 offsetof(nxt_fs_mount_t, name), 1215 }, 1216 { 1217 nxt_string("type"), 1218 NXT_CONF_MAP_INT, 1219 offsetof(nxt_fs_mount_t, type), 1220 }, 1221 { 1222 nxt_string("flags"), 1223 NXT_CONF_MAP_INT, 1224 offsetof(nxt_fs_mount_t, flags), 1225 }, 1226 { 1227 nxt_string("data"), 1228 NXT_CONF_MAP_CSTRZ, 1229 offsetof(nxt_fs_mount_t, data), 1230 }, 1231 }; 1232 1233 1234 static void 1235 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1236 { 1237 uint32_t index, jindex, nmounts; 1238 nxt_mp_t *mp; 1239 nxt_int_t ret; 1240 nxt_buf_t *b; 1241 nxt_port_t *port; 1242 nxt_runtime_t *rt; 1243 nxt_fs_mount_t *mnt; 1244 nxt_conf_value_t *conf, *root, *value, *mounts; 1245 nxt_app_lang_module_t *lang; 1246 1247 static nxt_str_t root_path = nxt_string("/"); 1248 static nxt_str_t mounts_name = nxt_string("mounts"); 1249 1250 rt = task->thread->runtime; 1251 1252 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { 1253 return; 1254 } 1255 1256 if (nxt_exiting) { 1257 nxt_debug(task, "ignoring discovered modules, exiting"); 1258 return; 1259 } 1260 1261 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1262 msg->port_msg.reply_port); 1263 1264 if (nxt_fast_path(port != NULL)) { 1265 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 1266 msg->port_msg.stream, 0, NULL); 1267 } 1268 1269 b = msg->buf; 1270 1271 if (b == NULL) { 1272 return; 1273 } 1274 1275 mp = nxt_mp_create(1024, 128, 256, 32); 1276 if (mp == NULL) { 1277 return; 1278 } 1279 1280 b = nxt_buf_chk_make_plain(mp, b, msg->size); 1281 1282 if (b == NULL) { 1283 return; 1284 } 1285 1286 nxt_debug(task, "application languages: \"%*s\"", 1287 b->mem.free - b->mem.pos, b->mem.pos); 1288 1289 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); 1290 if (conf == NULL) { 1291 goto fail; 1292 } 1293 1294 root = nxt_conf_get_path(conf, &root_path); 1295 if (root == NULL) { 1296 goto fail; 1297 } 1298 1299 for (index = 0; /* void */ ; index++) { 1300 value = nxt_conf_get_array_element(root, index); 1301 if (value == NULL) { 1302 break; 1303 } 1304 1305 lang = nxt_array_zero_add(rt->languages); 1306 if (lang == NULL) { 1307 goto fail; 1308 } 1309 1310 lang->module = NULL; 1311 1312 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, 1313 nxt_nitems(nxt_app_lang_module_map), lang); 1314 1315 if (ret != NXT_OK) { 1316 goto fail; 1317 } 1318 1319 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL); 1320 if (mounts == NULL) { 1321 nxt_alert(task, "missing mounts from discovery message."); 1322 goto fail; 1323 } 1324 1325 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) { 1326 nxt_alert(task, "invalid mounts type from discovery message."); 1327 goto fail; 1328 } 1329 1330 nmounts = nxt_conf_array_elements_count(mounts); 1331 1332 lang->mounts = nxt_array_create(rt->mem_pool, nmounts, 1333 sizeof(nxt_fs_mount_t)); 1334 1335 if (lang->mounts == NULL) { 1336 goto fail; 1337 } 1338 1339 for (jindex = 0; /* */; jindex++) { 1340 value = nxt_conf_get_array_element(mounts, jindex); 1341 if (value == NULL) { 1342 break; 1343 } 1344 1345 mnt = nxt_array_zero_add(lang->mounts); 1346 if (mnt == NULL) { 1347 goto fail; 1348 } 1349 1350 mnt->builtin = 1; 1351 mnt->deps = 1; 1352 1353 ret = nxt_conf_map_object(rt->mem_pool, value, 1354 nxt_app_lang_mounts_map, 1355 nxt_nitems(nxt_app_lang_mounts_map), mnt); 1356 1357 if (ret != NXT_OK) { 1358 goto fail; 1359 } 1360 } 1361 1362 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)", 1363 lang->type, lang->version, lang->file, lang->mounts->nelts); 1364 } 1365 1366 qsort(rt->languages->elts, rt->languages->nelts, 1367 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); 1368 1369 fail: 1370 1371 nxt_mp_destroy(mp); 1372 1373 ret = nxt_main_process_create(task, nxt_controller_process); 1374 if (ret == NXT_OK) { 1375 ret = nxt_main_process_create(task, nxt_router_process); 1376 } 1377 1378 if (nxt_slow_path(ret == NXT_ERROR)) { 1379 nxt_exiting = 1; 1380 1381 nxt_runtime_quit(task, 1); 1382 } 1383 } 1384 1385 1386 static int nxt_cdecl 1387 nxt_app_lang_compare(const void *v1, const void *v2) 1388 { 1389 int n; 1390 const nxt_app_lang_module_t *lang1, *lang2; 1391 1392 lang1 = v1; 1393 lang2 = v2; 1394 1395 n = lang1->type - lang2->type; 1396 1397 if (n != 0) { 1398 return n; 1399 } 1400 1401 n = nxt_strverscmp(lang1->version, lang2->version); 1402 1403 /* Negate result to move higher versions to the beginning. */ 1404 1405 return -n; 1406 } 1407 1408 1409 static void 1410 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1411 { 1412 void *p; 1413 size_t size; 1414 ssize_t n; 1415 nxt_int_t ret; 1416 nxt_file_t file; 1417 nxt_runtime_t *rt; 1418 1419 p = MAP_FAILED; 1420 1421 /* 1422 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 1423 * initialized in 'cleanup' section. 1424 */ 1425 size = 0; 1426 1427 if (nxt_slow_path(msg->fd[0] == -1)) { 1428 nxt_alert(task, "conf_store_handler: invalid shm fd"); 1429 goto error; 1430 } 1431 1432 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 1433 nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)", 1434 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 1435 goto error; 1436 } 1437 1438 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 1439 1440 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 1441 1442 nxt_fd_close(msg->fd[0]); 1443 msg->fd[0] = -1; 1444 1445 if (nxt_slow_path(p == MAP_FAILED)) { 1446 goto error; 1447 } 1448 1449 nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); 1450 1451 nxt_memzero(&file, sizeof(nxt_file_t)); 1452 1453 rt = task->thread->runtime; 1454 1455 file.name = (nxt_file_name_t *) rt->conf_tmp; 1456 1457 if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, 1458 NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) 1459 != NXT_OK)) 1460 { 1461 goto error; 1462 } 1463 1464 n = nxt_file_write(&file, p, size, 0); 1465 1466 nxt_file_close(task, &file); 1467 1468 if (nxt_slow_path(n != (ssize_t) size)) { 1469 (void) nxt_file_delete(file.name); 1470 goto error; 1471 } 1472 1473 ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); 1474 1475 if (nxt_fast_path(ret == NXT_OK)) { 1476 goto cleanup; 1477 } 1478 1479 error: 1480 1481 nxt_alert(task, "failed to store current configuration"); 1482 1483 cleanup: 1484 1485 if (p != MAP_FAILED) { 1486 nxt_mem_munmap(p, size); 1487 } 1488 1489 if (msg->fd[0] != -1) { 1490 nxt_fd_close(msg->fd[0]); 1491 msg->fd[0] = -1; 1492 } 1493 } 1494 1495 1496 static void 1497 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1498 { 1499 u_char *path; 1500 nxt_int_t ret; 1501 nxt_file_t file; 1502 nxt_port_t *port; 1503 nxt_port_msg_type_t type; 1504 1505 nxt_debug(task, "opening access log file"); 1506 1507 path = msg->buf->mem.pos; 1508 1509 nxt_memzero(&file, sizeof(nxt_file_t)); 1510 1511 file.name = (nxt_file_name_t *) path; 1512 file.log_level = NXT_LOG_ERR; 1513 1514 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 1515 NXT_FILE_OWNER_ACCESS); 1516 1517 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD 1518 : NXT_PORT_MSG_RPC_ERROR; 1519 1520 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1521 msg->port_msg.reply_port); 1522 1523 if (nxt_fast_path(port != NULL)) { 1524 (void) nxt_port_socket_write(task, port, type, file.fd, 1525 msg->port_msg.stream, 0, NULL); 1526 } 1527 } 1528