1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_router.h> 13 #if (NXT_TLS) 14 #include <nxt_cert.h> 15 #endif 16 17 18 typedef struct { 19 nxt_socket_t socket; 20 nxt_socket_error_t error; 21 u_char *start; 22 u_char *end; 23 } nxt_listening_socket_t; 24 25 26 typedef struct { 27 nxt_uint_t size; 28 nxt_conf_map_t *map; 29 } nxt_conf_app_map_t; 30 31 32 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, 33 nxt_runtime_t *rt); 34 static void nxt_main_process_title(nxt_task_t *task); 35 static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task, 36 nxt_runtime_t *rt); 37 static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task, 38 nxt_runtime_t *rt, nxt_process_init_t *init); 39 static nxt_int_t nxt_main_start_router_process(nxt_task_t *task, 40 nxt_runtime_t *rt); 41 static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task, 42 nxt_runtime_t *rt); 43 static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, 44 nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); 45 static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task, 46 nxt_runtime_t *rt, nxt_process_init_t *init); 47 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, 48 void *data); 49 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, 50 void *data); 51 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, 52 void *data); 53 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, 54 void *data); 55 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, 56 void *data); 57 static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); 58 static void nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt); 59 static void nxt_main_port_socket_handler(nxt_task_t *task, 60 nxt_port_recv_msg_t *msg); 61 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, 62 nxt_listening_socket_t *ls); 63 static void nxt_main_port_modules_handler(nxt_task_t *task, 64 nxt_port_recv_msg_t *msg); 65 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); 66 static void nxt_main_port_conf_store_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg); 68 static void nxt_main_port_access_log_handler(nxt_task_t *task, 69 nxt_port_recv_msg_t *msg); 70 71 72 const nxt_sig_event_t nxt_main_process_signals[] = { 73 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), 74 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler), 75 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler), 76 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler), 77 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler), 78 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler), 79 nxt_event_signal_end, 80 }; 81 82 83 static nxt_bool_t nxt_exiting; 84 85 86 nxt_int_t 87 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, 88 nxt_runtime_t *rt) 89 { 90 rt->type = NXT_PROCESS_MAIN; 91 92 if (nxt_main_process_port_create(task, rt) != NXT_OK) { 93 return NXT_ERROR; 94 } 95 96 nxt_main_process_title(task); 97 98 /* 99 * The dicsovery process will send a message processed by 100 * nxt_main_port_modules_handler() which starts the controller 101 * and router processes. 102 */ 103 return nxt_main_start_discovery_process(task, rt); 104 } 105 106 107 static nxt_conf_map_t nxt_common_app_conf[] = { 108 { 109 nxt_string("type"), 110 NXT_CONF_MAP_STR, 111 offsetof(nxt_common_app_conf_t, type), 112 }, 113 114 { 115 nxt_string("user"), 116 NXT_CONF_MAP_STR, 117 offsetof(nxt_common_app_conf_t, user), 118 }, 119 120 { 121 nxt_string("group"), 122 NXT_CONF_MAP_STR, 123 offsetof(nxt_common_app_conf_t, group), 124 }, 125 126 { 127 nxt_string("working_directory"), 128 NXT_CONF_MAP_CSTRZ, 129 offsetof(nxt_common_app_conf_t, working_directory), 130 }, 131 132 { 133 nxt_string("environment"), 134 NXT_CONF_MAP_PTR, 135 offsetof(nxt_common_app_conf_t, environment), 136 }, 137 }; 138 139 140 static nxt_conf_map_t nxt_external_app_conf[] = { 141 { 142 nxt_string("executable"), 143 NXT_CONF_MAP_CSTRZ, 144 offsetof(nxt_common_app_conf_t, u.external.executable), 145 }, 146 147 { 148 nxt_string("arguments"), 149 NXT_CONF_MAP_PTR, 150 offsetof(nxt_common_app_conf_t, u.external.arguments), 151 }, 152 153 }; 154 155 156 static nxt_conf_map_t nxt_python_app_conf[] = { 157 { 158 nxt_string("home"), 159 NXT_CONF_MAP_CSTRZ, 160 offsetof(nxt_common_app_conf_t, u.python.home), 161 }, 162 163 { 164 nxt_string("path"), 165 NXT_CONF_MAP_STR, 166 offsetof(nxt_common_app_conf_t, u.python.path), 167 }, 168 169 { 170 nxt_string("module"), 171 NXT_CONF_MAP_STR, 172 offsetof(nxt_common_app_conf_t, u.python.module), 173 }, 174 }; 175 176 177 static nxt_conf_map_t nxt_php_app_conf[] = { 178 { 179 nxt_string("root"), 180 NXT_CONF_MAP_CSTRZ, 181 offsetof(nxt_common_app_conf_t, u.php.root), 182 }, 183 184 { 185 nxt_string("script"), 186 NXT_CONF_MAP_STR, 187 offsetof(nxt_common_app_conf_t, u.php.script), 188 }, 189 190 { 191 nxt_string("index"), 192 NXT_CONF_MAP_STR, 193 offsetof(nxt_common_app_conf_t, u.php.index), 194 }, 195 196 { 197 nxt_string("options"), 198 NXT_CONF_MAP_PTR, 199 offsetof(nxt_common_app_conf_t, u.php.options), 200 }, 201 }; 202 203 204 static nxt_conf_map_t nxt_perl_app_conf[] = { 205 { 206 nxt_string("script"), 207 NXT_CONF_MAP_CSTRZ, 208 offsetof(nxt_common_app_conf_t, u.perl.script), 209 }, 210 }; 211 212 213 static nxt_conf_map_t nxt_ruby_app_conf[] = { 214 { 215 nxt_string("script"), 216 NXT_CONF_MAP_STR, 217 offsetof(nxt_common_app_conf_t, u.ruby.script), 218 }, 219 }; 220 221 222 static nxt_conf_map_t nxt_java_app_conf[] = { 223 { 224 nxt_string("classpath"), 225 NXT_CONF_MAP_PTR, 226 offsetof(nxt_common_app_conf_t, u.java.classpath), 227 }, 228 { 229 nxt_string("webapp"), 230 NXT_CONF_MAP_CSTRZ, 231 offsetof(nxt_common_app_conf_t, u.java.webapp), 232 }, 233 { 234 nxt_string("options"), 235 NXT_CONF_MAP_PTR, 236 offsetof(nxt_common_app_conf_t, u.java.options), 237 }, 238 { 239 nxt_string("unit_jars"), 240 NXT_CONF_MAP_CSTRZ, 241 offsetof(nxt_common_app_conf_t, u.java.unit_jars), 242 }, 243 244 }; 245 246 247 static nxt_conf_app_map_t nxt_app_maps[] = { 248 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, 249 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, 250 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, 251 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, 252 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, 253 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf }, 254 }; 255 256 257 static void 258 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 259 { 260 nxt_debug(task, "main data: %*s", 261 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 262 } 263 264 265 static void 266 nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 267 { 268 u_char *start, ch; 269 size_t type_len; 270 nxt_mp_t *mp; 271 nxt_int_t ret; 272 nxt_buf_t *b; 273 nxt_port_t *port; 274 nxt_app_type_t idx; 275 nxt_conf_value_t *conf; 276 nxt_common_app_conf_t app_conf; 277 278 static nxt_str_t nobody = nxt_string("nobody"); 279 280 ret = NXT_ERROR; 281 282 mp = nxt_mp_create(1024, 128, 256, 32); 283 284 if (nxt_slow_path(mp == NULL)) { 285 return; 286 } 287 288 b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size); 289 290 if (b == NULL) { 291 return; 292 } 293 294 nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos, 295 b->mem.pos); 296 297 nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); 298 299 start = b->mem.pos; 300 301 app_conf.name.start = start; 302 app_conf.name.length = nxt_strlen(start); 303 304 start += app_conf.name.length + 1; 305 306 conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL); 307 308 if (conf == NULL) { 309 nxt_alert(task, "router app configuration parsing error"); 310 311 goto failed; 312 } 313 314 app_conf.user = nobody; 315 316 ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf, 317 nxt_nitems(nxt_common_app_conf), &app_conf); 318 if (ret != NXT_OK) { 319 nxt_alert(task, "failed to map common app conf received from router"); 320 goto failed; 321 } 322 323 for (type_len = 0; type_len != app_conf.type.length; type_len++) { 324 ch = app_conf.type.start[type_len]; 325 326 if (ch == ' ' || nxt_isdigit(ch)) { 327 break; 328 } 329 } 330 331 idx = nxt_app_parse_type(app_conf.type.start, type_len); 332 333 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { 334 nxt_alert(task, "invalid app type %d received from router", (int) idx); 335 goto failed; 336 } 337 338 ret = nxt_conf_map_object(mp, conf, nxt_app_maps[idx].map, 339 nxt_app_maps[idx].size, &app_conf); 340 341 if (nxt_slow_path(ret != NXT_OK)) { 342 nxt_alert(task, "failed to map app conf received from router"); 343 goto failed; 344 } 345 346 ret = nxt_main_start_worker_process(task, task->thread->runtime, 347 &app_conf, msg->port_msg.stream); 348 349 failed: 350 351 if (ret == NXT_ERROR) { 352 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 353 msg->port_msg.reply_port); 354 if (nxt_fast_path(port != NULL)) { 355 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 356 -1, msg->port_msg.stream, 0, NULL); 357 } 358 } 359 360 nxt_mp_destroy(mp); 361 } 362 363 364 static nxt_port_handlers_t nxt_main_process_port_handlers = { 365 .data = nxt_port_main_data_handler, 366 .process_ready = nxt_port_process_ready_handler, 367 .start_worker = nxt_port_main_start_worker_handler, 368 .socket = nxt_main_port_socket_handler, 369 .modules = nxt_main_port_modules_handler, 370 .conf_store = nxt_main_port_conf_store_handler, 371 #if (NXT_TLS) 372 .cert_get = nxt_cert_store_get_handler, 373 .cert_delete = nxt_cert_store_delete_handler, 374 #endif 375 .access_log = nxt_main_port_access_log_handler, 376 .rpc_ready = nxt_port_rpc_handler, 377 .rpc_error = nxt_port_rpc_handler, 378 }; 379 380 381 static nxt_int_t 382 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) 383 { 384 nxt_int_t ret; 385 nxt_port_t *port; 386 nxt_process_t *process; 387 388 process = nxt_runtime_process_get(rt, nxt_pid); 389 if (nxt_slow_path(process == NULL)) { 390 return NXT_ERROR; 391 } 392 393 port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN); 394 if (nxt_slow_path(port == NULL)) { 395 nxt_process_use(task, process, -1); 396 return NXT_ERROR; 397 } 398 399 nxt_process_port_add(task, process, port); 400 401 nxt_process_use(task, process, -1); 402 403 ret = nxt_port_socket_init(task, port, 0); 404 if (nxt_slow_path(ret != NXT_OK)) { 405 nxt_port_use(task, port, -1); 406 return ret; 407 } 408 409 nxt_runtime_port_add(task, port); 410 411 nxt_port_use(task, port, -1); 412 413 /* 414 * A main process port. A write port is not closed 415 * since it should be inherited by worker processes. 416 */ 417 nxt_port_enable(task, port, &nxt_main_process_port_handlers); 418 419 process->ready = 1; 420 421 return NXT_OK; 422 } 423 424 425 static void 426 nxt_main_process_title(nxt_task_t *task) 427 { 428 u_char *p, *end; 429 nxt_uint_t i; 430 u_char title[2048]; 431 432 end = title + sizeof(title) - 1; 433 434 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s", 435 nxt_process_argv[0]); 436 437 for (i = 1; nxt_process_argv[i] != NULL; i++) { 438 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); 439 } 440 441 if (p < end) { 442 *p++ = ']'; 443 } 444 445 *p = '\0'; 446 447 nxt_process_title(task, "%s", title); 448 } 449 450 451 static nxt_int_t 452 nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) 453 { 454 nxt_process_init_t *init; 455 456 init = nxt_malloc(sizeof(nxt_process_init_t)); 457 if (nxt_slow_path(init == NULL)) { 458 return NXT_ERROR; 459 } 460 461 init->start = nxt_controller_start; 462 init->name = "controller"; 463 init->user_cred = &rt->user_cred; 464 init->port_handlers = &nxt_controller_process_port_handlers; 465 init->signals = nxt_worker_process_signals; 466 init->type = NXT_PROCESS_CONTROLLER; 467 init->stream = 0; 468 init->restart = &nxt_main_create_controller_process; 469 470 return nxt_main_create_controller_process(task, rt, init);; 471 } 472 473 474 static nxt_int_t 475 nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt, 476 nxt_process_init_t *init) 477 { 478 ssize_t n; 479 nxt_int_t ret; 480 nxt_str_t *conf; 481 nxt_file_t file; 482 nxt_file_info_t fi; 483 nxt_controller_init_t ctrl_init; 484 485 nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); 486 487 conf = &ctrl_init.conf; 488 489 nxt_memzero(&file, sizeof(nxt_file_t)); 490 491 file.name = (nxt_file_name_t *) rt->conf; 492 493 ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); 494 495 if (ret == NXT_OK) { 496 ret = nxt_file_info(&file, &fi); 497 498 if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { 499 conf->length = nxt_file_size(&fi); 500 conf->start = nxt_malloc(conf->length); 501 502 if (nxt_slow_path(conf->start == NULL)) { 503 nxt_file_close(task, &file); 504 return NXT_ERROR; 505 } 506 507 n = nxt_file_read(&file, conf->start, conf->length, 0); 508 509 if (nxt_slow_path(n != (ssize_t) conf->length)) { 510 nxt_free(conf->start); 511 conf->start = NULL; 512 513 nxt_alert(task, "failed to restore previous configuration: " 514 "cannot read the file"); 515 } 516 } 517 518 nxt_file_close(task, &file); 519 } 520 521 #if (NXT_TLS) 522 ctrl_init.certs = nxt_cert_store_load(task); 523 #endif 524 525 init->data = &ctrl_init; 526 527 ret = nxt_main_create_worker_process(task, rt, init); 528 529 if (ret == NXT_OK) { 530 if (conf->start != NULL) { 531 nxt_free(conf->start); 532 } 533 534 #if (NXT_TLS) 535 if (ctrl_init.certs != NULL) { 536 nxt_cert_store_release(ctrl_init.certs); 537 } 538 #endif 539 } 540 541 return ret; 542 } 543 544 545 static nxt_int_t 546 nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) 547 { 548 nxt_process_init_t *init; 549 550 init = nxt_malloc(sizeof(nxt_process_init_t)); 551 if (nxt_slow_path(init == NULL)) { 552 return NXT_ERROR; 553 } 554 555 init->start = nxt_discovery_start; 556 init->name = "discovery"; 557 init->user_cred = &rt->user_cred; 558 init->port_handlers = &nxt_discovery_process_port_handlers; 559 init->signals = nxt_worker_process_signals; 560 init->type = NXT_PROCESS_DISCOVERY; 561 init->data = rt; 562 init->stream = 0; 563 init->restart = NULL; 564 565 return nxt_main_create_worker_process(task, rt, init); 566 } 567 568 569 static nxt_int_t 570 nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) 571 { 572 nxt_process_init_t *init; 573 574 init = nxt_malloc(sizeof(nxt_process_init_t)); 575 if (nxt_slow_path(init == NULL)) { 576 return NXT_ERROR; 577 } 578 579 init->start = nxt_router_start; 580 init->name = "router"; 581 init->user_cred = &rt->user_cred; 582 init->port_handlers = &nxt_router_process_port_handlers; 583 init->signals = nxt_worker_process_signals; 584 init->type = NXT_PROCESS_ROUTER; 585 init->data = rt; 586 init->stream = 0; 587 init->restart = &nxt_main_create_worker_process; 588 589 return nxt_main_create_worker_process(task, rt, init); 590 } 591 592 593 static nxt_int_t 594 nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, 595 nxt_common_app_conf_t *app_conf, uint32_t stream) 596 { 597 char *user, *group; 598 u_char *title, *last, *end; 599 size_t size; 600 nxt_process_init_t *init; 601 602 size = sizeof(nxt_process_init_t) 603 + sizeof(nxt_user_cred_t) 604 + app_conf->user.length + 1 605 + app_conf->group.length + 1 606 + app_conf->name.length + sizeof("\"\" application"); 607 608 init = nxt_malloc(size); 609 if (nxt_slow_path(init == NULL)) { 610 return NXT_ERROR; 611 } 612 613 init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t)); 614 user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t)); 615 616 nxt_memcpy(user, app_conf->user.start, app_conf->user.length); 617 last = nxt_pointer_to(user, app_conf->user.length); 618 *last++ = '\0'; 619 620 init->user_cred->user = user; 621 622 if (app_conf->group.start != NULL) { 623 group = (char *) last; 624 625 nxt_memcpy(group, app_conf->group.start, app_conf->group.length); 626 last = nxt_pointer_to(group, app_conf->group.length); 627 *last++ = '\0'; 628 629 } else { 630 group = NULL; 631 } 632 633 if (nxt_user_cred_get(task, init->user_cred, group) != NXT_OK) { 634 return NXT_ERROR; 635 } 636 637 title = last; 638 end = title + app_conf->name.length + sizeof("\"\" application"); 639 640 nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name); 641 642 init->start = nxt_app_start; 643 init->name = (char *) title; 644 init->port_handlers = &nxt_app_process_port_handlers; 645 init->signals = nxt_worker_process_signals; 646 init->type = NXT_PROCESS_WORKER; 647 init->data = app_conf; 648 init->stream = stream; 649 init->restart = NULL; 650 651 return nxt_main_create_worker_process(task, rt, init); 652 } 653 654 655 static nxt_int_t 656 nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, 657 nxt_process_init_t *init) 658 { 659 nxt_int_t ret; 660 nxt_pid_t pid; 661 nxt_port_t *port; 662 nxt_process_t *process; 663 664 /* 665 * TODO: remove process, init, ports from array on memory and fork failures. 666 */ 667 668 process = nxt_runtime_process_new(rt); 669 if (nxt_slow_path(process == NULL)) { 670 return NXT_ERROR; 671 } 672 673 process->init = init; 674 675 port = nxt_port_new(task, 0, 0, init->type); 676 if (nxt_slow_path(port == NULL)) { 677 nxt_process_use(task, process, -1); 678 return NXT_ERROR; 679 } 680 681 nxt_process_port_add(task, process, port); 682 683 nxt_process_use(task, process, -1); 684 685 ret = nxt_port_socket_init(task, port, 0); 686 if (nxt_slow_path(ret != NXT_OK)) { 687 nxt_port_use(task, port, -1); 688 return ret; 689 } 690 691 pid = nxt_process_create(task, process); 692 693 nxt_port_use(task, port, -1); 694 695 switch (pid) { 696 697 case -1: 698 return NXT_ERROR; 699 700 case 0: 701 /* A worker process, return to the event engine work queue loop. */ 702 return NXT_AGAIN; 703 704 default: 705 /* The main process created a new process. */ 706 707 nxt_port_read_close(port); 708 nxt_port_write_enable(task, port); 709 710 return NXT_OK; 711 } 712 } 713 714 715 void 716 nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 717 { 718 nxt_port_t *port; 719 nxt_process_t *process; 720 721 nxt_runtime_process_each(rt, process) { 722 723 if (nxt_pid != process->pid) { 724 process->init = NULL; 725 726 nxt_process_port_each(process, port) { 727 728 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 729 -1, 0, 0, NULL); 730 731 } nxt_process_port_loop; 732 } 733 734 } nxt_runtime_process_loop; 735 } 736 737 738 739 static void 740 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) 741 { 742 nxt_debug(task, "sigterm handler signo:%d (%s)", 743 (int) (uintptr_t) obj, data); 744 745 /* TODO: fast exit. */ 746 747 nxt_exiting = 1; 748 749 nxt_runtime_quit(task, 0); 750 } 751 752 753 static void 754 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) 755 { 756 nxt_debug(task, "sigquit handler signo:%d (%s)", 757 (int) (uintptr_t) obj, data); 758 759 /* TODO: graceful exit. */ 760 761 nxt_exiting = 1; 762 763 nxt_runtime_quit(task, 0); 764 } 765 766 767 static void 768 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) 769 { 770 nxt_mp_t *mp; 771 nxt_int_t ret; 772 nxt_uint_t n; 773 nxt_port_t *port; 774 nxt_file_t *file, *new_file; 775 nxt_array_t *new_files; 776 nxt_runtime_t *rt; 777 778 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", 779 (int) (uintptr_t) obj, data, "log files rotation"); 780 781 rt = task->thread->runtime; 782 783 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 784 785 if (nxt_fast_path(port != NULL)) { 786 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG, 787 -1, 0, 0, NULL); 788 } 789 790 mp = nxt_mp_create(1024, 128, 256, 32); 791 if (mp == NULL) { 792 return; 793 } 794 795 n = nxt_list_nelts(rt->log_files); 796 797 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); 798 if (new_files == NULL) { 799 nxt_mp_destroy(mp); 800 return; 801 } 802 803 nxt_list_each(file, rt->log_files) { 804 805 /* This allocation cannot fail. */ 806 new_file = nxt_array_add(new_files); 807 808 new_file->name = file->name; 809 new_file->fd = NXT_FILE_INVALID; 810 new_file->log_level = NXT_LOG_ALERT; 811 812 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, 813 NXT_FILE_OWNER_ACCESS); 814 815 if (ret != NXT_OK) { 816 goto fail; 817 } 818 819 } nxt_list_loop; 820 821 new_file = new_files->elts; 822 823 ret = nxt_file_stderr(&new_file[0]); 824 825 if (ret == NXT_OK) { 826 n = 0; 827 828 nxt_list_each(file, rt->log_files) { 829 830 nxt_port_change_log_file(task, rt, n, new_file[n].fd); 831 /* 832 * The old log file descriptor must be closed at the moment 833 * when no other threads use it. dup2() allows to use the 834 * old file descriptor for new log file. This change is 835 * performed atomically in the kernel. 836 */ 837 (void) nxt_file_redirect(file, new_file[n].fd); 838 839 n++; 840 841 } nxt_list_loop; 842 843 nxt_mp_destroy(mp); 844 return; 845 } 846 847 fail: 848 849 new_file = new_files->elts; 850 n = new_files->nelts; 851 852 while (n != 0) { 853 if (new_file->fd != NXT_FILE_INVALID) { 854 nxt_file_close(task, new_file); 855 } 856 857 new_file++; 858 n--; 859 } 860 861 nxt_mp_destroy(mp); 862 } 863 864 865 static void 866 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) 867 { 868 int status; 869 nxt_err_t err; 870 nxt_pid_t pid; 871 872 nxt_debug(task, "sigchld handler signo:%d (%s)", 873 (int) (uintptr_t) obj, data); 874 875 for ( ;; ) { 876 pid = waitpid(-1, &status, WNOHANG); 877 878 if (pid == -1) { 879 880 switch (err = nxt_errno) { 881 882 case NXT_ECHILD: 883 return; 884 885 case NXT_EINTR: 886 continue; 887 888 default: 889 nxt_alert(task, "waitpid() failed: %E", err); 890 return; 891 } 892 } 893 894 nxt_debug(task, "waitpid(): %PI", pid); 895 896 if (pid == 0) { 897 return; 898 } 899 900 if (WTERMSIG(status)) { 901 #ifdef WCOREDUMP 902 nxt_alert(task, "process %PI exited on signal %d%s", 903 pid, WTERMSIG(status), 904 WCOREDUMP(status) ? " (core dumped)" : ""); 905 #else 906 nxt_alert(task, "process %PI exited on signal %d", 907 pid, WTERMSIG(status)); 908 #endif 909 910 } else { 911 nxt_trace(task, "process %PI exited with code %d", 912 pid, WEXITSTATUS(status)); 913 } 914 915 nxt_main_cleanup_worker_process(task, pid); 916 } 917 } 918 919 920 static void 921 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) 922 { 923 nxt_trace(task, "signal signo:%d (%s) recevied, ignored", 924 (int) (uintptr_t) obj, data); 925 } 926 927 928 static void 929 nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) 930 { 931 nxt_buf_t *buf; 932 nxt_port_t *port; 933 nxt_runtime_t *rt; 934 nxt_process_t *process; 935 nxt_process_type_t ptype; 936 nxt_process_init_t *init; 937 938 rt = task->thread->runtime; 939 940 process = nxt_runtime_process_find(rt, pid); 941 942 if (process) { 943 init = process->init; 944 945 ptype = nxt_process_type(process); 946 947 if (process->ready && init != NULL) { 948 init->stream = 0; 949 } 950 951 nxt_process_close_ports(task, process); 952 953 if (!nxt_exiting) { 954 nxt_runtime_process_each(rt, process) { 955 956 if (process->pid == nxt_pid 957 || process->pid == pid 958 || nxt_queue_is_empty(&process->ports)) 959 { 960 continue; 961 } 962 963 port = nxt_process_port_first(process); 964 965 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { 966 continue; 967 } 968 969 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 970 sizeof(pid)); 971 if (nxt_slow_path(buf == NULL)) { 972 continue; 973 } 974 975 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); 976 977 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, 978 -1, init->stream, 0, buf); 979 } nxt_runtime_process_loop; 980 } 981 982 if (nxt_exiting) { 983 984 if (rt->nprocesses == 2) { 985 nxt_runtime_quit(task, 0); 986 } 987 988 } else if (init != NULL) { 989 if (init->restart != NULL) { 990 if (init->type == NXT_PROCESS_ROUTER) { 991 nxt_main_stop_worker_processes(task, rt); 992 } 993 994 init->restart(task, rt, init); 995 996 } else { 997 nxt_free(init); 998 } 999 } 1000 } 1001 } 1002 1003 1004 static void 1005 nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) 1006 { 1007 nxt_port_t *port; 1008 nxt_process_t *process; 1009 1010 nxt_runtime_process_each(rt, process) { 1011 1012 nxt_process_port_each(process, port) { 1013 1014 if (port->type == NXT_PROCESS_WORKER) { 1015 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 1016 -1, 0, 0, NULL); 1017 } 1018 1019 } nxt_process_port_loop; 1020 1021 } nxt_runtime_process_loop; 1022 } 1023 1024 1025 static void 1026 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1027 { 1028 size_t size; 1029 nxt_int_t ret; 1030 nxt_buf_t *b, *out; 1031 nxt_port_t *port; 1032 nxt_sockaddr_t *sa; 1033 nxt_port_msg_type_t type; 1034 nxt_listening_socket_t ls; 1035 u_char message[2048]; 1036 1037 b = msg->buf; 1038 sa = (nxt_sockaddr_t *) b->mem.pos; 1039 1040 /* TODO check b size and make plain */ 1041 1042 out = NULL; 1043 1044 ls.socket = -1; 1045 ls.error = NXT_SOCKET_ERROR_SYSTEM; 1046 ls.start = message; 1047 ls.end = message + sizeof(message); 1048 1049 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1050 msg->port_msg.reply_port); 1051 1052 nxt_debug(task, "listening socket \"%*s\"", 1053 (size_t) sa->length, nxt_sockaddr_start(sa)); 1054 1055 ret = nxt_main_listening_socket(sa, &ls); 1056 1057 if (ret == NXT_OK) { 1058 nxt_debug(task, "socket(\"%*s\"): %d", 1059 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); 1060 1061 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; 1062 1063 } else { 1064 size = ls.end - ls.start; 1065 1066 nxt_alert(task, "%*s", size, ls.start); 1067 1068 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 1069 size + 1); 1070 if (nxt_slow_path(out == NULL)) { 1071 return; 1072 } 1073 1074 *out->mem.free++ = (uint8_t) ls.error; 1075 1076 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); 1077 1078 type = NXT_PORT_MSG_RPC_ERROR; 1079 } 1080 1081 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream, 1082 0, out); 1083 } 1084 1085 1086 static nxt_int_t 1087 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls) 1088 { 1089 nxt_err_t err; 1090 nxt_socket_t s; 1091 1092 const socklen_t length = sizeof(int); 1093 static const int enable = 1; 1094 1095 s = socket(sa->u.sockaddr.sa_family, sa->type, 0); 1096 1097 if (nxt_slow_path(s == -1)) { 1098 err = nxt_errno; 1099 1100 #if (NXT_INET6) 1101 1102 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) { 1103 ls->error = NXT_SOCKET_ERROR_NOINET6; 1104 } 1105 1106 #endif 1107 1108 ls->end = nxt_sprintf(ls->start, ls->end, 1109 "socket(\\\"%*s\\\") failed %E", 1110 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1111 1112 return NXT_ERROR; 1113 } 1114 1115 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) { 1116 ls->end = nxt_sprintf(ls->start, ls->end, 1117 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E", 1118 (size_t) sa->length, nxt_sockaddr_start(sa), 1119 nxt_errno); 1120 goto fail; 1121 } 1122 1123 #if (NXT_INET6) 1124 1125 if (sa->u.sockaddr.sa_family == AF_INET6) { 1126 1127 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) { 1128 ls->end = nxt_sprintf(ls->start, ls->end, 1129 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E", 1130 (size_t) sa->length, nxt_sockaddr_start(sa), 1131 nxt_errno); 1132 goto fail; 1133 } 1134 } 1135 1136 #endif 1137 1138 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) { 1139 err = nxt_errno; 1140 1141 #if (NXT_HAVE_UNIX_DOMAIN) 1142 1143 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1144 switch (err) { 1145 1146 case EACCES: 1147 ls->error = NXT_SOCKET_ERROR_ACCESS; 1148 break; 1149 1150 case ENOENT: 1151 case ENOTDIR: 1152 ls->error = NXT_SOCKET_ERROR_PATH; 1153 break; 1154 } 1155 1156 } else 1157 #endif 1158 { 1159 switch (err) { 1160 1161 case EACCES: 1162 ls->error = NXT_SOCKET_ERROR_PORT; 1163 break; 1164 1165 case EADDRINUSE: 1166 ls->error = NXT_SOCKET_ERROR_INUSE; 1167 break; 1168 1169 case EADDRNOTAVAIL: 1170 ls->error = NXT_SOCKET_ERROR_NOADDR; 1171 break; 1172 } 1173 } 1174 1175 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E", 1176 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1177 goto fail; 1178 } 1179 1180 #if (NXT_HAVE_UNIX_DOMAIN) 1181 1182 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1183 char *filename; 1184 mode_t access; 1185 1186 filename = sa->u.sockaddr_un.sun_path; 1187 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); 1188 1189 if (chmod(filename, access) != 0) { 1190 ls->end = nxt_sprintf(ls->start, ls->end, 1191 "chmod(\\\"%s\\\") failed %E", 1192 filename, nxt_errno); 1193 goto fail; 1194 } 1195 } 1196 1197 #endif 1198 1199 ls->socket = s; 1200 1201 return NXT_OK; 1202 1203 fail: 1204 1205 (void) close(s); 1206 1207 return NXT_ERROR; 1208 } 1209 1210 1211 static nxt_conf_map_t nxt_app_lang_module_map[] = { 1212 { 1213 nxt_string("type"), 1214 NXT_CONF_MAP_INT, 1215 offsetof(nxt_app_lang_module_t, type), 1216 }, 1217 1218 { 1219 nxt_string("version"), 1220 NXT_CONF_MAP_CSTRZ, 1221 offsetof(nxt_app_lang_module_t, version), 1222 }, 1223 1224 { 1225 nxt_string("file"), 1226 NXT_CONF_MAP_CSTRZ, 1227 offsetof(nxt_app_lang_module_t, file), 1228 }, 1229 }; 1230 1231 1232 static void 1233 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1234 { 1235 uint32_t index; 1236 nxt_mp_t *mp; 1237 nxt_int_t ret; 1238 nxt_buf_t *b; 1239 nxt_port_t *port; 1240 nxt_runtime_t *rt; 1241 nxt_conf_value_t *conf, *root, *value; 1242 nxt_app_lang_module_t *lang; 1243 1244 static nxt_str_t root_path = nxt_string("/"); 1245 1246 rt = task->thread->runtime; 1247 1248 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { 1249 return; 1250 } 1251 1252 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1253 msg->port_msg.reply_port); 1254 1255 if (nxt_fast_path(port != NULL)) { 1256 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 1257 msg->port_msg.stream, 0, NULL); 1258 } 1259 1260 b = msg->buf; 1261 1262 if (b == NULL) { 1263 return; 1264 } 1265 1266 mp = nxt_mp_create(1024, 128, 256, 32); 1267 if (mp == NULL) { 1268 return; 1269 } 1270 1271 b = nxt_buf_chk_make_plain(mp, b, msg->size); 1272 1273 if (b == NULL) { 1274 return; 1275 } 1276 1277 nxt_debug(task, "application languages: \"%*s\"", 1278 b->mem.free - b->mem.pos, b->mem.pos); 1279 1280 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); 1281 if (conf == NULL) { 1282 goto fail; 1283 } 1284 1285 root = nxt_conf_get_path(conf, &root_path); 1286 if (root == NULL) { 1287 goto fail; 1288 } 1289 1290 for (index = 0; /* void */ ; index++) { 1291 value = nxt_conf_get_array_element(root, index); 1292 if (value == NULL) { 1293 break; 1294 } 1295 1296 lang = nxt_array_add(rt->languages); 1297 if (lang == NULL) { 1298 goto fail; 1299 } 1300 1301 lang->module = NULL; 1302 1303 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, 1304 nxt_nitems(nxt_app_lang_module_map), lang); 1305 1306 if (ret != NXT_OK) { 1307 goto fail; 1308 } 1309 1310 nxt_debug(task, "lang %d %s \"%s\"", 1311 lang->type, lang->version, lang->file); 1312 } 1313 1314 qsort(rt->languages->elts, rt->languages->nelts, 1315 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); 1316 1317 fail: 1318 1319 nxt_mp_destroy(mp); 1320 1321 ret = nxt_main_start_controller_process(task, rt); 1322 1323 if (ret == NXT_OK) { 1324 (void) nxt_main_start_router_process(task, rt); 1325 } 1326 } 1327 1328 1329 static int nxt_cdecl 1330 nxt_app_lang_compare(const void *v1, const void *v2) 1331 { 1332 int n; 1333 const nxt_app_lang_module_t *lang1, *lang2; 1334 1335 lang1 = v1; 1336 lang2 = v2; 1337 1338 n = lang1->type - lang2->type; 1339 1340 if (n != 0) { 1341 return n; 1342 } 1343 1344 n = nxt_strverscmp(lang1->version, lang2->version); 1345 1346 /* Negate result to move higher versions to the beginning. */ 1347 1348 return -n; 1349 } 1350 1351 1352 static void 1353 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1354 { 1355 ssize_t n, size, offset; 1356 nxt_buf_t *b; 1357 nxt_int_t ret; 1358 nxt_file_t file; 1359 nxt_runtime_t *rt; 1360 1361 nxt_memzero(&file, sizeof(nxt_file_t)); 1362 1363 rt = task->thread->runtime; 1364 1365 file.name = (nxt_file_name_t *) rt->conf_tmp; 1366 1367 if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, 1368 NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) 1369 != NXT_OK)) 1370 { 1371 goto error; 1372 } 1373 1374 offset = 0; 1375 1376 for (b = msg->buf; b != NULL; b = b->next) { 1377 size = nxt_buf_mem_used_size(&b->mem); 1378 1379 n = nxt_file_write(&file, b->mem.pos, size, offset); 1380 1381 if (nxt_slow_path(n != size)) { 1382 nxt_file_close(task, &file); 1383 (void) nxt_file_delete(file.name); 1384 goto error; 1385 } 1386 1387 offset += n; 1388 } 1389 1390 nxt_file_close(task, &file); 1391 1392 ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); 1393 1394 if (nxt_fast_path(ret == NXT_OK)) { 1395 return; 1396 } 1397 1398 error: 1399 1400 nxt_alert(task, "failed to store current configuration"); 1401 } 1402 1403 1404 static void 1405 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1406 { 1407 u_char *path; 1408 nxt_int_t ret; 1409 nxt_file_t file; 1410 nxt_port_t *port; 1411 nxt_port_msg_type_t type; 1412 1413 nxt_debug(task, "opening access log file"); 1414 1415 path = msg->buf->mem.pos; 1416 1417 nxt_memzero(&file, sizeof(nxt_file_t)); 1418 1419 file.name = (nxt_file_name_t *) path; 1420 file.log_level = NXT_LOG_ERR; 1421 1422 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 1423 NXT_FILE_OWNER_ACCESS); 1424 1425 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD 1426 : NXT_PORT_MSG_RPC_ERROR; 1427 1428 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1429 msg->port_msg.reply_port); 1430 1431 if (nxt_fast_path(port != NULL)) { 1432 (void) nxt_port_socket_write(task, port, type, file.fd, 1433 msg->port_msg.stream, 0, NULL); 1434 } 1435 } 1436