1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_router.h> 13 #if (NXT_TLS) 14 #include <nxt_cert.h> 15 #endif 16 17 18 typedef struct { 19 nxt_socket_t socket; 20 nxt_socket_error_t error; 21 u_char *start; 22 u_char *end; 23 } nxt_listening_socket_t; 24 25 26 typedef struct { 27 nxt_uint_t size; 28 nxt_conf_map_t *map; 29 } nxt_conf_app_map_t; 30 31 32 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, 33 nxt_runtime_t *rt); 34 static void nxt_main_process_title(nxt_task_t *task); 35 static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task, 36 nxt_runtime_t *rt); 37 static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task, 38 nxt_runtime_t *rt, nxt_process_init_t *init); 39 static nxt_int_t nxt_main_start_router_process(nxt_task_t *task, 40 nxt_runtime_t *rt); 41 static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task, 42 nxt_runtime_t *rt); 43 static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, 44 nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); 45 static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task, 46 nxt_runtime_t *rt, nxt_process_init_t *init); 47 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, 48 void *data); 49 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, 50 void *data); 51 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, 52 void *data); 53 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, 54 void *data); 55 static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); 56 static void nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt); 57 static void nxt_main_port_socket_handler(nxt_task_t *task, 58 nxt_port_recv_msg_t *msg); 59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, 60 nxt_listening_socket_t *ls); 61 static void nxt_main_port_modules_handler(nxt_task_t *task, 62 nxt_port_recv_msg_t *msg); 63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); 64 static void nxt_main_port_conf_store_handler(nxt_task_t *task, 65 nxt_port_recv_msg_t *msg); 66 static void nxt_main_port_access_log_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg); 68 69 70 const nxt_sig_event_t nxt_main_process_signals[] = { 71 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler), 72 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler), 73 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler), 74 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler), 75 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler), 76 nxt_event_signal_end, 77 }; 78 79 80 static nxt_bool_t nxt_exiting; 81 82 83 nxt_int_t 84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, 85 nxt_runtime_t *rt) 86 { 87 rt->type = NXT_PROCESS_MAIN; 88 89 if (nxt_main_process_port_create(task, rt) != NXT_OK) { 90 return NXT_ERROR; 91 } 92 93 nxt_main_process_title(task); 94 95 /* 96 * The dicsovery process will send a message processed by 97 * nxt_main_port_modules_handler() which starts the controller 98 * and router processes. 99 */ 100 return nxt_main_start_discovery_process(task, rt); 101 } 102 103 104 static nxt_conf_map_t nxt_common_app_conf[] = { 105 { 106 nxt_string("type"), 107 NXT_CONF_MAP_STR, 108 offsetof(nxt_common_app_conf_t, type), 109 }, 110 111 { 112 nxt_string("user"), 113 NXT_CONF_MAP_STR, 114 offsetof(nxt_common_app_conf_t, user), 115 }, 116 117 { 118 nxt_string("group"), 119 NXT_CONF_MAP_STR, 120 offsetof(nxt_common_app_conf_t, group), 121 }, 122 123 { 124 nxt_string("working_directory"), 125 NXT_CONF_MAP_CSTRZ, 126 offsetof(nxt_common_app_conf_t, working_directory), 127 }, 128 129 { 130 nxt_string("environment"), 131 NXT_CONF_MAP_PTR, 132 offsetof(nxt_common_app_conf_t, environment), 133 }, 134 }; 135 136 137 static nxt_conf_map_t nxt_external_app_conf[] = { 138 { 139 nxt_string("executable"), 140 NXT_CONF_MAP_CSTRZ, 141 offsetof(nxt_common_app_conf_t, u.external.executable), 142 }, 143 144 { 145 nxt_string("arguments"), 146 NXT_CONF_MAP_PTR, 147 offsetof(nxt_common_app_conf_t, u.external.arguments), 148 }, 149 150 }; 151 152 153 static nxt_conf_map_t nxt_python_app_conf[] = { 154 { 155 nxt_string("home"), 156 NXT_CONF_MAP_CSTRZ, 157 offsetof(nxt_common_app_conf_t, u.python.home), 158 }, 159 160 { 161 nxt_string("path"), 162 NXT_CONF_MAP_STR, 163 offsetof(nxt_common_app_conf_t, u.python.path), 164 }, 165 166 { 167 nxt_string("module"), 168 NXT_CONF_MAP_STR, 169 offsetof(nxt_common_app_conf_t, u.python.module), 170 }, 171 }; 172 173 174 static nxt_conf_map_t nxt_php_app_conf[] = { 175 { 176 nxt_string("root"), 177 NXT_CONF_MAP_CSTRZ, 178 offsetof(nxt_common_app_conf_t, u.php.root), 179 }, 180 181 { 182 nxt_string("script"), 183 NXT_CONF_MAP_STR, 184 offsetof(nxt_common_app_conf_t, u.php.script), 185 }, 186 187 { 188 nxt_string("index"), 189 NXT_CONF_MAP_STR, 190 offsetof(nxt_common_app_conf_t, u.php.index), 191 }, 192 193 { 194 nxt_string("options"), 195 NXT_CONF_MAP_PTR, 196 offsetof(nxt_common_app_conf_t, u.php.options), 197 }, 198 }; 199 200 201 static nxt_conf_map_t nxt_perl_app_conf[] = { 202 { 203 nxt_string("script"), 204 NXT_CONF_MAP_CSTRZ, 205 offsetof(nxt_common_app_conf_t, u.perl.script), 206 }, 207 }; 208 209 210 static nxt_conf_map_t nxt_ruby_app_conf[] = { 211 { 212 nxt_string("script"), 213 NXT_CONF_MAP_STR, 214 offsetof(nxt_common_app_conf_t, u.ruby.script), 215 }, 216 }; 217 218 219 static nxt_conf_app_map_t nxt_app_maps[] = { 220 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, 221 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, 222 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, 223 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, 224 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, 225 }; 226 227 228 static void 229 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 230 { 231 nxt_debug(task, "main data: %*s", 232 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 233 } 234 235 236 static void 237 nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 238 { 239 u_char *start, ch; 240 size_t type_len; 241 nxt_mp_t *mp; 242 nxt_int_t ret; 243 nxt_buf_t *b; 244 nxt_port_t *port; 245 nxt_app_type_t idx; 246 nxt_conf_value_t *conf; 247 nxt_common_app_conf_t app_conf; 248 249 static nxt_str_t nobody = nxt_string("nobody"); 250 251 ret = NXT_ERROR; 252 253 mp = nxt_mp_create(1024, 128, 256, 32); 254 255 if (nxt_slow_path(mp == NULL)) { 256 return; 257 } 258 259 b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size); 260 261 if (b == NULL) { 262 return; 263 } 264 265 nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos, 266 b->mem.pos); 267 268 nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); 269 270 start = b->mem.pos; 271 272 app_conf.name.start = start; 273 app_conf.name.length = nxt_strlen(start); 274 275 start += app_conf.name.length + 1; 276 277 conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL); 278 279 if (conf == NULL) { 280 nxt_alert(task, "router app configuration parsing error"); 281 282 goto failed; 283 } 284 285 app_conf.user = nobody; 286 287 ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf, 288 nxt_nitems(nxt_common_app_conf), &app_conf); 289 if (ret != NXT_OK) { 290 nxt_alert(task, "failed to map common app conf received from router"); 291 goto failed; 292 } 293 294 for (type_len = 0; type_len != app_conf.type.length; type_len++) { 295 ch = app_conf.type.start[type_len]; 296 297 if (ch == ' ' || nxt_isdigit(ch)) { 298 break; 299 } 300 } 301 302 idx = nxt_app_parse_type(app_conf.type.start, type_len); 303 304 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { 305 nxt_alert(task, "invalid app type %d received from router", (int) idx); 306 goto failed; 307 } 308 309 ret = nxt_conf_map_object(mp, conf, nxt_app_maps[idx].map, 310 nxt_app_maps[idx].size, &app_conf); 311 312 if (nxt_slow_path(ret != NXT_OK)) { 313 nxt_alert(task, "failed to map app conf received from router"); 314 goto failed; 315 } 316 317 ret = nxt_main_start_worker_process(task, task->thread->runtime, 318 &app_conf, msg->port_msg.stream); 319 320 failed: 321 322 if (ret == NXT_ERROR) { 323 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 324 msg->port_msg.reply_port); 325 if (nxt_fast_path(port != NULL)) { 326 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 327 -1, msg->port_msg.stream, 0, NULL); 328 } 329 } 330 331 nxt_mp_destroy(mp); 332 } 333 334 335 static nxt_port_handlers_t nxt_main_process_port_handlers = { 336 .data = nxt_port_main_data_handler, 337 .process_ready = nxt_port_process_ready_handler, 338 .start_worker = nxt_port_main_start_worker_handler, 339 .socket = nxt_main_port_socket_handler, 340 .modules = nxt_main_port_modules_handler, 341 .conf_store = nxt_main_port_conf_store_handler, 342 #if (NXT_TLS) 343 .cert_get = nxt_cert_store_get_handler, 344 .cert_delete = nxt_cert_store_delete_handler, 345 #endif 346 .access_log = nxt_main_port_access_log_handler, 347 .rpc_ready = nxt_port_rpc_handler, 348 .rpc_error = nxt_port_rpc_handler, 349 }; 350 351 352 static nxt_int_t 353 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) 354 { 355 nxt_int_t ret; 356 nxt_port_t *port; 357 nxt_process_t *process; 358 359 process = nxt_runtime_process_get(rt, nxt_pid); 360 if (nxt_slow_path(process == NULL)) { 361 return NXT_ERROR; 362 } 363 364 port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN); 365 if (nxt_slow_path(port == NULL)) { 366 nxt_process_use(task, process, -1); 367 return NXT_ERROR; 368 } 369 370 nxt_process_port_add(task, process, port); 371 372 nxt_process_use(task, process, -1); 373 374 ret = nxt_port_socket_init(task, port, 0); 375 if (nxt_slow_path(ret != NXT_OK)) { 376 nxt_port_use(task, port, -1); 377 return ret; 378 } 379 380 nxt_runtime_port_add(task, port); 381 382 nxt_port_use(task, port, -1); 383 384 /* 385 * A main process port. A write port is not closed 386 * since it should be inherited by worker processes. 387 */ 388 nxt_port_enable(task, port, &nxt_main_process_port_handlers); 389 390 process->ready = 1; 391 392 return NXT_OK; 393 } 394 395 396 static void 397 nxt_main_process_title(nxt_task_t *task) 398 { 399 u_char *p, *end; 400 nxt_uint_t i; 401 u_char title[2048]; 402 403 end = title + sizeof(title) - 1; 404 405 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s", 406 nxt_process_argv[0]); 407 408 for (i = 1; nxt_process_argv[i] != NULL; i++) { 409 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); 410 } 411 412 if (p < end) { 413 *p++ = ']'; 414 } 415 416 *p = '\0'; 417 418 nxt_process_title(task, "%s", title); 419 } 420 421 422 static nxt_int_t 423 nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) 424 { 425 nxt_process_init_t *init; 426 427 init = nxt_malloc(sizeof(nxt_process_init_t)); 428 if (nxt_slow_path(init == NULL)) { 429 return NXT_ERROR; 430 } 431 432 init->start = nxt_controller_start; 433 init->name = "controller"; 434 init->user_cred = &rt->user_cred; 435 init->port_handlers = &nxt_controller_process_port_handlers; 436 init->signals = nxt_worker_process_signals; 437 init->type = NXT_PROCESS_CONTROLLER; 438 init->stream = 0; 439 init->restart = &nxt_main_create_controller_process; 440 441 return nxt_main_create_controller_process(task, rt, init);; 442 } 443 444 445 static nxt_int_t 446 nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt, 447 nxt_process_init_t *init) 448 { 449 ssize_t n; 450 nxt_int_t ret; 451 nxt_str_t *conf; 452 nxt_file_t file; 453 nxt_file_info_t fi; 454 nxt_controller_init_t ctrl_init; 455 456 nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); 457 458 conf = &ctrl_init.conf; 459 460 nxt_memzero(&file, sizeof(nxt_file_t)); 461 462 file.name = (nxt_file_name_t *) rt->conf; 463 464 ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); 465 466 if (ret == NXT_OK) { 467 ret = nxt_file_info(&file, &fi); 468 469 if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { 470 conf->length = nxt_file_size(&fi); 471 conf->start = nxt_malloc(conf->length); 472 473 if (nxt_slow_path(conf->start == NULL)) { 474 nxt_file_close(task, &file); 475 return NXT_ERROR; 476 } 477 478 n = nxt_file_read(&file, conf->start, conf->length, 0); 479 480 if (nxt_slow_path(n != (ssize_t) conf->length)) { 481 nxt_free(conf->start); 482 conf->start = NULL; 483 484 nxt_alert(task, "failed to restore previous configuration: " 485 "cannot read the file"); 486 } 487 } 488 489 nxt_file_close(task, &file); 490 } 491 492 #if (NXT_TLS) 493 ctrl_init.certs = nxt_cert_store_load(task); 494 #endif 495 496 init->data = &ctrl_init; 497 498 ret = nxt_main_create_worker_process(task, rt, init); 499 500 if (ret == NXT_OK) { 501 if (conf->start != NULL) { 502 nxt_free(conf->start); 503 } 504 505 #if (NXT_TLS) 506 if (ctrl_init.certs != NULL) { 507 nxt_cert_store_release(ctrl_init.certs); 508 } 509 #endif 510 } 511 512 return ret; 513 } 514 515 516 static nxt_int_t 517 nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) 518 { 519 nxt_process_init_t *init; 520 521 init = nxt_malloc(sizeof(nxt_process_init_t)); 522 if (nxt_slow_path(init == NULL)) { 523 return NXT_ERROR; 524 } 525 526 init->start = nxt_discovery_start; 527 init->name = "discovery"; 528 init->user_cred = &rt->user_cred; 529 init->port_handlers = &nxt_discovery_process_port_handlers; 530 init->signals = nxt_worker_process_signals; 531 init->type = NXT_PROCESS_DISCOVERY; 532 init->data = rt; 533 init->stream = 0; 534 init->restart = NULL; 535 536 return nxt_main_create_worker_process(task, rt, init); 537 } 538 539 540 static nxt_int_t 541 nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) 542 { 543 nxt_process_init_t *init; 544 545 init = nxt_malloc(sizeof(nxt_process_init_t)); 546 if (nxt_slow_path(init == NULL)) { 547 return NXT_ERROR; 548 } 549 550 init->start = nxt_router_start; 551 init->name = "router"; 552 init->user_cred = &rt->user_cred; 553 init->port_handlers = &nxt_router_process_port_handlers; 554 init->signals = nxt_worker_process_signals; 555 init->type = NXT_PROCESS_ROUTER; 556 init->data = rt; 557 init->stream = 0; 558 init->restart = &nxt_main_create_worker_process; 559 560 return nxt_main_create_worker_process(task, rt, init); 561 } 562 563 564 static nxt_int_t 565 nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, 566 nxt_common_app_conf_t *app_conf, uint32_t stream) 567 { 568 char *user, *group; 569 u_char *title, *last, *end; 570 size_t size; 571 nxt_process_init_t *init; 572 573 size = sizeof(nxt_process_init_t) 574 + sizeof(nxt_user_cred_t) 575 + app_conf->user.length + 1 576 + app_conf->group.length + 1 577 + app_conf->name.length + sizeof("\"\" application"); 578 579 init = nxt_malloc(size); 580 if (nxt_slow_path(init == NULL)) { 581 return NXT_ERROR; 582 } 583 584 init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t)); 585 user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t)); 586 587 nxt_memcpy(user, app_conf->user.start, app_conf->user.length); 588 last = nxt_pointer_to(user, app_conf->user.length); 589 *last++ = '\0'; 590 591 init->user_cred->user = user; 592 593 if (app_conf->group.start != NULL) { 594 group = (char *) last; 595 596 nxt_memcpy(group, app_conf->group.start, app_conf->group.length); 597 last = nxt_pointer_to(group, app_conf->group.length); 598 *last++ = '\0'; 599 600 } else { 601 group = NULL; 602 } 603 604 if (nxt_user_cred_get(task, init->user_cred, group) != NXT_OK) { 605 return NXT_ERROR; 606 } 607 608 title = last; 609 end = title + app_conf->name.length + sizeof("\"\" application"); 610 611 nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name); 612 613 init->start = nxt_app_start; 614 init->name = (char *) title; 615 init->port_handlers = &nxt_app_process_port_handlers; 616 init->signals = nxt_worker_process_signals; 617 init->type = NXT_PROCESS_WORKER; 618 init->data = app_conf; 619 init->stream = stream; 620 init->restart = NULL; 621 622 return nxt_main_create_worker_process(task, rt, init); 623 } 624 625 626 static nxt_int_t 627 nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, 628 nxt_process_init_t *init) 629 { 630 nxt_int_t ret; 631 nxt_pid_t pid; 632 nxt_port_t *port; 633 nxt_process_t *process; 634 635 /* 636 * TODO: remove process, init, ports from array on memory and fork failures. 637 */ 638 639 process = nxt_runtime_process_new(rt); 640 if (nxt_slow_path(process == NULL)) { 641 return NXT_ERROR; 642 } 643 644 process->init = init; 645 646 port = nxt_port_new(task, 0, 0, init->type); 647 if (nxt_slow_path(port == NULL)) { 648 nxt_process_use(task, process, -1); 649 return NXT_ERROR; 650 } 651 652 nxt_process_port_add(task, process, port); 653 654 nxt_process_use(task, process, -1); 655 656 ret = nxt_port_socket_init(task, port, 0); 657 if (nxt_slow_path(ret != NXT_OK)) { 658 nxt_port_use(task, port, -1); 659 return ret; 660 } 661 662 pid = nxt_process_create(task, process); 663 664 nxt_port_use(task, port, -1); 665 666 switch (pid) { 667 668 case -1: 669 return NXT_ERROR; 670 671 case 0: 672 /* A worker process, return to the event engine work queue loop. */ 673 return NXT_AGAIN; 674 675 default: 676 /* The main process created a new process. */ 677 678 nxt_port_read_close(port); 679 nxt_port_write_enable(task, port); 680 681 return NXT_OK; 682 } 683 } 684 685 686 void 687 nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 688 { 689 nxt_port_t *port; 690 nxt_process_t *process; 691 692 nxt_runtime_process_each(rt, process) { 693 694 if (nxt_pid != process->pid) { 695 process->init = NULL; 696 697 nxt_process_port_each(process, port) { 698 699 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 700 -1, 0, 0, NULL); 701 702 } nxt_process_port_loop; 703 } 704 705 } nxt_runtime_process_loop; 706 } 707 708 709 710 static void 711 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) 712 { 713 nxt_debug(task, "sigterm handler signo:%d (%s)", 714 (int) (uintptr_t) obj, data); 715 716 /* TODO: fast exit. */ 717 718 nxt_exiting = 1; 719 720 nxt_runtime_quit(task, 0); 721 } 722 723 724 static void 725 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) 726 { 727 nxt_debug(task, "sigquit handler signo:%d (%s)", 728 (int) (uintptr_t) obj, data); 729 730 /* TODO: graceful exit. */ 731 732 nxt_exiting = 1; 733 734 nxt_runtime_quit(task, 0); 735 } 736 737 738 static void 739 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) 740 { 741 nxt_mp_t *mp; 742 nxt_int_t ret; 743 nxt_uint_t n; 744 nxt_port_t *port; 745 nxt_file_t *file, *new_file; 746 nxt_array_t *new_files; 747 nxt_runtime_t *rt; 748 749 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", 750 (int) (uintptr_t) obj, data, "log files rotation"); 751 752 rt = task->thread->runtime; 753 754 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 755 756 if (nxt_fast_path(port != NULL)) { 757 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG, 758 -1, 0, 0, NULL); 759 } 760 761 mp = nxt_mp_create(1024, 128, 256, 32); 762 if (mp == NULL) { 763 return; 764 } 765 766 n = nxt_list_nelts(rt->log_files); 767 768 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); 769 if (new_files == NULL) { 770 nxt_mp_destroy(mp); 771 return; 772 } 773 774 nxt_list_each(file, rt->log_files) { 775 776 /* This allocation cannot fail. */ 777 new_file = nxt_array_add(new_files); 778 779 new_file->name = file->name; 780 new_file->fd = NXT_FILE_INVALID; 781 new_file->log_level = NXT_LOG_ALERT; 782 783 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, 784 NXT_FILE_OWNER_ACCESS); 785 786 if (ret != NXT_OK) { 787 goto fail; 788 } 789 790 } nxt_list_loop; 791 792 new_file = new_files->elts; 793 794 ret = nxt_file_stderr(&new_file[0]); 795 796 if (ret == NXT_OK) { 797 n = 0; 798 799 nxt_list_each(file, rt->log_files) { 800 801 nxt_port_change_log_file(task, rt, n, new_file[n].fd); 802 /* 803 * The old log file descriptor must be closed at the moment 804 * when no other threads use it. dup2() allows to use the 805 * old file descriptor for new log file. This change is 806 * performed atomically in the kernel. 807 */ 808 (void) nxt_file_redirect(file, new_file[n].fd); 809 810 n++; 811 812 } nxt_list_loop; 813 814 nxt_mp_destroy(mp); 815 return; 816 } 817 818 fail: 819 820 new_file = new_files->elts; 821 n = new_files->nelts; 822 823 while (n != 0) { 824 if (new_file->fd != NXT_FILE_INVALID) { 825 nxt_file_close(task, new_file); 826 } 827 828 new_file++; 829 n--; 830 } 831 832 nxt_mp_destroy(mp); 833 } 834 835 836 static void 837 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) 838 { 839 int status; 840 nxt_err_t err; 841 nxt_pid_t pid; 842 843 nxt_debug(task, "sigchld handler signo:%d (%s)", 844 (int) (uintptr_t) obj, data); 845 846 for ( ;; ) { 847 pid = waitpid(-1, &status, WNOHANG); 848 849 if (pid == -1) { 850 851 switch (err = nxt_errno) { 852 853 case NXT_ECHILD: 854 return; 855 856 case NXT_EINTR: 857 continue; 858 859 default: 860 nxt_alert(task, "waitpid() failed: %E", err); 861 return; 862 } 863 } 864 865 nxt_debug(task, "waitpid(): %PI", pid); 866 867 if (pid == 0) { 868 return; 869 } 870 871 if (WTERMSIG(status)) { 872 #ifdef WCOREDUMP 873 nxt_alert(task, "process %PI exited on signal %d%s", 874 pid, WTERMSIG(status), 875 WCOREDUMP(status) ? " (core dumped)" : ""); 876 #else 877 nxt_alert(task, "process %PI exited on signal %d", 878 pid, WTERMSIG(status)); 879 #endif 880 881 } else { 882 nxt_trace(task, "process %PI exited with code %d", 883 pid, WEXITSTATUS(status)); 884 } 885 886 nxt_main_cleanup_worker_process(task, pid); 887 } 888 } 889 890 891 static void 892 nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) 893 { 894 nxt_buf_t *buf; 895 nxt_port_t *port; 896 nxt_runtime_t *rt; 897 nxt_process_t *process; 898 nxt_process_type_t ptype; 899 nxt_process_init_t *init; 900 901 rt = task->thread->runtime; 902 903 process = nxt_runtime_process_find(rt, pid); 904 905 if (process) { 906 init = process->init; 907 908 ptype = nxt_process_type(process); 909 910 if (process->ready && init != NULL) { 911 init->stream = 0; 912 } 913 914 nxt_process_close_ports(task, process); 915 916 if (!nxt_exiting) { 917 nxt_runtime_process_each(rt, process) { 918 919 if (process->pid == nxt_pid 920 || process->pid == pid 921 || nxt_queue_is_empty(&process->ports)) 922 { 923 continue; 924 } 925 926 port = nxt_process_port_first(process); 927 928 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { 929 continue; 930 } 931 932 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 933 sizeof(pid)); 934 if (nxt_slow_path(buf == NULL)) { 935 continue; 936 } 937 938 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); 939 940 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, 941 -1, init->stream, 0, buf); 942 } nxt_runtime_process_loop; 943 } 944 945 if (nxt_exiting) { 946 947 if (rt->nprocesses == 2) { 948 nxt_runtime_quit(task, 0); 949 } 950 951 } else if (init != NULL) { 952 if (init->restart != NULL) { 953 if (init->type == NXT_PROCESS_ROUTER) { 954 nxt_main_stop_worker_processes(task, rt); 955 } 956 957 init->restart(task, rt, init); 958 959 } else { 960 nxt_free(init); 961 } 962 } 963 } 964 } 965 966 967 static void 968 nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) 969 { 970 nxt_port_t *port; 971 nxt_process_t *process; 972 973 nxt_runtime_process_each(rt, process) { 974 975 nxt_process_port_each(process, port) { 976 977 if (port->type == NXT_PROCESS_WORKER) { 978 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 979 -1, 0, 0, NULL); 980 } 981 982 } nxt_process_port_loop; 983 984 } nxt_runtime_process_loop; 985 } 986 987 988 static void 989 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 990 { 991 size_t size; 992 nxt_int_t ret; 993 nxt_buf_t *b, *out; 994 nxt_port_t *port; 995 nxt_sockaddr_t *sa; 996 nxt_port_msg_type_t type; 997 nxt_listening_socket_t ls; 998 u_char message[2048]; 999 1000 b = msg->buf; 1001 sa = (nxt_sockaddr_t *) b->mem.pos; 1002 1003 /* TODO check b size and make plain */ 1004 1005 out = NULL; 1006 1007 ls.socket = -1; 1008 ls.error = NXT_SOCKET_ERROR_SYSTEM; 1009 ls.start = message; 1010 ls.end = message + sizeof(message); 1011 1012 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1013 msg->port_msg.reply_port); 1014 1015 nxt_debug(task, "listening socket \"%*s\"", 1016 (size_t) sa->length, nxt_sockaddr_start(sa)); 1017 1018 ret = nxt_main_listening_socket(sa, &ls); 1019 1020 if (ret == NXT_OK) { 1021 nxt_debug(task, "socket(\"%*s\"): %d", 1022 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); 1023 1024 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; 1025 1026 } else { 1027 size = ls.end - ls.start; 1028 1029 nxt_alert(task, "%*s", size, ls.start); 1030 1031 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 1032 size + 1); 1033 if (nxt_slow_path(out == NULL)) { 1034 return; 1035 } 1036 1037 *out->mem.free++ = (uint8_t) ls.error; 1038 1039 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); 1040 1041 type = NXT_PORT_MSG_RPC_ERROR; 1042 } 1043 1044 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream, 1045 0, out); 1046 } 1047 1048 1049 static nxt_int_t 1050 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls) 1051 { 1052 nxt_err_t err; 1053 nxt_socket_t s; 1054 1055 const socklen_t length = sizeof(int); 1056 static const int enable = 1; 1057 1058 s = socket(sa->u.sockaddr.sa_family, sa->type, 0); 1059 1060 if (nxt_slow_path(s == -1)) { 1061 err = nxt_errno; 1062 1063 #if (NXT_INET6) 1064 1065 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) { 1066 ls->error = NXT_SOCKET_ERROR_NOINET6; 1067 } 1068 1069 #endif 1070 1071 ls->end = nxt_sprintf(ls->start, ls->end, 1072 "socket(\\\"%*s\\\") failed %E", 1073 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1074 1075 return NXT_ERROR; 1076 } 1077 1078 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) { 1079 ls->end = nxt_sprintf(ls->start, ls->end, 1080 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E", 1081 (size_t) sa->length, nxt_sockaddr_start(sa), 1082 nxt_errno); 1083 goto fail; 1084 } 1085 1086 #if (NXT_INET6) 1087 1088 if (sa->u.sockaddr.sa_family == AF_INET6) { 1089 1090 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) { 1091 ls->end = nxt_sprintf(ls->start, ls->end, 1092 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E", 1093 (size_t) sa->length, nxt_sockaddr_start(sa), 1094 nxt_errno); 1095 goto fail; 1096 } 1097 } 1098 1099 #endif 1100 1101 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) { 1102 err = nxt_errno; 1103 1104 #if (NXT_HAVE_UNIX_DOMAIN) 1105 1106 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1107 switch (err) { 1108 1109 case EACCES: 1110 ls->error = NXT_SOCKET_ERROR_ACCESS; 1111 break; 1112 1113 case ENOENT: 1114 case ENOTDIR: 1115 ls->error = NXT_SOCKET_ERROR_PATH; 1116 break; 1117 } 1118 1119 goto next; 1120 } 1121 1122 #endif 1123 1124 switch (err) { 1125 1126 case EACCES: 1127 ls->error = NXT_SOCKET_ERROR_PORT; 1128 break; 1129 1130 case EADDRINUSE: 1131 ls->error = NXT_SOCKET_ERROR_INUSE; 1132 break; 1133 1134 case EADDRNOTAVAIL: 1135 ls->error = NXT_SOCKET_ERROR_NOADDR; 1136 break; 1137 } 1138 1139 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E", 1140 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1141 goto fail; 1142 } 1143 1144 #if (NXT_HAVE_UNIX_DOMAIN) 1145 1146 next: 1147 1148 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1149 char *filename; 1150 mode_t access; 1151 1152 filename = sa->u.sockaddr_un.sun_path; 1153 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); 1154 1155 if (chmod(filename, access) != 0) { 1156 ls->end = nxt_sprintf(ls->start, ls->end, 1157 "chmod(\\\"%s\\\") failed %E", 1158 filename, nxt_errno); 1159 goto fail; 1160 } 1161 } 1162 1163 #endif 1164 1165 ls->socket = s; 1166 1167 return NXT_OK; 1168 1169 fail: 1170 1171 (void) close(s); 1172 1173 return NXT_ERROR; 1174 } 1175 1176 1177 static nxt_conf_map_t nxt_app_lang_module_map[] = { 1178 { 1179 nxt_string("type"), 1180 NXT_CONF_MAP_INT, 1181 offsetof(nxt_app_lang_module_t, type), 1182 }, 1183 1184 { 1185 nxt_string("version"), 1186 NXT_CONF_MAP_CSTRZ, 1187 offsetof(nxt_app_lang_module_t, version), 1188 }, 1189 1190 { 1191 nxt_string("file"), 1192 NXT_CONF_MAP_CSTRZ, 1193 offsetof(nxt_app_lang_module_t, file), 1194 }, 1195 }; 1196 1197 1198 static void 1199 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1200 { 1201 uint32_t index; 1202 nxt_mp_t *mp; 1203 nxt_int_t ret; 1204 nxt_buf_t *b; 1205 nxt_port_t *port; 1206 nxt_runtime_t *rt; 1207 nxt_conf_value_t *conf, *root, *value; 1208 nxt_app_lang_module_t *lang; 1209 1210 static nxt_str_t root_path = nxt_string("/"); 1211 1212 rt = task->thread->runtime; 1213 1214 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { 1215 return; 1216 } 1217 1218 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1219 msg->port_msg.reply_port); 1220 1221 if (nxt_fast_path(port != NULL)) { 1222 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 1223 msg->port_msg.stream, 0, NULL); 1224 } 1225 1226 b = msg->buf; 1227 1228 if (b == NULL) { 1229 return; 1230 } 1231 1232 mp = nxt_mp_create(1024, 128, 256, 32); 1233 if (mp == NULL) { 1234 return; 1235 } 1236 1237 b = nxt_buf_chk_make_plain(mp, b, msg->size); 1238 1239 if (b == NULL) { 1240 return; 1241 } 1242 1243 nxt_debug(task, "application languages: \"%*s\"", 1244 b->mem.free - b->mem.pos, b->mem.pos); 1245 1246 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); 1247 if (conf == NULL) { 1248 goto fail; 1249 } 1250 1251 root = nxt_conf_get_path(conf, &root_path); 1252 if (root == NULL) { 1253 goto fail; 1254 } 1255 1256 for (index = 0; /* void */ ; index++) { 1257 value = nxt_conf_get_array_element(root, index); 1258 if (value == NULL) { 1259 break; 1260 } 1261 1262 lang = nxt_array_add(rt->languages); 1263 if (lang == NULL) { 1264 goto fail; 1265 } 1266 1267 lang->module = NULL; 1268 1269 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, 1270 nxt_nitems(nxt_app_lang_module_map), lang); 1271 1272 if (ret != NXT_OK) { 1273 goto fail; 1274 } 1275 1276 nxt_debug(task, "lang %d %s \"%s\"", 1277 lang->type, lang->version, lang->file); 1278 } 1279 1280 qsort(rt->languages->elts, rt->languages->nelts, 1281 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); 1282 1283 fail: 1284 1285 nxt_mp_destroy(mp); 1286 1287 ret = nxt_main_start_controller_process(task, rt); 1288 1289 if (ret == NXT_OK) { 1290 (void) nxt_main_start_router_process(task, rt); 1291 } 1292 } 1293 1294 1295 static int nxt_cdecl 1296 nxt_app_lang_compare(const void *v1, const void *v2) 1297 { 1298 int n; 1299 const nxt_app_lang_module_t *lang1, *lang2; 1300 1301 lang1 = v1; 1302 lang2 = v2; 1303 1304 n = lang1->type - lang2->type; 1305 1306 if (n != 0) { 1307 return n; 1308 } 1309 1310 n = nxt_strverscmp(lang1->version, lang2->version); 1311 1312 /* Negate result to move higher versions to the beginning. */ 1313 1314 return -n; 1315 } 1316 1317 1318 static void 1319 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1320 { 1321 ssize_t n, size, offset; 1322 nxt_buf_t *b; 1323 nxt_int_t ret; 1324 nxt_file_t file; 1325 nxt_runtime_t *rt; 1326 1327 nxt_memzero(&file, sizeof(nxt_file_t)); 1328 1329 rt = task->thread->runtime; 1330 1331 file.name = (nxt_file_name_t *) rt->conf_tmp; 1332 1333 if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, 1334 NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) 1335 != NXT_OK)) 1336 { 1337 goto error; 1338 } 1339 1340 offset = 0; 1341 1342 for (b = msg->buf; b != NULL; b = b->next) { 1343 size = nxt_buf_mem_used_size(&b->mem); 1344 1345 n = nxt_file_write(&file, b->mem.pos, size, offset); 1346 1347 if (nxt_slow_path(n != size)) { 1348 nxt_file_close(task, &file); 1349 (void) nxt_file_delete(file.name); 1350 goto error; 1351 } 1352 1353 offset += n; 1354 } 1355 1356 nxt_file_close(task, &file); 1357 1358 ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); 1359 1360 if (nxt_fast_path(ret == NXT_OK)) { 1361 return; 1362 } 1363 1364 error: 1365 1366 nxt_alert(task, "failed to store current configuration"); 1367 } 1368 1369 1370 static void 1371 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1372 { 1373 u_char *path; 1374 nxt_int_t ret; 1375 nxt_file_t file; 1376 nxt_port_t *port; 1377 nxt_port_msg_type_t type; 1378 1379 nxt_debug(task, "opening access log file"); 1380 1381 path = msg->buf->mem.pos; 1382 1383 nxt_memzero(&file, sizeof(nxt_file_t)); 1384 1385 file.name = (nxt_file_name_t *) path; 1386 file.log_level = NXT_LOG_ERR; 1387 1388 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 1389 NXT_FILE_OWNER_ACCESS); 1390 1391 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD 1392 : NXT_PORT_MSG_RPC_ERROR; 1393 1394 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1395 msg->port_msg.reply_port); 1396 1397 if (nxt_fast_path(port != NULL)) { 1398 (void) nxt_port_socket_write(task, port, type, file.fd, 1399 msg->port_msg.stream, 0, NULL); 1400 } 1401 } 1402