1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_router.h> 13 #if (NXT_TLS) 14 #include <nxt_cert.h> 15 #endif 16 17 #include <sys/mount.h> 18 19 20 typedef struct { 21 nxt_socket_t socket; 22 nxt_socket_error_t error; 23 u_char *start; 24 u_char *end; 25 } nxt_listening_socket_t; 26 27 28 typedef struct { 29 nxt_uint_t size; 30 nxt_conf_map_t *map; 31 } nxt_conf_app_map_t; 32 33 34 extern nxt_port_handlers_t nxt_controller_process_port_handlers; 35 extern nxt_port_handlers_t nxt_router_process_port_handlers; 36 37 38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, 39 nxt_runtime_t *rt); 40 static void nxt_main_process_title(nxt_task_t *task); 41 static nxt_int_t nxt_main_process_create(nxt_task_t *task, 42 const nxt_process_init_t init); 43 static nxt_int_t nxt_main_start_process(nxt_task_t *task, 44 nxt_process_t *process); 45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); 46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, 47 void *data); 48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, 49 void *data); 50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, 51 void *data); 52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, 53 void *data); 54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, 55 void *data); 56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); 57 static void nxt_main_port_socket_handler(nxt_task_t *task, 58 nxt_port_recv_msg_t *msg); 59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, 60 nxt_listening_socket_t *ls); 61 static void nxt_main_port_modules_handler(nxt_task_t *task, 62 nxt_port_recv_msg_t *msg); 63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); 64 static void nxt_main_port_conf_store_handler(nxt_task_t *task, 65 nxt_port_recv_msg_t *msg); 66 static void nxt_main_port_access_log_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg); 68 69 const nxt_sig_event_t nxt_main_process_signals[] = { 70 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), 71 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler), 72 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler), 73 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler), 74 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler), 75 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler), 76 nxt_event_signal_end, 77 }; 78 79 80 static nxt_bool_t nxt_exiting; 81 82 83 nxt_int_t 84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, 85 nxt_runtime_t *rt) 86 { 87 rt->type = NXT_PROCESS_MAIN; 88 89 if (nxt_main_process_port_create(task, rt) != NXT_OK) { 90 return NXT_ERROR; 91 } 92 93 nxt_main_process_title(task); 94 95 /* 96 * The discovery process will send a message processed by 97 * nxt_main_port_modules_handler() which starts the controller 98 * and router processes. 99 */ 100 return nxt_main_process_create(task, nxt_discovery_process); 101 } 102 103 104 static nxt_conf_map_t nxt_common_app_conf[] = { 105 { 106 nxt_string("type"), 107 NXT_CONF_MAP_STR, 108 offsetof(nxt_common_app_conf_t, type), 109 }, 110 111 { 112 nxt_string("user"), 113 NXT_CONF_MAP_STR, 114 offsetof(nxt_common_app_conf_t, user), 115 }, 116 117 { 118 nxt_string("group"), 119 NXT_CONF_MAP_STR, 120 offsetof(nxt_common_app_conf_t, group), 121 }, 122 123 { 124 nxt_string("working_directory"), 125 NXT_CONF_MAP_CSTRZ, 126 offsetof(nxt_common_app_conf_t, working_directory), 127 }, 128 129 { 130 nxt_string("environment"), 131 NXT_CONF_MAP_PTR, 132 offsetof(nxt_common_app_conf_t, environment), 133 }, 134 135 { 136 nxt_string("isolation"), 137 NXT_CONF_MAP_PTR, 138 offsetof(nxt_common_app_conf_t, isolation), 139 }, 140 141 { 142 nxt_string("limits"), 143 NXT_CONF_MAP_PTR, 144 offsetof(nxt_common_app_conf_t, limits), 145 }, 146 147 }; 148 149 150 static nxt_conf_map_t nxt_common_app_limits_conf[] = { 151 { 152 nxt_string("shm"), 153 NXT_CONF_MAP_SIZE, 154 offsetof(nxt_common_app_conf_t, shm_limit), 155 }, 156 157 }; 158 159 160 static nxt_conf_map_t nxt_external_app_conf[] = { 161 { 162 nxt_string("executable"), 163 NXT_CONF_MAP_CSTRZ, 164 offsetof(nxt_common_app_conf_t, u.external.executable), 165 }, 166 167 { 168 nxt_string("arguments"), 169 NXT_CONF_MAP_PTR, 170 offsetof(nxt_common_app_conf_t, u.external.arguments), 171 }, 172 173 }; 174 175 176 static nxt_conf_map_t nxt_python_app_conf[] = { 177 { 178 nxt_string("home"), 179 NXT_CONF_MAP_CSTRZ, 180 offsetof(nxt_common_app_conf_t, u.python.home), 181 }, 182 183 { 184 nxt_string("path"), 185 NXT_CONF_MAP_STR, 186 offsetof(nxt_common_app_conf_t, u.python.path), 187 }, 188 189 { 190 nxt_string("module"), 191 NXT_CONF_MAP_STR, 192 offsetof(nxt_common_app_conf_t, u.python.module), 193 }, 194 }; 195 196 197 static nxt_conf_map_t nxt_php_app_conf[] = { 198 { 199 nxt_string("targets"), 200 NXT_CONF_MAP_PTR, 201 offsetof(nxt_common_app_conf_t, u.php.targets), 202 }, 203 204 { 205 nxt_string("options"), 206 NXT_CONF_MAP_PTR, 207 offsetof(nxt_common_app_conf_t, u.php.options), 208 }, 209 }; 210 211 212 static nxt_conf_map_t nxt_perl_app_conf[] = { 213 { 214 nxt_string("script"), 215 NXT_CONF_MAP_CSTRZ, 216 offsetof(nxt_common_app_conf_t, u.perl.script), 217 }, 218 }; 219 220 221 static nxt_conf_map_t nxt_ruby_app_conf[] = { 222 { 223 nxt_string("script"), 224 NXT_CONF_MAP_STR, 225 offsetof(nxt_common_app_conf_t, u.ruby.script), 226 }, 227 }; 228 229 230 static nxt_conf_map_t nxt_java_app_conf[] = { 231 { 232 nxt_string("classpath"), 233 NXT_CONF_MAP_PTR, 234 offsetof(nxt_common_app_conf_t, u.java.classpath), 235 }, 236 { 237 nxt_string("webapp"), 238 NXT_CONF_MAP_CSTRZ, 239 offsetof(nxt_common_app_conf_t, u.java.webapp), 240 }, 241 { 242 nxt_string("options"), 243 NXT_CONF_MAP_PTR, 244 offsetof(nxt_common_app_conf_t, u.java.options), 245 }, 246 { 247 nxt_string("unit_jars"), 248 NXT_CONF_MAP_CSTRZ, 249 offsetof(nxt_common_app_conf_t, u.java.unit_jars), 250 }, 251 252 }; 253 254 255 static nxt_conf_app_map_t nxt_app_maps[] = { 256 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, 257 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, 258 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, 259 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, 260 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, 261 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf }, 262 }; 263 264 265 static void 266 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 267 { 268 nxt_debug(task, "main data: %*s", 269 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 270 } 271 272 273 static void 274 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 275 { 276 u_char *start, *p, ch; 277 size_t type_len; 278 nxt_int_t ret; 279 nxt_buf_t *b; 280 nxt_port_t *port; 281 nxt_runtime_t *rt; 282 nxt_process_t *process; 283 nxt_app_type_t idx; 284 nxt_conf_value_t *conf; 285 nxt_process_init_t *init; 286 nxt_common_app_conf_t *app_conf; 287 288 ret = NXT_ERROR; 289 290 rt = task->thread->runtime; 291 292 process = nxt_main_process_new(task, rt); 293 if (nxt_slow_path(process == NULL)) { 294 return; 295 } 296 297 init = nxt_process_init(process); 298 299 *init = nxt_app_process; 300 301 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); 302 if (b == NULL) { 303 goto failed; 304 } 305 306 nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, 307 b->mem.pos); 308 309 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); 310 if (nxt_slow_path(app_conf == NULL)) { 311 goto failed; 312 } 313 314 start = b->mem.pos; 315 316 app_conf->name.start = start; 317 app_conf->name.length = nxt_strlen(start); 318 319 init->name = (const char *) start; 320 321 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length 322 + sizeof("\"\" application") + 1); 323 324 if (nxt_slow_path(process->name == NULL)) { 325 goto failed; 326 } 327 328 p = (u_char *) process->name; 329 *p++ = '"'; 330 p = nxt_cpymem(p, init->name, app_conf->name.length); 331 p = nxt_cpymem(p, "\" application", 13); 332 *p = '\0'; 333 334 app_conf->shm_limit = 100 * 1024 * 1024; 335 336 start += app_conf->name.length + 1; 337 338 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL); 339 if (conf == NULL) { 340 nxt_alert(task, "router app configuration parsing error"); 341 342 goto failed; 343 } 344 345 rt = task->thread->runtime; 346 347 app_conf->user.start = (u_char*)rt->user_cred.user; 348 app_conf->user.length = nxt_strlen(rt->user_cred.user); 349 350 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf, 351 nxt_nitems(nxt_common_app_conf), app_conf); 352 353 if (ret != NXT_OK) { 354 nxt_alert(task, "failed to map common app conf received from router"); 355 goto failed; 356 } 357 358 for (type_len = 0; type_len != app_conf->type.length; type_len++) { 359 ch = app_conf->type.start[type_len]; 360 361 if (ch == ' ' || nxt_isdigit(ch)) { 362 break; 363 } 364 } 365 366 idx = nxt_app_parse_type(app_conf->type.start, type_len); 367 368 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { 369 nxt_alert(task, "invalid app type %d received from router", (int) idx); 370 goto failed; 371 } 372 373 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map, 374 nxt_app_maps[idx].size, app_conf); 375 376 if (nxt_slow_path(ret != NXT_OK)) { 377 nxt_alert(task, "failed to map app conf received from router"); 378 goto failed; 379 } 380 381 if (app_conf->limits != NULL) { 382 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits, 383 nxt_common_app_limits_conf, 384 nxt_nitems(nxt_common_app_limits_conf), 385 app_conf); 386 387 if (nxt_slow_path(ret != NXT_OK)) { 388 nxt_alert(task, "failed to map app limits received from router"); 389 goto failed; 390 } 391 } 392 393 app_conf->self = conf; 394 395 process->stream = msg->port_msg.stream; 396 process->data.app = app_conf; 397 398 ret = nxt_main_start_process(task, process); 399 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { 400 return; 401 } 402 403 failed: 404 405 nxt_process_use(task, process, -1); 406 407 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 408 msg->port_msg.reply_port); 409 410 if (nxt_fast_path(port != NULL)) { 411 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 412 -1, msg->port_msg.stream, 0, NULL); 413 } 414 } 415 416 417 static void 418 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 419 { 420 nxt_port_t *port; 421 nxt_process_t *process; 422 nxt_runtime_t *rt; 423 424 rt = task->thread->runtime; 425 426 process = nxt_runtime_process_find(rt, msg->port_msg.pid); 427 if (nxt_slow_path(process == NULL)) { 428 return; 429 } 430 431 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); 432 433 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 434 msg->port_msg.reply_port); 435 436 437 if (nxt_slow_path(port == NULL)) { 438 return; 439 } 440 441 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) 442 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { 443 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, 444 process->user_cred, 445 &process->isolation.clone) 446 != NXT_OK)) 447 { 448 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 449 -1, msg->port_msg.stream, 0, NULL); 450 return; 451 } 452 } 453 454 #endif 455 456 process->state = NXT_PROCESS_STATE_CREATED; 457 458 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, 459 -1, msg->port_msg.stream, 0, NULL); 460 } 461 462 463 static nxt_port_handlers_t nxt_main_process_port_handlers = { 464 .data = nxt_port_main_data_handler, 465 .process_created = nxt_main_process_created_handler, 466 .process_ready = nxt_port_process_ready_handler, 467 .start_process = nxt_port_main_start_process_handler, 468 .socket = nxt_main_port_socket_handler, 469 .modules = nxt_main_port_modules_handler, 470 .conf_store = nxt_main_port_conf_store_handler, 471 #if (NXT_TLS) 472 .cert_get = nxt_cert_store_get_handler, 473 .cert_delete = nxt_cert_store_delete_handler, 474 #endif 475 .access_log = nxt_main_port_access_log_handler, 476 .rpc_ready = nxt_port_rpc_handler, 477 .rpc_error = nxt_port_rpc_handler, 478 }; 479 480 481 static nxt_int_t 482 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) 483 { 484 nxt_int_t ret; 485 nxt_port_t *port; 486 nxt_process_t *process; 487 488 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, 489 NXT_PROCESS_MAIN); 490 if (nxt_slow_path(port == NULL)) { 491 return NXT_ERROR; 492 } 493 494 process = port->process; 495 496 ret = nxt_port_socket_init(task, port, 0); 497 if (nxt_slow_path(ret != NXT_OK)) { 498 nxt_port_use(task, port, -1); 499 return ret; 500 } 501 502 /* 503 * A main process port. A write port is not closed 504 * since it should be inherited by processes. 505 */ 506 nxt_port_enable(task, port, &nxt_main_process_port_handlers); 507 508 process->state = NXT_PROCESS_STATE_READY; 509 510 return NXT_OK; 511 } 512 513 514 static void 515 nxt_main_process_title(nxt_task_t *task) 516 { 517 u_char *p, *end; 518 nxt_uint_t i; 519 u_char title[2048]; 520 521 end = title + sizeof(title) - 1; 522 523 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s", 524 nxt_process_argv[0]); 525 526 for (i = 1; nxt_process_argv[i] != NULL; i++) { 527 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); 528 } 529 530 if (p < end) { 531 *p++ = ']'; 532 } 533 534 *p = '\0'; 535 536 nxt_process_title(task, "%s", title); 537 } 538 539 540 static nxt_int_t 541 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) 542 { 543 nxt_int_t ret; 544 nxt_runtime_t *rt; 545 nxt_process_t *process; 546 nxt_process_init_t *pinit; 547 548 rt = task->thread->runtime; 549 550 process = nxt_main_process_new(task, rt); 551 if (nxt_slow_path(process == NULL)) { 552 return NXT_ERROR; 553 } 554 555 process->name = init.name; 556 process->user_cred = &rt->user_cred; 557 558 pinit = nxt_process_init(process); 559 *pinit = init; 560 561 ret = nxt_main_start_process(task, process); 562 if (nxt_slow_path(ret == NXT_ERROR)) { 563 nxt_process_use(task, process, -1); 564 } 565 566 return ret; 567 } 568 569 570 static nxt_process_t * 571 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) 572 { 573 nxt_process_t *process; 574 575 process = nxt_runtime_process_new(rt); 576 if (nxt_slow_path(process == NULL)) { 577 return NULL; 578 } 579 580 process->mem_pool = nxt_mp_create(1024, 128, 256, 32); 581 if (process->mem_pool == NULL) { 582 nxt_process_use(task, process, -1); 583 return NULL; 584 } 585 586 return process; 587 } 588 589 590 static nxt_int_t 591 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) 592 { 593 nxt_mp_t *tmp_mp; 594 nxt_int_t ret; 595 nxt_pid_t pid; 596 nxt_port_t *port; 597 nxt_process_init_t *init; 598 599 init = nxt_process_init(process); 600 601 port = nxt_port_new(task, 0, 0, init->type); 602 if (nxt_slow_path(port == NULL)) { 603 return NXT_ERROR; 604 } 605 606 nxt_process_port_add(task, process, port); 607 608 ret = nxt_port_socket_init(task, port, 0); 609 if (nxt_slow_path(ret != NXT_OK)) { 610 goto free_port; 611 } 612 613 tmp_mp = nxt_mp_create(1024, 128, 256, 32); 614 if (nxt_slow_path(tmp_mp == NULL)) { 615 ret = NXT_ERROR; 616 617 goto close_port; 618 } 619 620 if (init->prefork) { 621 ret = init->prefork(task, process, tmp_mp); 622 if (nxt_slow_path(ret != NXT_OK)) { 623 goto free_mempool; 624 } 625 } 626 627 pid = nxt_process_create(task, process); 628 629 switch (pid) { 630 631 case -1: 632 ret = NXT_ERROR; 633 break; 634 635 case 0: 636 /* The child process: return to the event engine work queue loop. */ 637 638 nxt_process_use(task, process, -1); 639 640 ret = NXT_AGAIN; 641 break; 642 643 default: 644 /* The main process created a new process. */ 645 646 nxt_process_use(task, process, -1); 647 648 nxt_port_read_close(port); 649 nxt_port_write_enable(task, port); 650 651 ret = NXT_OK; 652 break; 653 } 654 655 free_mempool: 656 657 nxt_mp_destroy(tmp_mp); 658 659 close_port: 660 661 if (nxt_slow_path(ret == NXT_ERROR)) { 662 nxt_port_close(task, port); 663 } 664 665 free_port: 666 667 nxt_port_use(task, port, -1); 668 669 return ret; 670 } 671 672 673 static void 674 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) 675 { 676 nxt_debug(task, "sigterm handler signo:%d (%s)", 677 (int) (uintptr_t) obj, data); 678 679 /* TODO: fast exit. */ 680 681 nxt_exiting = 1; 682 683 nxt_runtime_quit(task, 0); 684 } 685 686 687 static void 688 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) 689 { 690 nxt_debug(task, "sigquit handler signo:%d (%s)", 691 (int) (uintptr_t) obj, data); 692 693 /* TODO: graceful exit. */ 694 695 nxt_exiting = 1; 696 697 nxt_runtime_quit(task, 0); 698 } 699 700 701 static void 702 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) 703 { 704 nxt_mp_t *mp; 705 nxt_int_t ret; 706 nxt_uint_t n; 707 nxt_port_t *port; 708 nxt_file_t *file, *new_file; 709 nxt_array_t *new_files; 710 nxt_runtime_t *rt; 711 712 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", 713 (int) (uintptr_t) obj, data, "log files rotation"); 714 715 rt = task->thread->runtime; 716 717 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 718 719 if (nxt_fast_path(port != NULL)) { 720 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG, 721 -1, 0, 0, NULL); 722 } 723 724 mp = nxt_mp_create(1024, 128, 256, 32); 725 if (mp == NULL) { 726 return; 727 } 728 729 n = nxt_list_nelts(rt->log_files); 730 731 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); 732 if (new_files == NULL) { 733 nxt_mp_destroy(mp); 734 return; 735 } 736 737 nxt_list_each(file, rt->log_files) { 738 739 /* This allocation cannot fail. */ 740 new_file = nxt_array_add(new_files); 741 742 new_file->name = file->name; 743 new_file->fd = NXT_FILE_INVALID; 744 new_file->log_level = NXT_LOG_ALERT; 745 746 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, 747 NXT_FILE_OWNER_ACCESS); 748 749 if (ret != NXT_OK) { 750 goto fail; 751 } 752 753 } nxt_list_loop; 754 755 new_file = new_files->elts; 756 757 ret = nxt_file_stderr(&new_file[0]); 758 759 if (ret == NXT_OK) { 760 n = 0; 761 762 nxt_list_each(file, rt->log_files) { 763 764 nxt_port_change_log_file(task, rt, n, new_file[n].fd); 765 /* 766 * The old log file descriptor must be closed at the moment 767 * when no other threads use it. dup2() allows to use the 768 * old file descriptor for new log file. This change is 769 * performed atomically in the kernel. 770 */ 771 (void) nxt_file_redirect(file, new_file[n].fd); 772 773 n++; 774 775 } nxt_list_loop; 776 777 nxt_mp_destroy(mp); 778 return; 779 } 780 781 fail: 782 783 new_file = new_files->elts; 784 n = new_files->nelts; 785 786 while (n != 0) { 787 if (new_file->fd != NXT_FILE_INVALID) { 788 nxt_file_close(task, new_file); 789 } 790 791 new_file++; 792 n--; 793 } 794 795 nxt_mp_destroy(mp); 796 } 797 798 799 static void 800 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) 801 { 802 int status; 803 nxt_err_t err; 804 nxt_pid_t pid; 805 806 nxt_debug(task, "sigchld handler signo:%d (%s)", 807 (int) (uintptr_t) obj, data); 808 809 for ( ;; ) { 810 pid = waitpid(-1, &status, WNOHANG); 811 812 if (pid == -1) { 813 814 switch (err = nxt_errno) { 815 816 case NXT_ECHILD: 817 return; 818 819 case NXT_EINTR: 820 continue; 821 822 default: 823 nxt_alert(task, "waitpid() failed: %E", err); 824 return; 825 } 826 } 827 828 nxt_debug(task, "waitpid(): %PI", pid); 829 830 if (pid == 0) { 831 return; 832 } 833 834 if (WTERMSIG(status)) { 835 #ifdef WCOREDUMP 836 nxt_alert(task, "process %PI exited on signal %d%s", 837 pid, WTERMSIG(status), 838 WCOREDUMP(status) ? " (core dumped)" : ""); 839 #else 840 nxt_alert(task, "process %PI exited on signal %d", 841 pid, WTERMSIG(status)); 842 #endif 843 844 } else { 845 nxt_trace(task, "process %PI exited with code %d", 846 pid, WEXITSTATUS(status)); 847 } 848 849 nxt_main_cleanup_process(task, pid); 850 } 851 } 852 853 854 static void 855 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) 856 { 857 nxt_trace(task, "signal signo:%d (%s) recevied, ignored", 858 (int) (uintptr_t) obj, data); 859 } 860 861 862 static void 863 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) 864 { 865 int stream; 866 nxt_int_t ret; 867 nxt_buf_t *buf; 868 nxt_port_t *port; 869 const char *name; 870 nxt_runtime_t *rt; 871 nxt_process_t *process; 872 nxt_process_init_t init; 873 874 rt = task->thread->runtime; 875 876 process = nxt_runtime_process_find(rt, pid); 877 if (!process) { 878 return; 879 } 880 881 if (process->isolation.cleanup != NULL) { 882 process->isolation.cleanup(task, process); 883 } 884 885 name = process->name; 886 stream = process->stream; 887 init = *((nxt_process_init_t *) nxt_process_init(process)); 888 889 if (process->state == NXT_PROCESS_STATE_READY) { 890 process->stream = 0; 891 } 892 893 nxt_process_close_ports(task, process); 894 895 if (nxt_exiting) { 896 if (rt->nprocesses <= 1) { 897 nxt_runtime_quit(task, 0); 898 } 899 900 return; 901 } 902 903 nxt_runtime_process_each(rt, process) { 904 905 if (process->pid == nxt_pid 906 || process->pid == pid 907 || nxt_queue_is_empty(&process->ports)) 908 { 909 continue; 910 } 911 912 port = nxt_process_port_first(process); 913 914 if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { 915 continue; 916 } 917 918 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 919 sizeof(pid)); 920 921 if (nxt_slow_path(buf == NULL)) { 922 continue; 923 } 924 925 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); 926 927 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, 928 stream, 0, buf); 929 930 } nxt_runtime_process_loop; 931 932 if (init.restart) { 933 ret = nxt_main_process_create(task, init); 934 if (nxt_slow_path(ret == NXT_ERROR)) { 935 nxt_alert(task, "failed to restart %s", name); 936 } 937 } 938 } 939 940 941 static void 942 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 943 { 944 size_t size; 945 nxt_int_t ret; 946 nxt_buf_t *b, *out; 947 nxt_port_t *port; 948 nxt_sockaddr_t *sa; 949 nxt_port_msg_type_t type; 950 nxt_listening_socket_t ls; 951 u_char message[2048]; 952 953 b = msg->buf; 954 sa = (nxt_sockaddr_t *) b->mem.pos; 955 956 /* TODO check b size and make plain */ 957 958 out = NULL; 959 960 ls.socket = -1; 961 ls.error = NXT_SOCKET_ERROR_SYSTEM; 962 ls.start = message; 963 ls.end = message + sizeof(message); 964 965 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 966 msg->port_msg.reply_port); 967 968 nxt_debug(task, "listening socket \"%*s\"", 969 (size_t) sa->length, nxt_sockaddr_start(sa)); 970 971 ret = nxt_main_listening_socket(sa, &ls); 972 973 if (ret == NXT_OK) { 974 nxt_debug(task, "socket(\"%*s\"): %d", 975 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); 976 977 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; 978 979 } else { 980 size = ls.end - ls.start; 981 982 nxt_alert(task, "%*s", size, ls.start); 983 984 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 985 size + 1); 986 if (nxt_slow_path(out == NULL)) { 987 return; 988 } 989 990 *out->mem.free++ = (uint8_t) ls.error; 991 992 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); 993 994 type = NXT_PORT_MSG_RPC_ERROR; 995 } 996 997 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream, 998 0, out); 999 } 1000 1001 1002 static nxt_int_t 1003 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls) 1004 { 1005 nxt_err_t err; 1006 nxt_socket_t s; 1007 1008 const socklen_t length = sizeof(int); 1009 static const int enable = 1; 1010 1011 s = socket(sa->u.sockaddr.sa_family, sa->type, 0); 1012 1013 if (nxt_slow_path(s == -1)) { 1014 err = nxt_errno; 1015 1016 #if (NXT_INET6) 1017 1018 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) { 1019 ls->error = NXT_SOCKET_ERROR_NOINET6; 1020 } 1021 1022 #endif 1023 1024 ls->end = nxt_sprintf(ls->start, ls->end, 1025 "socket(\\\"%*s\\\") failed %E", 1026 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1027 1028 return NXT_ERROR; 1029 } 1030 1031 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) { 1032 ls->end = nxt_sprintf(ls->start, ls->end, 1033 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E", 1034 (size_t) sa->length, nxt_sockaddr_start(sa), 1035 nxt_errno); 1036 goto fail; 1037 } 1038 1039 #if (NXT_INET6) 1040 1041 if (sa->u.sockaddr.sa_family == AF_INET6) { 1042 1043 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) { 1044 ls->end = nxt_sprintf(ls->start, ls->end, 1045 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E", 1046 (size_t) sa->length, nxt_sockaddr_start(sa), 1047 nxt_errno); 1048 goto fail; 1049 } 1050 } 1051 1052 #endif 1053 1054 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) { 1055 err = nxt_errno; 1056 1057 #if (NXT_HAVE_UNIX_DOMAIN) 1058 1059 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1060 switch (err) { 1061 1062 case EACCES: 1063 ls->error = NXT_SOCKET_ERROR_ACCESS; 1064 break; 1065 1066 case ENOENT: 1067 case ENOTDIR: 1068 ls->error = NXT_SOCKET_ERROR_PATH; 1069 break; 1070 } 1071 1072 } else 1073 #endif 1074 { 1075 switch (err) { 1076 1077 case EACCES: 1078 ls->error = NXT_SOCKET_ERROR_PORT; 1079 break; 1080 1081 case EADDRINUSE: 1082 ls->error = NXT_SOCKET_ERROR_INUSE; 1083 break; 1084 1085 case EADDRNOTAVAIL: 1086 ls->error = NXT_SOCKET_ERROR_NOADDR; 1087 break; 1088 } 1089 } 1090 1091 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E", 1092 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1093 goto fail; 1094 } 1095 1096 #if (NXT_HAVE_UNIX_DOMAIN) 1097 1098 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1099 char *filename; 1100 mode_t access; 1101 1102 filename = sa->u.sockaddr_un.sun_path; 1103 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); 1104 1105 if (chmod(filename, access) != 0) { 1106 ls->end = nxt_sprintf(ls->start, ls->end, 1107 "chmod(\\\"%s\\\") failed %E", 1108 filename, nxt_errno); 1109 goto fail; 1110 } 1111 } 1112 1113 #endif 1114 1115 ls->socket = s; 1116 1117 return NXT_OK; 1118 1119 fail: 1120 1121 (void) close(s); 1122 1123 return NXT_ERROR; 1124 } 1125 1126 1127 static nxt_conf_map_t nxt_app_lang_module_map[] = { 1128 { 1129 nxt_string("type"), 1130 NXT_CONF_MAP_INT, 1131 offsetof(nxt_app_lang_module_t, type), 1132 }, 1133 1134 { 1135 nxt_string("version"), 1136 NXT_CONF_MAP_CSTRZ, 1137 offsetof(nxt_app_lang_module_t, version), 1138 }, 1139 1140 { 1141 nxt_string("file"), 1142 NXT_CONF_MAP_CSTRZ, 1143 offsetof(nxt_app_lang_module_t, file), 1144 }, 1145 }; 1146 1147 1148 static nxt_conf_map_t nxt_app_lang_mounts_map[] = { 1149 { 1150 nxt_string("src"), 1151 NXT_CONF_MAP_CSTRZ, 1152 offsetof(nxt_fs_mount_t, src), 1153 }, 1154 { 1155 nxt_string("dst"), 1156 NXT_CONF_MAP_CSTRZ, 1157 offsetof(nxt_fs_mount_t, dst), 1158 }, 1159 { 1160 nxt_string("fstype"), 1161 NXT_CONF_MAP_CSTRZ, 1162 offsetof(nxt_fs_mount_t, fstype), 1163 }, 1164 { 1165 nxt_string("flags"), 1166 NXT_CONF_MAP_INT, 1167 offsetof(nxt_fs_mount_t, flags), 1168 }, 1169 { 1170 nxt_string("data"), 1171 NXT_CONF_MAP_CSTRZ, 1172 offsetof(nxt_fs_mount_t, data), 1173 }, 1174 }; 1175 1176 1177 static void 1178 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1179 { 1180 uint32_t index, jindex, nmounts; 1181 nxt_mp_t *mp; 1182 nxt_int_t ret; 1183 nxt_buf_t *b; 1184 nxt_port_t *port; 1185 nxt_runtime_t *rt; 1186 nxt_fs_mount_t *mnt; 1187 nxt_conf_value_t *conf, *root, *value, *mounts; 1188 nxt_app_lang_module_t *lang; 1189 1190 static nxt_str_t root_path = nxt_string("/"); 1191 static nxt_str_t mounts_name = nxt_string("mounts"); 1192 1193 rt = task->thread->runtime; 1194 1195 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { 1196 return; 1197 } 1198 1199 if (nxt_exiting) { 1200 nxt_debug(task, "ignoring discovered modules, exiting"); 1201 return; 1202 } 1203 1204 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1205 msg->port_msg.reply_port); 1206 1207 if (nxt_fast_path(port != NULL)) { 1208 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 1209 msg->port_msg.stream, 0, NULL); 1210 } 1211 1212 b = msg->buf; 1213 1214 if (b == NULL) { 1215 return; 1216 } 1217 1218 mp = nxt_mp_create(1024, 128, 256, 32); 1219 if (mp == NULL) { 1220 return; 1221 } 1222 1223 b = nxt_buf_chk_make_plain(mp, b, msg->size); 1224 1225 if (b == NULL) { 1226 return; 1227 } 1228 1229 nxt_debug(task, "application languages: \"%*s\"", 1230 b->mem.free - b->mem.pos, b->mem.pos); 1231 1232 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); 1233 if (conf == NULL) { 1234 goto fail; 1235 } 1236 1237 root = nxt_conf_get_path(conf, &root_path); 1238 if (root == NULL) { 1239 goto fail; 1240 } 1241 1242 for (index = 0; /* void */ ; index++) { 1243 value = nxt_conf_get_array_element(root, index); 1244 if (value == NULL) { 1245 break; 1246 } 1247 1248 lang = nxt_array_zero_add(rt->languages); 1249 if (lang == NULL) { 1250 goto fail; 1251 } 1252 1253 lang->module = NULL; 1254 1255 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, 1256 nxt_nitems(nxt_app_lang_module_map), lang); 1257 1258 if (ret != NXT_OK) { 1259 goto fail; 1260 } 1261 1262 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL); 1263 if (mounts == NULL) { 1264 nxt_alert(task, "missing mounts from discovery message."); 1265 goto fail; 1266 } 1267 1268 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) { 1269 nxt_alert(task, "invalid mounts type from discovery message."); 1270 goto fail; 1271 } 1272 1273 nmounts = nxt_conf_array_elements_count(mounts); 1274 1275 lang->mounts = nxt_array_create(rt->mem_pool, nmounts, 1276 sizeof(nxt_fs_mount_t)); 1277 1278 if (lang->mounts == NULL) { 1279 goto fail; 1280 } 1281 1282 for (jindex = 0; /* */; jindex++) { 1283 value = nxt_conf_get_array_element(mounts, jindex); 1284 if (value == NULL) { 1285 break; 1286 } 1287 1288 mnt = nxt_array_zero_add(lang->mounts); 1289 if (mnt == NULL) { 1290 goto fail; 1291 } 1292 1293 ret = nxt_conf_map_object(rt->mem_pool, value, 1294 nxt_app_lang_mounts_map, 1295 nxt_nitems(nxt_app_lang_mounts_map), mnt); 1296 1297 if (ret != NXT_OK) { 1298 goto fail; 1299 } 1300 } 1301 1302 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)", 1303 lang->type, lang->version, lang->file, lang->mounts->nelts); 1304 } 1305 1306 qsort(rt->languages->elts, rt->languages->nelts, 1307 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); 1308 1309 fail: 1310 1311 nxt_mp_destroy(mp); 1312 1313 ret = nxt_main_process_create(task, nxt_controller_process); 1314 if (ret == NXT_OK) { 1315 ret = nxt_main_process_create(task, nxt_router_process); 1316 } 1317 1318 if (nxt_slow_path(ret == NXT_ERROR)) { 1319 nxt_exiting = 1; 1320 1321 nxt_runtime_quit(task, 1); 1322 } 1323 } 1324 1325 1326 static int nxt_cdecl 1327 nxt_app_lang_compare(const void *v1, const void *v2) 1328 { 1329 int n; 1330 const nxt_app_lang_module_t *lang1, *lang2; 1331 1332 lang1 = v1; 1333 lang2 = v2; 1334 1335 n = lang1->type - lang2->type; 1336 1337 if (n != 0) { 1338 return n; 1339 } 1340 1341 n = nxt_strverscmp(lang1->version, lang2->version); 1342 1343 /* Negate result to move higher versions to the beginning. */ 1344 1345 return -n; 1346 } 1347 1348 1349 static void 1350 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1351 { 1352 ssize_t n, size, offset; 1353 nxt_buf_t *b; 1354 nxt_int_t ret; 1355 nxt_file_t file; 1356 nxt_runtime_t *rt; 1357 1358 nxt_memzero(&file, sizeof(nxt_file_t)); 1359 1360 rt = task->thread->runtime; 1361 1362 file.name = (nxt_file_name_t *) rt->conf_tmp; 1363 1364 if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, 1365 NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) 1366 != NXT_OK)) 1367 { 1368 goto error; 1369 } 1370 1371 offset = 0; 1372 1373 for (b = msg->buf; b != NULL; b = b->next) { 1374 size = nxt_buf_mem_used_size(&b->mem); 1375 1376 n = nxt_file_write(&file, b->mem.pos, size, offset); 1377 1378 if (nxt_slow_path(n != size)) { 1379 nxt_file_close(task, &file); 1380 (void) nxt_file_delete(file.name); 1381 goto error; 1382 } 1383 1384 offset += n; 1385 } 1386 1387 nxt_file_close(task, &file); 1388 1389 ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); 1390 1391 if (nxt_fast_path(ret == NXT_OK)) { 1392 return; 1393 } 1394 1395 error: 1396 1397 nxt_alert(task, "failed to store current configuration"); 1398 } 1399 1400 1401 static void 1402 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1403 { 1404 u_char *path; 1405 nxt_int_t ret; 1406 nxt_file_t file; 1407 nxt_port_t *port; 1408 nxt_port_msg_type_t type; 1409 1410 nxt_debug(task, "opening access log file"); 1411 1412 path = msg->buf->mem.pos; 1413 1414 nxt_memzero(&file, sizeof(nxt_file_t)); 1415 1416 file.name = (nxt_file_name_t *) path; 1417 file.log_level = NXT_LOG_ERR; 1418 1419 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 1420 NXT_FILE_OWNER_ACCESS); 1421 1422 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD 1423 : NXT_PORT_MSG_RPC_ERROR; 1424 1425 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1426 msg->port_msg.reply_port); 1427 1428 if (nxt_fast_path(port != NULL)) { 1429 (void) nxt_port_socket_write(task, port, type, file.fd, 1430 msg->port_msg.stream, 0, NULL); 1431 } 1432 } 1433