94 lang->version = (u_char *) ""; 95 lang->file = NULL; 96 lang->module = &nxt_go_module; 97 98 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 99 if (nxt_slow_path(listen_sockets == NULL)) { 100 goto fail; 101 } 102 103 rt->listen_sockets = listen_sockets; 104 105 ret = nxt_runtime_inherited_listen_sockets(task, rt); 106 if (nxt_slow_path(ret != NXT_OK)) { 107 goto fail; 108 } 109 110 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 111 goto fail; 112 } 113 114 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 115 goto fail; 116 } 117 118 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 119 goto fail; 120 } 121 122 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 123 goto fail; 124 } 125 126 rt->start = nxt_runtime_initial_start; 127 128 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 129 goto fail; 130 } 131 132 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 133 nxt_runtime_start, task, rt, NULL); 134 135 return NXT_OK; 136 137fail: 138 139 nxt_mp_destroy(mp); 140 141 return NXT_ERROR; 142} 143 144 145static nxt_int_t 146nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 147{ 148 u_char *v, *p; 149 nxt_int_t type; 150 nxt_array_t *inherited_sockets; 151 nxt_socket_t s; 152 nxt_listen_socket_t *ls; 153 154 v = (u_char *) getenv("NGINX"); 155 156 if (v == NULL) { 157 return nxt_runtime_systemd_listen_sockets(task, rt); 158 } 159 160 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); 161 162 inherited_sockets = nxt_array_create(rt->mem_pool, 163 1, sizeof(nxt_listen_socket_t)); 164 if (inherited_sockets == NULL) { 165 return NXT_ERROR; 166 } 167 168 rt->inherited_sockets = inherited_sockets; 169 170 for (p = v; *p != '\0'; p++) { 171 172 if (*p == ';') { 173 s = nxt_int_parse(v, p - v); 174 175 if (nxt_slow_path(s < 0)) { 176 nxt_log(task, NXT_LOG_CRIT, "invalid socket number " 177 "\"%s\" in NGINX environment variable, " 178 "ignoring the rest of the variable", v); 179 return NXT_ERROR; 180 } 181 182 v = p + 1; 183 184 ls = nxt_array_zero_add(inherited_sockets); 185 if (nxt_slow_path(ls == NULL)) { 186 return NXT_ERROR; 187 } 188 189 ls->socket = s; 190 191 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 192 if (nxt_slow_path(ls->sockaddr == NULL)) { 193 return NXT_ERROR; 194 } 195 196 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 197 if (nxt_slow_path(type == -1)) { 198 return NXT_ERROR; 199 } 200 201 ls->sockaddr->type = (uint16_t) type; 202 } 203 } 204 205 return NXT_OK; 206} 207 208 209static nxt_int_t 210nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 211{ 212 u_char *nfd, *pid; 213 nxt_int_t n; 214 nxt_array_t *inherited_sockets; 215 nxt_socket_t s; 216 nxt_listen_socket_t *ls; 217 218 /* 219 * Number of listening sockets passed. The socket 220 * descriptors start from number 3 and are sequential. 221 */ 222 nfd = (u_char *) getenv("LISTEN_FDS"); 223 if (nfd == NULL) { 224 return NXT_OK; 225 } 226 227 /* The pid of the service process. */ 228 pid = (u_char *) getenv("LISTEN_PID"); 229 if (pid == NULL) { 230 return NXT_OK; 231 } 232 233 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 234 if (n < 0) { 235 return NXT_OK; 236 } 237 238 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 239 return NXT_OK; 240 } 241 242 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); 243 244 inherited_sockets = nxt_array_create(rt->mem_pool, 245 n, sizeof(nxt_listen_socket_t)); 246 if (inherited_sockets == NULL) { 247 return NXT_ERROR; 248 } 249 250 rt->inherited_sockets = inherited_sockets; 251 252 for (s = 3; s < n; s++) { 253 ls = nxt_array_zero_add(inherited_sockets); 254 if (nxt_slow_path(ls == NULL)) { 255 return NXT_ERROR; 256 } 257 258 ls->socket = s; 259 260 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 261 if (nxt_slow_path(ls->sockaddr == NULL)) { 262 return NXT_ERROR; 263 } 264 265 ls->sockaddr->type = SOCK_STREAM; 266 } 267 268 return NXT_OK; 269} 270 271 272static nxt_int_t 273nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 274{ 275 nxt_thread_t *thread; 276 nxt_event_engine_t *engine; 277 const nxt_event_interface_t *interface; 278 279 interface = nxt_service_get(rt->services, "engine", NULL); 280 281 if (nxt_slow_path(interface == NULL)) { 282 /* TODO: log */ 283 return NXT_ERROR; 284 } 285 286 engine = nxt_event_engine_create(task, interface, 287 nxt_main_process_signals, 0, 0); 288 289 if (nxt_slow_path(engine == NULL)) { 290 return NXT_ERROR; 291 } 292 293 thread = task->thread; 294 thread->engine = engine; 295#if 0 296 thread->fiber = &engine->fibers->fiber; 297#endif 298 299 engine->id = rt->last_engine_id++; 300 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 301 302 nxt_queue_init(&rt->engines); 303 nxt_queue_insert_tail(&rt->engines, &engine->link); 304 305 return NXT_OK; 306} 307 308 309static nxt_int_t 310nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 311{ 312 nxt_int_t ret; 313 nxt_array_t *thread_pools; 314 315 thread_pools = nxt_array_create(rt->mem_pool, 1, 316 sizeof(nxt_thread_pool_t *)); 317 318 if (nxt_slow_path(thread_pools == NULL)) { 319 return NXT_ERROR; 320 } 321 322 rt->thread_pools = thread_pools; 323 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 324 325 if (nxt_slow_path(ret != NXT_OK)) { 326 return NXT_ERROR; 327 } 328 329 return NXT_OK; 330} 331 332 333static void 334nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 335{ 336 nxt_runtime_t *rt; 337 338 rt = obj; 339 340 nxt_debug(task, "rt conf done"); 341 342 task->thread->log->ctx_handler = NULL; 343 task->thread->log->ctx = NULL; 344 345 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 346 goto fail; 347 } 348 349 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 350 goto fail; 351 } 352 353 /* 354 * Thread pools should be destroyed before starting worker 355 * processes, because thread pool semaphores will stick in 356 * locked state in new processes after fork(). 357 */ 358 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 359 360 return; 361 362fail: 363 364 nxt_runtime_quit(task); 365} 366 367 368static void 369nxt_runtime_initial_start(nxt_task_t *task) 370{ 371 nxt_int_t ret; 372 nxt_thread_t *thr; 373 nxt_runtime_t *rt; 374 const nxt_event_interface_t *interface; 375 376 thr = task->thread; 377 rt = thr->runtime; 378 379 if (rt->inherited_sockets == NULL && rt->daemon) { 380 381 if (nxt_process_daemon(task) != NXT_OK) { 382 goto fail; 383 } 384 385 /* 386 * An event engine should be updated after fork() 387 * even if an event facility was not changed because: 388 * 1) inherited kqueue descriptor is invalid, 389 * 2) the signal thread is not inherited. 390 */ 391 interface = nxt_service_get(rt->services, "engine", rt->engine); 392 if (interface == NULL) { 393 goto fail; 394 } 395 396 ret = nxt_event_engine_change(task->thread->engine, interface, 397 rt->batch); 398 if (ret != NXT_OK) { 399 goto fail; 400 } 401 } 402 403 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 404 if (ret != NXT_OK) { 405 goto fail; 406 } 407 408 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 409 goto fail; 410 } 411 412 thr->engine->max_connections = rt->engine_connections; 413 414 if (rt->main_process) { 415 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 416 return; 417 } 418 419 } else { 420 nxt_single_process_start(thr, task, rt); 421 return; 422 } 423 424fail: 425 426 nxt_runtime_quit(task); 427} 428 429 430static void 431nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) 432{ 433 nxt_int_t ret; 434 435 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, 436 60000 * 1000000LL); 437 438 if (nxt_slow_path(ret != NXT_OK)) { 439 nxt_runtime_quit(task); 440 return; 441 } 442 443 rt->types |= (1U << NXT_PROCESS_SINGLE); 444 445 nxt_runtime_listen_sockets_enable(task, rt); 446 447 return; 448} 449 450 451void 452nxt_runtime_quit(nxt_task_t *task) 453{ 454 nxt_bool_t done; 455 nxt_runtime_t *rt; 456 nxt_event_engine_t *engine; 457 458 rt = task->thread->runtime; 459 engine = task->thread->engine; 460 461 nxt_debug(task, "exiting"); 462 463 done = 1; 464 465 if (!engine->shutdown) { 466 engine->shutdown = 1; 467 468 if (!nxt_array_is_empty(rt->thread_pools)) { 469 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 470 done = 0; 471 } 472 473 if (nxt_runtime_is_main(rt)) { 474 nxt_main_stop_worker_processes(task, rt); 475 done = 0; 476 } 477 } 478 479 nxt_runtime_close_idle_connections(engine); 480 481 if (done) { 482 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 483 task, rt, engine); 484 } 485} 486 487 488static void 489nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 490{ 491 nxt_conn_t *c; 492 nxt_queue_t *idle; 493 nxt_queue_link_t *link, *next; 494 495 nxt_debug(&engine->task, "close idle connections"); 496 497 idle = &engine->idle_connections; 498 499 for (link = nxt_queue_head(idle); 500 link != nxt_queue_tail(idle); 501 link = next) 502 { 503 next = nxt_queue_next(link); 504 c = nxt_queue_link_data(link, nxt_conn_t, link); 505 506 if (!c->socket.read_ready) { 507 nxt_queue_remove(link); 508 nxt_conn_close(engine, c); 509 } 510 } 511} 512 513 514static void 515nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 516{ 517 nxt_runtime_t *rt; 518 nxt_process_t *process; 519 nxt_event_engine_t *engine; 520 521 rt = obj; 522 engine = data; 523 524 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 525 526 if (!nxt_array_is_empty(rt->thread_pools)) { 527 return; 528 } 529 530 if (nxt_runtime_is_main(rt)) { 531 if (rt->pid_file != NULL) { 532 nxt_file_delete(rt->pid_file); 533 } 534 535#if (NXT_HAVE_UNIX_DOMAIN) 536 { 537 nxt_sockaddr_t *sa; 538 nxt_file_name_t *name; 539 540 sa = rt->controller_listen; 541 542 if (sa->u.sockaddr.sa_family == AF_UNIX) { 543 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 544 (void) nxt_file_delete(name); 545 } 546 } 547#endif 548 } 549 550 if (!engine->event.signal_support) { 551 nxt_event_engine_signals_stop(engine); 552 } 553 554 nxt_runtime_process_each(rt, process) { 555 556 nxt_process_close_ports(task, process); 557 558 } nxt_runtime_process_loop; 559 560 nxt_thread_mutex_destroy(&rt->processes_mutex); 561 562 nxt_mp_destroy(rt->mem_pool); 563 564 nxt_debug(task, "exit"); 565 566 exit(0); 567 nxt_unreachable(); 568} 569 570 571static nxt_int_t 572nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 573{ 574 nxt_event_engine_t *engine; 575 const nxt_event_interface_t *interface; 576 577 engine = task->thread->engine; 578 579 if (engine->batch == rt->batch 580 && nxt_strcmp(engine->event.name, rt->engine) == 0) 581 { 582 return NXT_OK; 583 } 584 585 interface = nxt_service_get(rt->services, "engine", rt->engine); 586 587 if (interface != NULL) { 588 return nxt_event_engine_change(engine, interface, rt->batch); 589 } 590 591 return NXT_ERROR; 592} 593 594 595void 596nxt_runtime_event_engine_free(nxt_runtime_t *rt) 597{ 598 nxt_queue_link_t *link; 599 nxt_event_engine_t *engine; 600 601 link = nxt_queue_first(&rt->engines); 602 nxt_queue_remove(link); 603 604 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 605 nxt_event_engine_free(engine); 606} 607 608 609nxt_int_t 610nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 611 nxt_uint_t max_threads, nxt_nsec_t timeout) 612{ 613 nxt_thread_pool_t *thread_pool, **tp; 614 615 tp = nxt_array_add(rt->thread_pools); 616 if (tp == NULL) { 617 return NXT_ERROR; 618 } 619 620 thread_pool = nxt_thread_pool_create(max_threads, timeout, 621 nxt_runtime_thread_pool_init, 622 thr->engine, 623 nxt_runtime_thread_pool_exit); 624 625 if (nxt_fast_path(thread_pool != NULL)) { 626 *tp = thread_pool; 627 } 628 629 return NXT_OK; 630} 631 632 633static void 634nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 635 nxt_runtime_cont_t cont) 636{ 637 nxt_uint_t n; 638 nxt_thread_pool_t **tp; 639 640 rt->continuation = cont; 641 642 n = rt->thread_pools->nelts; 643 644 if (n == 0) { 645 cont(task); 646 return; 647 } 648 649 tp = rt->thread_pools->elts; 650 651 do { 652 nxt_thread_pool_destroy(*tp); 653 654 tp++; 655 n--; 656 } while (n != 0); 657} 658 659 660static void 661nxt_runtime_thread_pool_init(void) 662{ 663#if (NXT_REGEX) 664 nxt_regex_init(0); 665#endif 666} 667 668 669static void 670nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 671{ 672 nxt_uint_t i, n; 673 nxt_runtime_t *rt; 674 nxt_thread_pool_t *tp, **thread_pools; 675 nxt_thread_handle_t handle; 676 677 tp = obj; 678 679 if (data != NULL) { 680 handle = (nxt_thread_handle_t) (uintptr_t) data; 681 nxt_thread_wait(handle); 682 } 683 684 rt = task->thread->runtime; 685 686 thread_pools = rt->thread_pools->elts; 687 n = rt->thread_pools->nelts; 688 689 nxt_debug(task, "thread pools: %ui", n); 690 691 for (i = 0; i < n; i++) { 692 693 if (tp == thread_pools[i]) { 694 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 695 696 if (n == 1) { 697 /* The last thread pool. */ 698 rt->continuation(task); 699 } 700 701 return; 702 } 703 } 704} 705 706 707static nxt_int_t 708nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 709{ 710 nxt_int_t ret; 711 nxt_str_t control; 712 nxt_uint_t n; 713 nxt_file_t *file; 714 const char *slash; 715 nxt_sockaddr_t *sa; 716 nxt_file_name_str_t file_name; 717 const nxt_event_interface_t *interface; 718 719 rt->daemon = 1; 720 rt->main_process = 1; 721 rt->engine_connections = 256; 722 rt->auxiliary_threads = 2; 723 rt->user_cred.user = NXT_USER; 724 rt->group = NXT_GROUP; 725 rt->pid = NXT_PID; 726 rt->log = NXT_LOG; 727 rt->modules = NXT_MODULES; 728 rt->state = NXT_STATE; 729 rt->control = NXT_CONTROL_SOCK; 730 731 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 732 return NXT_ERROR; 733 } 734 735 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { 736 return NXT_ERROR; 737 } 738 739 /* An engine's parameters. */ 740 741 interface = nxt_service_get(rt->services, "engine", rt->engine); 742 if (interface == NULL) { 743 return NXT_ERROR; 744 } 745 746 rt->engine = interface->name; 747 748 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 749 if (nxt_slow_path(ret != NXT_OK)) { 750 return NXT_ERROR; 751 } 752 753 rt->pid_file = file_name.start; 754 755 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 756 if (nxt_slow_path(ret != NXT_OK)) { 757 return NXT_ERROR; 758 } 759 760 file = nxt_list_first(rt->log_files); 761 file->name = file_name.start; 762 763 slash = ""; 764 n = nxt_strlen(rt->modules); 765 766 if (n > 1 && rt->modules[n - 1] != '/') { 767 slash = "/"; 768 } 769 770 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 771 rt->modules, slash); 772 if (nxt_slow_path(ret != NXT_OK)) { 773 return NXT_ERROR; 774 } 775 776 rt->modules = (char *) file_name.start; 777 778 slash = ""; 779 n = nxt_strlen(rt->state); 780 781 if (n > 1 && rt->state[n - 1] != '/') { 782 slash = "/"; 783 } 784 785 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 786 rt->state, slash); 787 if (nxt_slow_path(ret != NXT_OK)) { 788 return NXT_ERROR; 789 } 790 791 rt->conf = (char *) file_name.start; 792 793 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 794 if (nxt_slow_path(ret != NXT_OK)) { 795 return NXT_ERROR; 796 } 797 798 rt->conf_tmp = (char *) file_name.start; 799 800 control.length = nxt_strlen(rt->control); 801 control.start = (u_char *) rt->control; 802 803 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &control); 804 if (nxt_slow_path(sa == NULL)) { 805 return NXT_ERROR; 806 } 807 808 rt->controller_listen = sa; 809 810 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 811 return NXT_ERROR; 812 } 813 814 return NXT_OK; 815} 816 817 818static nxt_int_t 819nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 820{ 821 char *p, **argv; 822 u_char *end; 823 u_char buf[1024]; 824 825 static const char version[] = 826 "unit version: " NXT_VERSION "\n" 827 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 828 829 static const char no_control[] = 830 "option \"--control\" requires socket address\n"; 831 static const char no_user[] = "option \"--user\" requires username\n"; 832 static const char no_group[] = "option \"--group\" requires group name\n"; 833 static const char no_pid[] = "option \"--pid\" requires filename\n"; 834 static const char no_log[] = "option \"--log\" requires filename\n"; 835 static const char no_modules[] = 836 "option \"--modules\" requires directory\n"; 837 static const char no_state[] = "option \"--state\" requires directory\n"; 838 839 static const char help[] = 840 "\n" 841 "unit options:\n" 842 "\n" 843 " --version print unit version and configure options\n" 844 "\n" 845 " --no-daemon run unit in non-daemon mode\n" 846 "\n" 847 " --control ADDRESS set address of control API socket\n" 848 " default: \"" NXT_CONTROL_SOCK "\"\n" 849 "\n" 850 " --pid FILE set pid filename\n" 851 " default: \"" NXT_PID "\"\n" 852 "\n" 853 " --log FILE set log filename\n" 854 " default: \"" NXT_LOG "\"\n" 855 "\n" 856 " --modules DIRECTORY set modules directory name\n" 857 " default: \"" NXT_MODULES "\"\n" 858 "\n" 859 " --state DIRECTORY set state directory name\n" 860 " default: \"" NXT_STATE "\"\n" 861 "\n" 862 " --user USER set non-privileged processes to run" 863 " as specified user\n" 864 " default: \"" NXT_USER "\"\n" 865 "\n" 866 " --group GROUP set non-privileged processes to run" 867 " as specified group\n" 868 " default: "; 869 870 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 871 static const char primary[] = "user's primary group\n\n"; 872 873 argv = &nxt_process_argv[1]; 874 875 while (*argv != NULL) { 876 p = *argv++; 877 878 if (nxt_strcmp(p, "--control") == 0) { 879 if (*argv == NULL) { 880 write(STDERR_FILENO, no_control, sizeof(no_control) - 1); 881 return NXT_ERROR; 882 } 883 884 p = *argv++; 885 886 rt->control = p; 887 888 continue; 889 } 890 891 if (nxt_strcmp(p, "--upstream") == 0) { 892 if (*argv == NULL) { 893 nxt_log(task, NXT_LOG_CRIT, 894 "no argument for option \"--upstream\""); 895 return NXT_ERROR; 896 } 897 898 p = *argv++; 899 900 rt->upstream.length = nxt_strlen(p); 901 rt->upstream.start = (u_char *) p; 902 903 continue; 904 } 905 906 if (nxt_strcmp(p, "--user") == 0) { 907 if (*argv == NULL) { 908 write(STDERR_FILENO, no_user, sizeof(no_user) - 1); 909 return NXT_ERROR; 910 } 911 912 p = *argv++; 913 914 rt->user_cred.user = p; 915 916 continue; 917 } 918 919 if (nxt_strcmp(p, "--group") == 0) { 920 if (*argv == NULL) { 921 write(STDERR_FILENO, no_group, sizeof(no_group) - 1); 922 return NXT_ERROR; 923 } 924 925 p = *argv++; 926 927 rt->group = p; 928 929 continue; 930 } 931 932 if (nxt_strcmp(p, "--pid") == 0) { 933 if (*argv == NULL) { 934 write(STDERR_FILENO, no_pid, sizeof(no_pid) - 1); 935 return NXT_ERROR; 936 } 937 938 p = *argv++; 939 940 rt->pid = p; 941 942 continue; 943 } 944 945 if (nxt_strcmp(p, "--log") == 0) { 946 if (*argv == NULL) { 947 write(STDERR_FILENO, no_log, sizeof(no_log) - 1); 948 return NXT_ERROR; 949 } 950 951 p = *argv++; 952 953 rt->log = p; 954 955 continue; 956 } 957 958 if (nxt_strcmp(p, "--modules") == 0) { 959 if (*argv == NULL) { 960 write(STDERR_FILENO, no_modules, sizeof(no_modules) - 1); 961 return NXT_ERROR; 962 } 963 964 p = *argv++; 965 966 rt->modules = p; 967 968 continue; 969 } 970 971 if (nxt_strcmp(p, "--state") == 0) { 972 if (*argv == NULL) { 973 write(STDERR_FILENO, no_state, sizeof(no_state) - 1); 974 return NXT_ERROR; 975 } 976 977 p = *argv++; 978 979 rt->state = p; 980 981 continue; 982 } 983 984 if (nxt_strcmp(p, "--no-daemon") == 0) { 985 rt->daemon = 0; 986 continue; 987 } 988 989 if (nxt_strcmp(p, "--version") == 0) { 990 write(STDERR_FILENO, version, sizeof(version) - 1); 991 exit(0); 992 } 993 994 if (nxt_strcmp(p, "--help") == 0) { 995 write(STDOUT_FILENO, help, sizeof(help) - 1); 996 997 if (sizeof(NXT_GROUP) == 1) { 998 write(STDOUT_FILENO, primary, sizeof(primary) - 1); 999 1000 } else { 1001 write(STDOUT_FILENO, group, sizeof(group) - 1); 1002 } 1003 1004 exit(0); 1005 } 1006 1007 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\"\n", p); 1008 write(STDERR_FILENO, buf, end - buf); 1009 1010 return NXT_ERROR; 1011 } 1012 1013 return NXT_OK; 1014} 1015 1016 1017static nxt_sockaddr_t * 1018nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 1019{ 1020 u_char *p; 1021 size_t length; 1022 1023 length = addr->length; 1024 p = addr->start; 1025 1026 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) { 1027 return nxt_runtime_sockaddr_unix_parse(task, mp, addr); 1028 } 1029 1030 if (length != 0 && *p == '[') { 1031 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); 1032 } 1033 1034 return nxt_runtime_sockaddr_inet_parse(task, mp, addr); 1035} 1036 1037 1038static nxt_sockaddr_t * 1039nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 1040{ 1041#if (NXT_HAVE_UNIX_DOMAIN) 1042 u_char *p; 1043 size_t length, socklen; 1044 nxt_sockaddr_t *sa; 1045 1046 /* 1047 * Actual sockaddr_un length can be lesser or even larger than defined 1048 * struct sockaddr_un length (see comment in nxt_socket.h). So 1049 * limit maximum Unix domain socket address length by defined sun_path[] 1050 * length because some OSes accept addresses twice larger than defined 1051 * struct sockaddr_un. Also reserve space for a trailing zero to avoid 1052 * ambiguity, since many OSes accept Unix domain socket addresses 1053 * without a trailing zero. 1054 */ 1055 const size_t max_len = sizeof(struct sockaddr_un) 1056 - offsetof(struct sockaddr_un, sun_path) - 1; 1057 1058 /* cutting "unix:" */ 1059 length = addr->length - 5; 1060 p = addr->start + 5; 1061 1062 if (length == 0) { 1063 nxt_log(task, NXT_LOG_CRIT, 1064 "unix domain socket \"%V\" name is invalid", addr); 1065 return NULL; 1066 } 1067 1068 if (length > max_len) { 1069 nxt_log(task, NXT_LOG_CRIT, 1070 "unix domain socket \"%V\" name is too long", addr); 1071 return NULL; 1072 } 1073 1074 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; 1075 1076#if (NXT_LINUX) 1077 1078 /* 1079 * Linux unix(7): 1080 * 1081 * abstract: an abstract socket address is distinguished by the fact 1082 * that sun_path[0] is a null byte ('\0'). The socket's address in 1083 * this namespace is given by the additional bytes in sun_path that 1084 * are covered by the specified length of the address structure. 1085 * (Null bytes in the name have no special significance.) 1086 */ 1087 if (p[0] == '@') { 1088 p[0] = '\0'; 1089 socklen--; 1090 } 1091 1092#endif 1093 1094 sa = nxt_sockaddr_alloc(mp, socklen, addr->length); 1095 1096 if (nxt_slow_path(sa == NULL)) { 1097 return NULL; 1098 } 1099 1100 sa->type = SOCK_STREAM; 1101 1102 sa->u.sockaddr_un.sun_family = AF_UNIX; 1103 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); 1104 1105 return sa; 1106 1107#else /* !(NXT_HAVE_UNIX_DOMAIN) */ 1108 1109 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", 1110 addr); 1111 1112 return NULL; 1113 1114#endif 1115} 1116 1117 1118static nxt_sockaddr_t * 1119nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp, 1120 nxt_str_t *addr) 1121{ 1122#if (NXT_INET6) 1123 u_char *p, *addr_end; 1124 size_t length; 1125 nxt_int_t port; 1126 nxt_sockaddr_t *sa; 1127 struct in6_addr *in6_addr; 1128 1129 length = addr->length - 1; 1130 p = addr->start + 1; 1131 1132 addr_end = nxt_memchr(p, ']', length); 1133 1134 if (addr_end == NULL) { 1135 goto invalid_address; 1136 } 1137 1138 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6), 1139 NXT_INET6_ADDR_STR_LEN); 1140 if (nxt_slow_path(sa == NULL)) { 1141 return NULL; 1142 } 1143 1144 in6_addr = &sa->u.sockaddr_in6.sin6_addr; 1145 1146 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { 1147 goto invalid_address; 1148 } 1149 1150 port = 0; 1151 p = addr_end + 1; 1152 length = (p + length) - p; 1153 1154 if (length == 0) { 1155 goto found; 1156 } 1157 1158 if (*p == ':') { 1159 port = nxt_int_parse(p + 1, length - 1); 1160 1161 if (port >= 1 && port <= 65535) { 1162 goto found; 1163 } 1164 } 1165 1166 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); 1167 1168 return NULL; 1169 1170found: 1171 1172 sa->type = SOCK_STREAM; 1173 1174 sa->u.sockaddr_in6.sin6_family = AF_INET6; 1175 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); 1176 1177 return sa; 1178 1179invalid_address: 1180 1181 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); 1182 1183 return NULL; 1184 1185#else 1186 1187 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); 1188 1189 return NULL; 1190 1191#endif 1192} 1193 1194 1195static nxt_sockaddr_t * 1196nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp, 1197 nxt_str_t *string) 1198{ 1199 u_char *p, *ip; 1200 size_t length; 1201 in_addr_t addr; 1202 nxt_int_t port; 1203 nxt_sockaddr_t *sa; 1204 1205 addr = INADDR_ANY; 1206 1207 length = string->length; 1208 ip = string->start; 1209 1210 p = nxt_memchr(ip, ':', length); 1211 1212 if (p == NULL) { 1213 1214 /* single value port, or address */ 1215 1216 port = nxt_int_parse(ip, length); 1217 1218 if (port > 0) { 1219 /* "*:XX" */ 1220 1221 if (port < 1 || port > 65535) { 1222 goto invalid_port; 1223 } 1224 1225 } else { 1226 /* "x.x.x.x" */ 1227 1228 addr = nxt_inet_addr(ip, length); 1229 1230 if (addr == INADDR_NONE) { 1231 goto invalid_port; 1232 } 1233 1234 port = 8080; 1235 } 1236 1237 } else { 1238 1239 /* x.x.x.x:XX */ 1240 1241 p++; 1242 length = (ip + length) - p; 1243 port = nxt_int_parse(p, length); 1244 1245 if (port < 1 || port > 65535) { 1246 goto invalid_port; 1247 } 1248 1249 length = (p - 1) - ip; 1250 1251 if (length != 1 || ip[0] != '*') { 1252 addr = nxt_inet_addr(ip, length); 1253 1254 if (addr == INADDR_NONE) { 1255 goto invalid_addr; 1256 } 1257 1258 /* "x.x.x.x:XX" */ 1259 } 1260 } 1261 1262 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), 1263 NXT_INET_ADDR_STR_LEN); 1264 if (nxt_slow_path(sa == NULL)) { 1265 return NULL; 1266 } 1267 1268 sa->type = SOCK_STREAM; 1269 1270 sa->u.sockaddr_in.sin_family = AF_INET; 1271 sa->u.sockaddr_in.sin_port = htons((in_port_t) port); 1272 sa->u.sockaddr_in.sin_addr.s_addr = addr; 1273 1274 return sa; 1275 1276invalid_port: 1277 1278 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string); 1279 1280 return NULL; 1281 1282invalid_addr: 1283 1284 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string); 1285 1286 return NULL; 1287} 1288 1289 1290nxt_listen_socket_t * 1291nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1292{ 1293 nxt_mp_t *mp; 1294 nxt_listen_socket_t *ls; 1295 1296 ls = nxt_array_zero_add(rt->listen_sockets); 1297 if (ls == NULL) { 1298 return NULL; 1299 } 1300 1301 mp = rt->mem_pool; 1302 1303 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1304 sa->length); 1305 if (ls->sockaddr == NULL) { 1306 return NULL; 1307 } 1308 1309 ls->sockaddr->type = sa->type; 1310 1311 nxt_sockaddr_text(ls->sockaddr); 1312 1313 ls->socket = -1; 1314 ls->backlog = NXT_LISTEN_BACKLOG; 1315 1316 return ls; 1317} 1318 1319 1320static nxt_int_t 1321nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1322{ 1323 size_t length; 1324 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1325 1326 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1327 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); 1328 return NXT_ERROR; 1329 } 1330 1331 /* 1332 * Linux gethostname(2): 1333 * 1334 * If the null-terminated hostname is too large to fit, 1335 * then the name is truncated, and no error is returned. 1336 * 1337 * For this reason an additional byte is reserved in the buffer. 1338 */ 1339 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1340 1341 length = nxt_strlen(hostname); 1342 rt->hostname.length = length; 1343 1344 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1345 1346 if (rt->hostname.start != NULL) { 1347 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1348 return NXT_OK; 1349 } 1350 1351 return NXT_ERROR; 1352} 1353 1354 1355static nxt_int_t 1356nxt_runtime_log_files_init(nxt_runtime_t *rt) 1357{ 1358 nxt_file_t *file; 1359 nxt_list_t *log_files; 1360 1361 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1362 1363 if (nxt_fast_path(log_files != NULL)) { 1364 rt->log_files = log_files; 1365 1366 /* Preallocate the main log. This allocation cannot fail. */ 1367 file = nxt_list_zero_add(log_files); 1368 1369 file->fd = NXT_FILE_INVALID; 1370 file->log_level = NXT_LOG_CRIT; 1371 1372 return NXT_OK; 1373 } 1374 1375 return NXT_ERROR; 1376} 1377 1378 1379nxt_file_t * 1380nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1381{ 1382 nxt_int_t ret; 1383 nxt_file_t *file; 1384 nxt_file_name_str_t file_name; 1385 1386 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1387 1388 if (nxt_slow_path(ret != NXT_OK)) { 1389 return NULL; 1390 } 1391 1392 nxt_list_each(file, rt->log_files) { 1393 1394 /* STUB: hardecoded case sensitive/insensitive. */ 1395 1396 if (file->name != NULL 1397 && nxt_file_name_eq(file->name, file_name.start)) 1398 { 1399 return file; 1400 } 1401 1402 } nxt_list_loop; 1403 1404 file = nxt_list_zero_add(rt->log_files); 1405 1406 if (nxt_slow_path(file == NULL)) { 1407 return NULL; 1408 } 1409 1410 file->fd = NXT_FILE_INVALID; 1411 file->log_level = NXT_LOG_CRIT; 1412 file->name = file_name.start; 1413 1414 return file; 1415} 1416 1417 1418static nxt_int_t 1419nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1420{ 1421 nxt_int_t ret; 1422 nxt_file_t *file; 1423 1424 nxt_list_each(file, rt->log_files) { 1425 1426 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1427 NXT_FILE_OWNER_ACCESS); 1428 1429 if (ret != NXT_OK) { 1430 return NXT_ERROR; 1431 } 1432 1433 } nxt_list_loop; 1434 1435 file = nxt_list_first(rt->log_files); 1436 1437 return nxt_file_stderr(file); 1438} 1439 1440 1441nxt_int_t 1442nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1443{ 1444 nxt_int_t ret; 1445 nxt_uint_t c, p, ncurr, nprev; 1446 nxt_listen_socket_t *curr, *prev; 1447 1448 curr = rt->listen_sockets->elts; 1449 ncurr = rt->listen_sockets->nelts; 1450 1451 if (rt->inherited_sockets != NULL) { 1452 prev = rt->inherited_sockets->elts; 1453 nprev = rt->inherited_sockets->nelts; 1454 1455 } else { 1456 prev = NULL; 1457 nprev = 0; 1458 } 1459 1460 for (c = 0; c < ncurr; c++) { 1461 1462 for (p = 0; p < nprev; p++) { 1463 1464 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1465 1466 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1467 if (ret != NXT_OK) { 1468 return NXT_ERROR; 1469 } 1470 1471 goto next; 1472 } 1473 } 1474 1475 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { 1476 return NXT_ERROR; 1477 } 1478 1479 next: 1480 1481 continue; 1482 } 1483 1484 return NXT_OK; 1485} 1486 1487 1488nxt_int_t 1489nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1490{ 1491 nxt_uint_t i, n; 1492 nxt_listen_socket_t *ls; 1493 1494 ls = rt->listen_sockets->elts; 1495 n = rt->listen_sockets->nelts; 1496 1497 for (i = 0; i < n; i++) { 1498 if (ls[i].flags == NXT_NONBLOCK) { 1499 if (nxt_listen_event(task, &ls[i]) == NULL) { 1500 return NXT_ERROR; 1501 } 1502 } 1503 } 1504 1505 return NXT_OK; 1506} 1507 1508 1509nxt_str_t * 1510nxt_current_directory(nxt_mp_t *mp) 1511{ 1512 size_t length; 1513 u_char *p; 1514 nxt_str_t *name; 1515 char buf[NXT_MAX_PATH_LEN]; 1516 1517 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1518 1519 if (nxt_fast_path(length != 0)) { 1520 name = nxt_str_alloc(mp, length + 1); 1521 1522 if (nxt_fast_path(name != NULL)) { 1523 p = nxt_cpymem(name->start, buf, length); 1524 *p = '/'; 1525 1526 return name; 1527 } 1528 } 1529 1530 return NULL; 1531} 1532 1533 1534static nxt_int_t 1535nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1536{ 1537 ssize_t length; 1538 nxt_int_t n; 1539 nxt_file_t file; 1540 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; 1541 1542 nxt_memzero(&file, sizeof(nxt_file_t)); 1543 1544 file.name = pid_file; 1545 1546 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1547 NXT_FILE_DEFAULT_ACCESS); 1548 1549 if (n != NXT_OK) { 1550 return NXT_ERROR; 1551 } 1552 1553 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1554 1555 if (nxt_file_write(&file, pid, length, 0) != length) { 1556 return NXT_ERROR; 1557 } 1558 1559 nxt_file_close(task, &file); 1560 1561 return NXT_OK; 1562} 1563 1564 1565nxt_process_t * 1566nxt_runtime_process_new(nxt_runtime_t *rt) 1567{ 1568 nxt_process_t *process; 1569 1570 /* TODO: memory failures. */ 1571 1572 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); 1573 if (nxt_slow_path(process == NULL)) { 1574 return NULL; 1575 } 1576 1577 nxt_queue_init(&process->ports); 1578 1579 nxt_thread_mutex_create(&process->incoming_mutex); 1580 nxt_thread_mutex_create(&process->outgoing_mutex); 1581 nxt_thread_mutex_create(&process->cp_mutex); 1582 1583 process->use_count = 1; 1584 1585 return process; 1586} 1587 1588 1589static void 1590nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) 1591{ 1592 nxt_port_t *port; 1593 nxt_lvlhsh_each_t lhe; 1594 1595 nxt_assert(process->use_count == 0); 1596 nxt_assert(process->registered == 0); 1597 1598 nxt_port_mmaps_destroy(process->incoming, 1); 1599 nxt_port_mmaps_destroy(process->outgoing, 1); 1600 1601 port = nxt_port_hash_first(&process->connected_ports, &lhe); 1602 1603 while(port != NULL) { 1604 nxt_port_hash_remove(&process->connected_ports, port); 1605 1606 port = nxt_port_hash_first(&process->connected_ports, &lhe); 1607 } 1608 1609 nxt_thread_mutex_destroy(&process->incoming_mutex); 1610 nxt_thread_mutex_destroy(&process->outgoing_mutex); 1611 nxt_thread_mutex_destroy(&process->cp_mutex); 1612 1613 nxt_mp_free(rt->mem_pool, process); 1614} 1615 1616 1617static nxt_int_t 1618nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1619{ 1620 nxt_process_t *process; 1621 1622 process = data; 1623 1624 if (lhq->key.length == sizeof(nxt_pid_t) 1625 && *(nxt_pid_t *) lhq->key.start == process->pid) 1626 { 1627 return NXT_OK; 1628 } 1629 1630 return NXT_DECLINED; 1631} 1632 1633static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1634 NXT_LVLHSH_DEFAULT, 1635 nxt_runtime_lvlhsh_pid_test, 1636 nxt_lvlhsh_alloc, 1637 nxt_lvlhsh_free, 1638}; 1639 1640 1641nxt_inline void 1642nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1643{ 1644 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1645 lhq->key.length = sizeof(*pid); 1646 lhq->key.start = (u_char *) pid; 1647 lhq->proto = &lvlhsh_processes_proto; 1648} 1649 1650 1651nxt_process_t * 1652nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1653{ 1654 nxt_process_t *process; 1655 nxt_lvlhsh_query_t lhq; 1656 1657 process = NULL; 1658 1659 nxt_runtime_process_lhq_pid(&lhq, &pid); 1660 1661 nxt_thread_mutex_lock(&rt->processes_mutex); 1662 1663 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1664 process = lhq.value; 1665 1666 } else { 1667 nxt_thread_log_debug("process %PI not found", pid); 1668 } 1669 1670 nxt_thread_mutex_unlock(&rt->processes_mutex); 1671 1672 return process; 1673} 1674 1675 1676nxt_process_t * 1677nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1678{ 1679 nxt_process_t *process; 1680 nxt_lvlhsh_query_t lhq; 1681 1682 nxt_runtime_process_lhq_pid(&lhq, &pid); 1683 1684 nxt_thread_mutex_lock(&rt->processes_mutex); 1685 1686 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1687 nxt_thread_log_debug("process %PI found", pid); 1688 1689 nxt_thread_mutex_unlock(&rt->processes_mutex); 1690 1691 process = lhq.value; 1692 process->use_count++; 1693 1694 return process; 1695 } 1696 1697 process = nxt_runtime_process_new(rt); 1698 if (nxt_slow_path(process == NULL)) { 1699 return NULL; 1700 } 1701 1702 process->pid = pid; 1703 1704 lhq.replace = 0; 1705 lhq.value = process; 1706 lhq.pool = rt->mem_pool; 1707 1708 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1709 1710 case NXT_OK: 1711 if (rt->nprocesses == 0) { 1712 rt->mprocess = process; 1713 } 1714 1715 rt->nprocesses++; 1716 1717 process->registered = 1; 1718 1719 nxt_thread_log_debug("process %PI insert", pid); 1720 break; 1721 1722 default: 1723 nxt_thread_log_debug("process %PI insert failed", pid); 1724 break; 1725 } 1726 1727 nxt_thread_mutex_unlock(&rt->processes_mutex); 1728 1729 return process; 1730} 1731 1732 1733void 1734nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1735{ 1736 nxt_port_t *port; 1737 nxt_runtime_t *rt; 1738 nxt_lvlhsh_query_t lhq; 1739 1740 nxt_assert(process->registered == 0); 1741 1742 rt = task->thread->runtime; 1743 1744 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1745 1746 lhq.replace = 0; 1747 lhq.value = process; 1748 lhq.pool = rt->mem_pool; 1749 1750 nxt_thread_mutex_lock(&rt->processes_mutex); 1751 1752 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1753 1754 case NXT_OK: 1755 if (rt->nprocesses == 0) { 1756 rt->mprocess = process; 1757 } 1758 1759 rt->nprocesses++; 1760 1761 nxt_process_port_each(process, port) { 1762 1763 port->pid = process->pid; 1764 1765 nxt_runtime_port_add(task, port); 1766 1767 } nxt_process_port_loop; 1768 1769 process->registered = 1; 1770 1771 nxt_thread_log_debug("process %PI added", process->pid); 1772 break; 1773 1774 default: 1775 nxt_thread_log_debug("process %PI failed to add", process->pid); 1776 break; 1777 } 1778 1779 nxt_thread_mutex_unlock(&rt->processes_mutex); 1780} 1781 1782 1783static nxt_process_t * 1784nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) 1785{ 1786 nxt_process_t *process; 1787 nxt_lvlhsh_query_t lhq; 1788 1789 process = NULL; 1790 1791 nxt_runtime_process_lhq_pid(&lhq, &pid); 1792 1793 lhq.pool = rt->mem_pool; 1794 1795 nxt_thread_mutex_lock(&rt->processes_mutex); 1796 1797 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1798 1799 case NXT_OK: 1800 rt->nprocesses--; 1801 1802 process = lhq.value; 1803 1804 process->registered = 0; 1805 1806 nxt_thread_log_debug("process %PI removed", pid); 1807 break; 1808 1809 default: 1810 nxt_thread_log_debug("process %PI remove failed", pid); 1811 break; 1812 } 1813 1814 nxt_thread_mutex_unlock(&rt->processes_mutex); 1815 1816 return process; 1817} 1818 1819 1820void 1821nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) 1822{ 1823 nxt_runtime_t *rt; 1824 1825 process->use_count += i; 1826 1827 if (process->use_count == 0) { 1828 rt = task->thread->runtime; 1829 1830 if (process->registered == 1) { 1831 nxt_runtime_process_remove_pid(rt, process->pid); 1832 } 1833 1834 nxt_runtime_process_destroy(rt, process); 1835 } 1836} 1837 1838 1839nxt_process_t * 1840nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1841{ 1842 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); 1843 1844 lhe->proto = &lvlhsh_processes_proto; 1845 1846 return nxt_runtime_process_next(rt, lhe); 1847} 1848 1849 1850nxt_port_t * 1851nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1852{ 1853 return nxt_port_hash_first(&rt->ports, lhe); 1854} 1855 1856 1857void 1858nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1859{ 1860 nxt_int_t res; 1861 nxt_runtime_t *rt; 1862 1863 rt = task->thread->runtime; 1864 1865 res = nxt_port_hash_add(&rt->ports, port); 1866 1867 if (res != NXT_OK) { 1868 return; 1869 } 1870 1871 rt->port_by_type[port->type] = port; 1872 1873 nxt_port_use(task, port, 1); 1874} 1875 1876 1877void 1878nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1879{ 1880 nxt_int_t res; 1881 nxt_runtime_t *rt; 1882 1883 rt = task->thread->runtime; 1884 1885 res = nxt_port_hash_remove(&rt->ports, port); 1886 1887 if (res != NXT_OK) { 1888 return; 1889 } 1890 1891 if (rt->port_by_type[port->type] == port) { 1892 rt->port_by_type[port->type] = NULL; 1893 } 1894 1895 nxt_port_use(task, port, -1); 1896} 1897 1898 1899nxt_port_t * 1900nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1901 nxt_port_id_t port_id) 1902{ 1903 return nxt_port_hash_find(&rt->ports, pid, port_id); 1904}
| 94 lang->version = (u_char *) ""; 95 lang->file = NULL; 96 lang->module = &nxt_go_module; 97 98 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 99 if (nxt_slow_path(listen_sockets == NULL)) { 100 goto fail; 101 } 102 103 rt->listen_sockets = listen_sockets; 104 105 ret = nxt_runtime_inherited_listen_sockets(task, rt); 106 if (nxt_slow_path(ret != NXT_OK)) { 107 goto fail; 108 } 109 110 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 111 goto fail; 112 } 113 114 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 115 goto fail; 116 } 117 118 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 119 goto fail; 120 } 121 122 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 123 goto fail; 124 } 125 126 rt->start = nxt_runtime_initial_start; 127 128 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 129 goto fail; 130 } 131 132 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 133 nxt_runtime_start, task, rt, NULL); 134 135 return NXT_OK; 136 137fail: 138 139 nxt_mp_destroy(mp); 140 141 return NXT_ERROR; 142} 143 144 145static nxt_int_t 146nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 147{ 148 u_char *v, *p; 149 nxt_int_t type; 150 nxt_array_t *inherited_sockets; 151 nxt_socket_t s; 152 nxt_listen_socket_t *ls; 153 154 v = (u_char *) getenv("NGINX"); 155 156 if (v == NULL) { 157 return nxt_runtime_systemd_listen_sockets(task, rt); 158 } 159 160 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); 161 162 inherited_sockets = nxt_array_create(rt->mem_pool, 163 1, sizeof(nxt_listen_socket_t)); 164 if (inherited_sockets == NULL) { 165 return NXT_ERROR; 166 } 167 168 rt->inherited_sockets = inherited_sockets; 169 170 for (p = v; *p != '\0'; p++) { 171 172 if (*p == ';') { 173 s = nxt_int_parse(v, p - v); 174 175 if (nxt_slow_path(s < 0)) { 176 nxt_log(task, NXT_LOG_CRIT, "invalid socket number " 177 "\"%s\" in NGINX environment variable, " 178 "ignoring the rest of the variable", v); 179 return NXT_ERROR; 180 } 181 182 v = p + 1; 183 184 ls = nxt_array_zero_add(inherited_sockets); 185 if (nxt_slow_path(ls == NULL)) { 186 return NXT_ERROR; 187 } 188 189 ls->socket = s; 190 191 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 192 if (nxt_slow_path(ls->sockaddr == NULL)) { 193 return NXT_ERROR; 194 } 195 196 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 197 if (nxt_slow_path(type == -1)) { 198 return NXT_ERROR; 199 } 200 201 ls->sockaddr->type = (uint16_t) type; 202 } 203 } 204 205 return NXT_OK; 206} 207 208 209static nxt_int_t 210nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 211{ 212 u_char *nfd, *pid; 213 nxt_int_t n; 214 nxt_array_t *inherited_sockets; 215 nxt_socket_t s; 216 nxt_listen_socket_t *ls; 217 218 /* 219 * Number of listening sockets passed. The socket 220 * descriptors start from number 3 and are sequential. 221 */ 222 nfd = (u_char *) getenv("LISTEN_FDS"); 223 if (nfd == NULL) { 224 return NXT_OK; 225 } 226 227 /* The pid of the service process. */ 228 pid = (u_char *) getenv("LISTEN_PID"); 229 if (pid == NULL) { 230 return NXT_OK; 231 } 232 233 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 234 if (n < 0) { 235 return NXT_OK; 236 } 237 238 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 239 return NXT_OK; 240 } 241 242 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); 243 244 inherited_sockets = nxt_array_create(rt->mem_pool, 245 n, sizeof(nxt_listen_socket_t)); 246 if (inherited_sockets == NULL) { 247 return NXT_ERROR; 248 } 249 250 rt->inherited_sockets = inherited_sockets; 251 252 for (s = 3; s < n; s++) { 253 ls = nxt_array_zero_add(inherited_sockets); 254 if (nxt_slow_path(ls == NULL)) { 255 return NXT_ERROR; 256 } 257 258 ls->socket = s; 259 260 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 261 if (nxt_slow_path(ls->sockaddr == NULL)) { 262 return NXT_ERROR; 263 } 264 265 ls->sockaddr->type = SOCK_STREAM; 266 } 267 268 return NXT_OK; 269} 270 271 272static nxt_int_t 273nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 274{ 275 nxt_thread_t *thread; 276 nxt_event_engine_t *engine; 277 const nxt_event_interface_t *interface; 278 279 interface = nxt_service_get(rt->services, "engine", NULL); 280 281 if (nxt_slow_path(interface == NULL)) { 282 /* TODO: log */ 283 return NXT_ERROR; 284 } 285 286 engine = nxt_event_engine_create(task, interface, 287 nxt_main_process_signals, 0, 0); 288 289 if (nxt_slow_path(engine == NULL)) { 290 return NXT_ERROR; 291 } 292 293 thread = task->thread; 294 thread->engine = engine; 295#if 0 296 thread->fiber = &engine->fibers->fiber; 297#endif 298 299 engine->id = rt->last_engine_id++; 300 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 301 302 nxt_queue_init(&rt->engines); 303 nxt_queue_insert_tail(&rt->engines, &engine->link); 304 305 return NXT_OK; 306} 307 308 309static nxt_int_t 310nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 311{ 312 nxt_int_t ret; 313 nxt_array_t *thread_pools; 314 315 thread_pools = nxt_array_create(rt->mem_pool, 1, 316 sizeof(nxt_thread_pool_t *)); 317 318 if (nxt_slow_path(thread_pools == NULL)) { 319 return NXT_ERROR; 320 } 321 322 rt->thread_pools = thread_pools; 323 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 324 325 if (nxt_slow_path(ret != NXT_OK)) { 326 return NXT_ERROR; 327 } 328 329 return NXT_OK; 330} 331 332 333static void 334nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 335{ 336 nxt_runtime_t *rt; 337 338 rt = obj; 339 340 nxt_debug(task, "rt conf done"); 341 342 task->thread->log->ctx_handler = NULL; 343 task->thread->log->ctx = NULL; 344 345 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 346 goto fail; 347 } 348 349 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 350 goto fail; 351 } 352 353 /* 354 * Thread pools should be destroyed before starting worker 355 * processes, because thread pool semaphores will stick in 356 * locked state in new processes after fork(). 357 */ 358 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 359 360 return; 361 362fail: 363 364 nxt_runtime_quit(task); 365} 366 367 368static void 369nxt_runtime_initial_start(nxt_task_t *task) 370{ 371 nxt_int_t ret; 372 nxt_thread_t *thr; 373 nxt_runtime_t *rt; 374 const nxt_event_interface_t *interface; 375 376 thr = task->thread; 377 rt = thr->runtime; 378 379 if (rt->inherited_sockets == NULL && rt->daemon) { 380 381 if (nxt_process_daemon(task) != NXT_OK) { 382 goto fail; 383 } 384 385 /* 386 * An event engine should be updated after fork() 387 * even if an event facility was not changed because: 388 * 1) inherited kqueue descriptor is invalid, 389 * 2) the signal thread is not inherited. 390 */ 391 interface = nxt_service_get(rt->services, "engine", rt->engine); 392 if (interface == NULL) { 393 goto fail; 394 } 395 396 ret = nxt_event_engine_change(task->thread->engine, interface, 397 rt->batch); 398 if (ret != NXT_OK) { 399 goto fail; 400 } 401 } 402 403 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 404 if (ret != NXT_OK) { 405 goto fail; 406 } 407 408 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 409 goto fail; 410 } 411 412 thr->engine->max_connections = rt->engine_connections; 413 414 if (rt->main_process) { 415 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 416 return; 417 } 418 419 } else { 420 nxt_single_process_start(thr, task, rt); 421 return; 422 } 423 424fail: 425 426 nxt_runtime_quit(task); 427} 428 429 430static void 431nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) 432{ 433 nxt_int_t ret; 434 435 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, 436 60000 * 1000000LL); 437 438 if (nxt_slow_path(ret != NXT_OK)) { 439 nxt_runtime_quit(task); 440 return; 441 } 442 443 rt->types |= (1U << NXT_PROCESS_SINGLE); 444 445 nxt_runtime_listen_sockets_enable(task, rt); 446 447 return; 448} 449 450 451void 452nxt_runtime_quit(nxt_task_t *task) 453{ 454 nxt_bool_t done; 455 nxt_runtime_t *rt; 456 nxt_event_engine_t *engine; 457 458 rt = task->thread->runtime; 459 engine = task->thread->engine; 460 461 nxt_debug(task, "exiting"); 462 463 done = 1; 464 465 if (!engine->shutdown) { 466 engine->shutdown = 1; 467 468 if (!nxt_array_is_empty(rt->thread_pools)) { 469 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 470 done = 0; 471 } 472 473 if (nxt_runtime_is_main(rt)) { 474 nxt_main_stop_worker_processes(task, rt); 475 done = 0; 476 } 477 } 478 479 nxt_runtime_close_idle_connections(engine); 480 481 if (done) { 482 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 483 task, rt, engine); 484 } 485} 486 487 488static void 489nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 490{ 491 nxt_conn_t *c; 492 nxt_queue_t *idle; 493 nxt_queue_link_t *link, *next; 494 495 nxt_debug(&engine->task, "close idle connections"); 496 497 idle = &engine->idle_connections; 498 499 for (link = nxt_queue_head(idle); 500 link != nxt_queue_tail(idle); 501 link = next) 502 { 503 next = nxt_queue_next(link); 504 c = nxt_queue_link_data(link, nxt_conn_t, link); 505 506 if (!c->socket.read_ready) { 507 nxt_queue_remove(link); 508 nxt_conn_close(engine, c); 509 } 510 } 511} 512 513 514static void 515nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 516{ 517 nxt_runtime_t *rt; 518 nxt_process_t *process; 519 nxt_event_engine_t *engine; 520 521 rt = obj; 522 engine = data; 523 524 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 525 526 if (!nxt_array_is_empty(rt->thread_pools)) { 527 return; 528 } 529 530 if (nxt_runtime_is_main(rt)) { 531 if (rt->pid_file != NULL) { 532 nxt_file_delete(rt->pid_file); 533 } 534 535#if (NXT_HAVE_UNIX_DOMAIN) 536 { 537 nxt_sockaddr_t *sa; 538 nxt_file_name_t *name; 539 540 sa = rt->controller_listen; 541 542 if (sa->u.sockaddr.sa_family == AF_UNIX) { 543 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 544 (void) nxt_file_delete(name); 545 } 546 } 547#endif 548 } 549 550 if (!engine->event.signal_support) { 551 nxt_event_engine_signals_stop(engine); 552 } 553 554 nxt_runtime_process_each(rt, process) { 555 556 nxt_process_close_ports(task, process); 557 558 } nxt_runtime_process_loop; 559 560 nxt_thread_mutex_destroy(&rt->processes_mutex); 561 562 nxt_mp_destroy(rt->mem_pool); 563 564 nxt_debug(task, "exit"); 565 566 exit(0); 567 nxt_unreachable(); 568} 569 570 571static nxt_int_t 572nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 573{ 574 nxt_event_engine_t *engine; 575 const nxt_event_interface_t *interface; 576 577 engine = task->thread->engine; 578 579 if (engine->batch == rt->batch 580 && nxt_strcmp(engine->event.name, rt->engine) == 0) 581 { 582 return NXT_OK; 583 } 584 585 interface = nxt_service_get(rt->services, "engine", rt->engine); 586 587 if (interface != NULL) { 588 return nxt_event_engine_change(engine, interface, rt->batch); 589 } 590 591 return NXT_ERROR; 592} 593 594 595void 596nxt_runtime_event_engine_free(nxt_runtime_t *rt) 597{ 598 nxt_queue_link_t *link; 599 nxt_event_engine_t *engine; 600 601 link = nxt_queue_first(&rt->engines); 602 nxt_queue_remove(link); 603 604 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 605 nxt_event_engine_free(engine); 606} 607 608 609nxt_int_t 610nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 611 nxt_uint_t max_threads, nxt_nsec_t timeout) 612{ 613 nxt_thread_pool_t *thread_pool, **tp; 614 615 tp = nxt_array_add(rt->thread_pools); 616 if (tp == NULL) { 617 return NXT_ERROR; 618 } 619 620 thread_pool = nxt_thread_pool_create(max_threads, timeout, 621 nxt_runtime_thread_pool_init, 622 thr->engine, 623 nxt_runtime_thread_pool_exit); 624 625 if (nxt_fast_path(thread_pool != NULL)) { 626 *tp = thread_pool; 627 } 628 629 return NXT_OK; 630} 631 632 633static void 634nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 635 nxt_runtime_cont_t cont) 636{ 637 nxt_uint_t n; 638 nxt_thread_pool_t **tp; 639 640 rt->continuation = cont; 641 642 n = rt->thread_pools->nelts; 643 644 if (n == 0) { 645 cont(task); 646 return; 647 } 648 649 tp = rt->thread_pools->elts; 650 651 do { 652 nxt_thread_pool_destroy(*tp); 653 654 tp++; 655 n--; 656 } while (n != 0); 657} 658 659 660static void 661nxt_runtime_thread_pool_init(void) 662{ 663#if (NXT_REGEX) 664 nxt_regex_init(0); 665#endif 666} 667 668 669static void 670nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 671{ 672 nxt_uint_t i, n; 673 nxt_runtime_t *rt; 674 nxt_thread_pool_t *tp, **thread_pools; 675 nxt_thread_handle_t handle; 676 677 tp = obj; 678 679 if (data != NULL) { 680 handle = (nxt_thread_handle_t) (uintptr_t) data; 681 nxt_thread_wait(handle); 682 } 683 684 rt = task->thread->runtime; 685 686 thread_pools = rt->thread_pools->elts; 687 n = rt->thread_pools->nelts; 688 689 nxt_debug(task, "thread pools: %ui", n); 690 691 for (i = 0; i < n; i++) { 692 693 if (tp == thread_pools[i]) { 694 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 695 696 if (n == 1) { 697 /* The last thread pool. */ 698 rt->continuation(task); 699 } 700 701 return; 702 } 703 } 704} 705 706 707static nxt_int_t 708nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 709{ 710 nxt_int_t ret; 711 nxt_str_t control; 712 nxt_uint_t n; 713 nxt_file_t *file; 714 const char *slash; 715 nxt_sockaddr_t *sa; 716 nxt_file_name_str_t file_name; 717 const nxt_event_interface_t *interface; 718 719 rt->daemon = 1; 720 rt->main_process = 1; 721 rt->engine_connections = 256; 722 rt->auxiliary_threads = 2; 723 rt->user_cred.user = NXT_USER; 724 rt->group = NXT_GROUP; 725 rt->pid = NXT_PID; 726 rt->log = NXT_LOG; 727 rt->modules = NXT_MODULES; 728 rt->state = NXT_STATE; 729 rt->control = NXT_CONTROL_SOCK; 730 731 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 732 return NXT_ERROR; 733 } 734 735 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { 736 return NXT_ERROR; 737 } 738 739 /* An engine's parameters. */ 740 741 interface = nxt_service_get(rt->services, "engine", rt->engine); 742 if (interface == NULL) { 743 return NXT_ERROR; 744 } 745 746 rt->engine = interface->name; 747 748 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 749 if (nxt_slow_path(ret != NXT_OK)) { 750 return NXT_ERROR; 751 } 752 753 rt->pid_file = file_name.start; 754 755 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 756 if (nxt_slow_path(ret != NXT_OK)) { 757 return NXT_ERROR; 758 } 759 760 file = nxt_list_first(rt->log_files); 761 file->name = file_name.start; 762 763 slash = ""; 764 n = nxt_strlen(rt->modules); 765 766 if (n > 1 && rt->modules[n - 1] != '/') { 767 slash = "/"; 768 } 769 770 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 771 rt->modules, slash); 772 if (nxt_slow_path(ret != NXT_OK)) { 773 return NXT_ERROR; 774 } 775 776 rt->modules = (char *) file_name.start; 777 778 slash = ""; 779 n = nxt_strlen(rt->state); 780 781 if (n > 1 && rt->state[n - 1] != '/') { 782 slash = "/"; 783 } 784 785 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 786 rt->state, slash); 787 if (nxt_slow_path(ret != NXT_OK)) { 788 return NXT_ERROR; 789 } 790 791 rt->conf = (char *) file_name.start; 792 793 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 794 if (nxt_slow_path(ret != NXT_OK)) { 795 return NXT_ERROR; 796 } 797 798 rt->conf_tmp = (char *) file_name.start; 799 800 control.length = nxt_strlen(rt->control); 801 control.start = (u_char *) rt->control; 802 803 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &control); 804 if (nxt_slow_path(sa == NULL)) { 805 return NXT_ERROR; 806 } 807 808 rt->controller_listen = sa; 809 810 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 811 return NXT_ERROR; 812 } 813 814 return NXT_OK; 815} 816 817 818static nxt_int_t 819nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 820{ 821 char *p, **argv; 822 u_char *end; 823 u_char buf[1024]; 824 825 static const char version[] = 826 "unit version: " NXT_VERSION "\n" 827 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 828 829 static const char no_control[] = 830 "option \"--control\" requires socket address\n"; 831 static const char no_user[] = "option \"--user\" requires username\n"; 832 static const char no_group[] = "option \"--group\" requires group name\n"; 833 static const char no_pid[] = "option \"--pid\" requires filename\n"; 834 static const char no_log[] = "option \"--log\" requires filename\n"; 835 static const char no_modules[] = 836 "option \"--modules\" requires directory\n"; 837 static const char no_state[] = "option \"--state\" requires directory\n"; 838 839 static const char help[] = 840 "\n" 841 "unit options:\n" 842 "\n" 843 " --version print unit version and configure options\n" 844 "\n" 845 " --no-daemon run unit in non-daemon mode\n" 846 "\n" 847 " --control ADDRESS set address of control API socket\n" 848 " default: \"" NXT_CONTROL_SOCK "\"\n" 849 "\n" 850 " --pid FILE set pid filename\n" 851 " default: \"" NXT_PID "\"\n" 852 "\n" 853 " --log FILE set log filename\n" 854 " default: \"" NXT_LOG "\"\n" 855 "\n" 856 " --modules DIRECTORY set modules directory name\n" 857 " default: \"" NXT_MODULES "\"\n" 858 "\n" 859 " --state DIRECTORY set state directory name\n" 860 " default: \"" NXT_STATE "\"\n" 861 "\n" 862 " --user USER set non-privileged processes to run" 863 " as specified user\n" 864 " default: \"" NXT_USER "\"\n" 865 "\n" 866 " --group GROUP set non-privileged processes to run" 867 " as specified group\n" 868 " default: "; 869 870 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 871 static const char primary[] = "user's primary group\n\n"; 872 873 argv = &nxt_process_argv[1]; 874 875 while (*argv != NULL) { 876 p = *argv++; 877 878 if (nxt_strcmp(p, "--control") == 0) { 879 if (*argv == NULL) { 880 write(STDERR_FILENO, no_control, sizeof(no_control) - 1); 881 return NXT_ERROR; 882 } 883 884 p = *argv++; 885 886 rt->control = p; 887 888 continue; 889 } 890 891 if (nxt_strcmp(p, "--upstream") == 0) { 892 if (*argv == NULL) { 893 nxt_log(task, NXT_LOG_CRIT, 894 "no argument for option \"--upstream\""); 895 return NXT_ERROR; 896 } 897 898 p = *argv++; 899 900 rt->upstream.length = nxt_strlen(p); 901 rt->upstream.start = (u_char *) p; 902 903 continue; 904 } 905 906 if (nxt_strcmp(p, "--user") == 0) { 907 if (*argv == NULL) { 908 write(STDERR_FILENO, no_user, sizeof(no_user) - 1); 909 return NXT_ERROR; 910 } 911 912 p = *argv++; 913 914 rt->user_cred.user = p; 915 916 continue; 917 } 918 919 if (nxt_strcmp(p, "--group") == 0) { 920 if (*argv == NULL) { 921 write(STDERR_FILENO, no_group, sizeof(no_group) - 1); 922 return NXT_ERROR; 923 } 924 925 p = *argv++; 926 927 rt->group = p; 928 929 continue; 930 } 931 932 if (nxt_strcmp(p, "--pid") == 0) { 933 if (*argv == NULL) { 934 write(STDERR_FILENO, no_pid, sizeof(no_pid) - 1); 935 return NXT_ERROR; 936 } 937 938 p = *argv++; 939 940 rt->pid = p; 941 942 continue; 943 } 944 945 if (nxt_strcmp(p, "--log") == 0) { 946 if (*argv == NULL) { 947 write(STDERR_FILENO, no_log, sizeof(no_log) - 1); 948 return NXT_ERROR; 949 } 950 951 p = *argv++; 952 953 rt->log = p; 954 955 continue; 956 } 957 958 if (nxt_strcmp(p, "--modules") == 0) { 959 if (*argv == NULL) { 960 write(STDERR_FILENO, no_modules, sizeof(no_modules) - 1); 961 return NXT_ERROR; 962 } 963 964 p = *argv++; 965 966 rt->modules = p; 967 968 continue; 969 } 970 971 if (nxt_strcmp(p, "--state") == 0) { 972 if (*argv == NULL) { 973 write(STDERR_FILENO, no_state, sizeof(no_state) - 1); 974 return NXT_ERROR; 975 } 976 977 p = *argv++; 978 979 rt->state = p; 980 981 continue; 982 } 983 984 if (nxt_strcmp(p, "--no-daemon") == 0) { 985 rt->daemon = 0; 986 continue; 987 } 988 989 if (nxt_strcmp(p, "--version") == 0) { 990 write(STDERR_FILENO, version, sizeof(version) - 1); 991 exit(0); 992 } 993 994 if (nxt_strcmp(p, "--help") == 0) { 995 write(STDOUT_FILENO, help, sizeof(help) - 1); 996 997 if (sizeof(NXT_GROUP) == 1) { 998 write(STDOUT_FILENO, primary, sizeof(primary) - 1); 999 1000 } else { 1001 write(STDOUT_FILENO, group, sizeof(group) - 1); 1002 } 1003 1004 exit(0); 1005 } 1006 1007 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\"\n", p); 1008 write(STDERR_FILENO, buf, end - buf); 1009 1010 return NXT_ERROR; 1011 } 1012 1013 return NXT_OK; 1014} 1015 1016 1017static nxt_sockaddr_t * 1018nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 1019{ 1020 u_char *p; 1021 size_t length; 1022 1023 length = addr->length; 1024 p = addr->start; 1025 1026 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) { 1027 return nxt_runtime_sockaddr_unix_parse(task, mp, addr); 1028 } 1029 1030 if (length != 0 && *p == '[') { 1031 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); 1032 } 1033 1034 return nxt_runtime_sockaddr_inet_parse(task, mp, addr); 1035} 1036 1037 1038static nxt_sockaddr_t * 1039nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 1040{ 1041#if (NXT_HAVE_UNIX_DOMAIN) 1042 u_char *p; 1043 size_t length, socklen; 1044 nxt_sockaddr_t *sa; 1045 1046 /* 1047 * Actual sockaddr_un length can be lesser or even larger than defined 1048 * struct sockaddr_un length (see comment in nxt_socket.h). So 1049 * limit maximum Unix domain socket address length by defined sun_path[] 1050 * length because some OSes accept addresses twice larger than defined 1051 * struct sockaddr_un. Also reserve space for a trailing zero to avoid 1052 * ambiguity, since many OSes accept Unix domain socket addresses 1053 * without a trailing zero. 1054 */ 1055 const size_t max_len = sizeof(struct sockaddr_un) 1056 - offsetof(struct sockaddr_un, sun_path) - 1; 1057 1058 /* cutting "unix:" */ 1059 length = addr->length - 5; 1060 p = addr->start + 5; 1061 1062 if (length == 0) { 1063 nxt_log(task, NXT_LOG_CRIT, 1064 "unix domain socket \"%V\" name is invalid", addr); 1065 return NULL; 1066 } 1067 1068 if (length > max_len) { 1069 nxt_log(task, NXT_LOG_CRIT, 1070 "unix domain socket \"%V\" name is too long", addr); 1071 return NULL; 1072 } 1073 1074 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; 1075 1076#if (NXT_LINUX) 1077 1078 /* 1079 * Linux unix(7): 1080 * 1081 * abstract: an abstract socket address is distinguished by the fact 1082 * that sun_path[0] is a null byte ('\0'). The socket's address in 1083 * this namespace is given by the additional bytes in sun_path that 1084 * are covered by the specified length of the address structure. 1085 * (Null bytes in the name have no special significance.) 1086 */ 1087 if (p[0] == '@') { 1088 p[0] = '\0'; 1089 socklen--; 1090 } 1091 1092#endif 1093 1094 sa = nxt_sockaddr_alloc(mp, socklen, addr->length); 1095 1096 if (nxt_slow_path(sa == NULL)) { 1097 return NULL; 1098 } 1099 1100 sa->type = SOCK_STREAM; 1101 1102 sa->u.sockaddr_un.sun_family = AF_UNIX; 1103 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); 1104 1105 return sa; 1106 1107#else /* !(NXT_HAVE_UNIX_DOMAIN) */ 1108 1109 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", 1110 addr); 1111 1112 return NULL; 1113 1114#endif 1115} 1116 1117 1118static nxt_sockaddr_t * 1119nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp, 1120 nxt_str_t *addr) 1121{ 1122#if (NXT_INET6) 1123 u_char *p, *addr_end; 1124 size_t length; 1125 nxt_int_t port; 1126 nxt_sockaddr_t *sa; 1127 struct in6_addr *in6_addr; 1128 1129 length = addr->length - 1; 1130 p = addr->start + 1; 1131 1132 addr_end = nxt_memchr(p, ']', length); 1133 1134 if (addr_end == NULL) { 1135 goto invalid_address; 1136 } 1137 1138 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6), 1139 NXT_INET6_ADDR_STR_LEN); 1140 if (nxt_slow_path(sa == NULL)) { 1141 return NULL; 1142 } 1143 1144 in6_addr = &sa->u.sockaddr_in6.sin6_addr; 1145 1146 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { 1147 goto invalid_address; 1148 } 1149 1150 port = 0; 1151 p = addr_end + 1; 1152 length = (p + length) - p; 1153 1154 if (length == 0) { 1155 goto found; 1156 } 1157 1158 if (*p == ':') { 1159 port = nxt_int_parse(p + 1, length - 1); 1160 1161 if (port >= 1 && port <= 65535) { 1162 goto found; 1163 } 1164 } 1165 1166 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); 1167 1168 return NULL; 1169 1170found: 1171 1172 sa->type = SOCK_STREAM; 1173 1174 sa->u.sockaddr_in6.sin6_family = AF_INET6; 1175 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); 1176 1177 return sa; 1178 1179invalid_address: 1180 1181 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); 1182 1183 return NULL; 1184 1185#else 1186 1187 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); 1188 1189 return NULL; 1190 1191#endif 1192} 1193 1194 1195static nxt_sockaddr_t * 1196nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp, 1197 nxt_str_t *string) 1198{ 1199 u_char *p, *ip; 1200 size_t length; 1201 in_addr_t addr; 1202 nxt_int_t port; 1203 nxt_sockaddr_t *sa; 1204 1205 addr = INADDR_ANY; 1206 1207 length = string->length; 1208 ip = string->start; 1209 1210 p = nxt_memchr(ip, ':', length); 1211 1212 if (p == NULL) { 1213 1214 /* single value port, or address */ 1215 1216 port = nxt_int_parse(ip, length); 1217 1218 if (port > 0) { 1219 /* "*:XX" */ 1220 1221 if (port < 1 || port > 65535) { 1222 goto invalid_port; 1223 } 1224 1225 } else { 1226 /* "x.x.x.x" */ 1227 1228 addr = nxt_inet_addr(ip, length); 1229 1230 if (addr == INADDR_NONE) { 1231 goto invalid_port; 1232 } 1233 1234 port = 8080; 1235 } 1236 1237 } else { 1238 1239 /* x.x.x.x:XX */ 1240 1241 p++; 1242 length = (ip + length) - p; 1243 port = nxt_int_parse(p, length); 1244 1245 if (port < 1 || port > 65535) { 1246 goto invalid_port; 1247 } 1248 1249 length = (p - 1) - ip; 1250 1251 if (length != 1 || ip[0] != '*') { 1252 addr = nxt_inet_addr(ip, length); 1253 1254 if (addr == INADDR_NONE) { 1255 goto invalid_addr; 1256 } 1257 1258 /* "x.x.x.x:XX" */ 1259 } 1260 } 1261 1262 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), 1263 NXT_INET_ADDR_STR_LEN); 1264 if (nxt_slow_path(sa == NULL)) { 1265 return NULL; 1266 } 1267 1268 sa->type = SOCK_STREAM; 1269 1270 sa->u.sockaddr_in.sin_family = AF_INET; 1271 sa->u.sockaddr_in.sin_port = htons((in_port_t) port); 1272 sa->u.sockaddr_in.sin_addr.s_addr = addr; 1273 1274 return sa; 1275 1276invalid_port: 1277 1278 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string); 1279 1280 return NULL; 1281 1282invalid_addr: 1283 1284 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string); 1285 1286 return NULL; 1287} 1288 1289 1290nxt_listen_socket_t * 1291nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1292{ 1293 nxt_mp_t *mp; 1294 nxt_listen_socket_t *ls; 1295 1296 ls = nxt_array_zero_add(rt->listen_sockets); 1297 if (ls == NULL) { 1298 return NULL; 1299 } 1300 1301 mp = rt->mem_pool; 1302 1303 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1304 sa->length); 1305 if (ls->sockaddr == NULL) { 1306 return NULL; 1307 } 1308 1309 ls->sockaddr->type = sa->type; 1310 1311 nxt_sockaddr_text(ls->sockaddr); 1312 1313 ls->socket = -1; 1314 ls->backlog = NXT_LISTEN_BACKLOG; 1315 1316 return ls; 1317} 1318 1319 1320static nxt_int_t 1321nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1322{ 1323 size_t length; 1324 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1325 1326 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1327 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); 1328 return NXT_ERROR; 1329 } 1330 1331 /* 1332 * Linux gethostname(2): 1333 * 1334 * If the null-terminated hostname is too large to fit, 1335 * then the name is truncated, and no error is returned. 1336 * 1337 * For this reason an additional byte is reserved in the buffer. 1338 */ 1339 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1340 1341 length = nxt_strlen(hostname); 1342 rt->hostname.length = length; 1343 1344 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1345 1346 if (rt->hostname.start != NULL) { 1347 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1348 return NXT_OK; 1349 } 1350 1351 return NXT_ERROR; 1352} 1353 1354 1355static nxt_int_t 1356nxt_runtime_log_files_init(nxt_runtime_t *rt) 1357{ 1358 nxt_file_t *file; 1359 nxt_list_t *log_files; 1360 1361 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1362 1363 if (nxt_fast_path(log_files != NULL)) { 1364 rt->log_files = log_files; 1365 1366 /* Preallocate the main log. This allocation cannot fail. */ 1367 file = nxt_list_zero_add(log_files); 1368 1369 file->fd = NXT_FILE_INVALID; 1370 file->log_level = NXT_LOG_CRIT; 1371 1372 return NXT_OK; 1373 } 1374 1375 return NXT_ERROR; 1376} 1377 1378 1379nxt_file_t * 1380nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1381{ 1382 nxt_int_t ret; 1383 nxt_file_t *file; 1384 nxt_file_name_str_t file_name; 1385 1386 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1387 1388 if (nxt_slow_path(ret != NXT_OK)) { 1389 return NULL; 1390 } 1391 1392 nxt_list_each(file, rt->log_files) { 1393 1394 /* STUB: hardecoded case sensitive/insensitive. */ 1395 1396 if (file->name != NULL 1397 && nxt_file_name_eq(file->name, file_name.start)) 1398 { 1399 return file; 1400 } 1401 1402 } nxt_list_loop; 1403 1404 file = nxt_list_zero_add(rt->log_files); 1405 1406 if (nxt_slow_path(file == NULL)) { 1407 return NULL; 1408 } 1409 1410 file->fd = NXT_FILE_INVALID; 1411 file->log_level = NXT_LOG_CRIT; 1412 file->name = file_name.start; 1413 1414 return file; 1415} 1416 1417 1418static nxt_int_t 1419nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1420{ 1421 nxt_int_t ret; 1422 nxt_file_t *file; 1423 1424 nxt_list_each(file, rt->log_files) { 1425 1426 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1427 NXT_FILE_OWNER_ACCESS); 1428 1429 if (ret != NXT_OK) { 1430 return NXT_ERROR; 1431 } 1432 1433 } nxt_list_loop; 1434 1435 file = nxt_list_first(rt->log_files); 1436 1437 return nxt_file_stderr(file); 1438} 1439 1440 1441nxt_int_t 1442nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1443{ 1444 nxt_int_t ret; 1445 nxt_uint_t c, p, ncurr, nprev; 1446 nxt_listen_socket_t *curr, *prev; 1447 1448 curr = rt->listen_sockets->elts; 1449 ncurr = rt->listen_sockets->nelts; 1450 1451 if (rt->inherited_sockets != NULL) { 1452 prev = rt->inherited_sockets->elts; 1453 nprev = rt->inherited_sockets->nelts; 1454 1455 } else { 1456 prev = NULL; 1457 nprev = 0; 1458 } 1459 1460 for (c = 0; c < ncurr; c++) { 1461 1462 for (p = 0; p < nprev; p++) { 1463 1464 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1465 1466 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1467 if (ret != NXT_OK) { 1468 return NXT_ERROR; 1469 } 1470 1471 goto next; 1472 } 1473 } 1474 1475 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { 1476 return NXT_ERROR; 1477 } 1478 1479 next: 1480 1481 continue; 1482 } 1483 1484 return NXT_OK; 1485} 1486 1487 1488nxt_int_t 1489nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1490{ 1491 nxt_uint_t i, n; 1492 nxt_listen_socket_t *ls; 1493 1494 ls = rt->listen_sockets->elts; 1495 n = rt->listen_sockets->nelts; 1496 1497 for (i = 0; i < n; i++) { 1498 if (ls[i].flags == NXT_NONBLOCK) { 1499 if (nxt_listen_event(task, &ls[i]) == NULL) { 1500 return NXT_ERROR; 1501 } 1502 } 1503 } 1504 1505 return NXT_OK; 1506} 1507 1508 1509nxt_str_t * 1510nxt_current_directory(nxt_mp_t *mp) 1511{ 1512 size_t length; 1513 u_char *p; 1514 nxt_str_t *name; 1515 char buf[NXT_MAX_PATH_LEN]; 1516 1517 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1518 1519 if (nxt_fast_path(length != 0)) { 1520 name = nxt_str_alloc(mp, length + 1); 1521 1522 if (nxt_fast_path(name != NULL)) { 1523 p = nxt_cpymem(name->start, buf, length); 1524 *p = '/'; 1525 1526 return name; 1527 } 1528 } 1529 1530 return NULL; 1531} 1532 1533 1534static nxt_int_t 1535nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1536{ 1537 ssize_t length; 1538 nxt_int_t n; 1539 nxt_file_t file; 1540 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; 1541 1542 nxt_memzero(&file, sizeof(nxt_file_t)); 1543 1544 file.name = pid_file; 1545 1546 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1547 NXT_FILE_DEFAULT_ACCESS); 1548 1549 if (n != NXT_OK) { 1550 return NXT_ERROR; 1551 } 1552 1553 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1554 1555 if (nxt_file_write(&file, pid, length, 0) != length) { 1556 return NXT_ERROR; 1557 } 1558 1559 nxt_file_close(task, &file); 1560 1561 return NXT_OK; 1562} 1563 1564 1565nxt_process_t * 1566nxt_runtime_process_new(nxt_runtime_t *rt) 1567{ 1568 nxt_process_t *process; 1569 1570 /* TODO: memory failures. */ 1571 1572 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); 1573 if (nxt_slow_path(process == NULL)) { 1574 return NULL; 1575 } 1576 1577 nxt_queue_init(&process->ports); 1578 1579 nxt_thread_mutex_create(&process->incoming_mutex); 1580 nxt_thread_mutex_create(&process->outgoing_mutex); 1581 nxt_thread_mutex_create(&process->cp_mutex); 1582 1583 process->use_count = 1; 1584 1585 return process; 1586} 1587 1588 1589static void 1590nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) 1591{ 1592 nxt_port_t *port; 1593 nxt_lvlhsh_each_t lhe; 1594 1595 nxt_assert(process->use_count == 0); 1596 nxt_assert(process->registered == 0); 1597 1598 nxt_port_mmaps_destroy(process->incoming, 1); 1599 nxt_port_mmaps_destroy(process->outgoing, 1); 1600 1601 port = nxt_port_hash_first(&process->connected_ports, &lhe); 1602 1603 while(port != NULL) { 1604 nxt_port_hash_remove(&process->connected_ports, port); 1605 1606 port = nxt_port_hash_first(&process->connected_ports, &lhe); 1607 } 1608 1609 nxt_thread_mutex_destroy(&process->incoming_mutex); 1610 nxt_thread_mutex_destroy(&process->outgoing_mutex); 1611 nxt_thread_mutex_destroy(&process->cp_mutex); 1612 1613 nxt_mp_free(rt->mem_pool, process); 1614} 1615 1616 1617static nxt_int_t 1618nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1619{ 1620 nxt_process_t *process; 1621 1622 process = data; 1623 1624 if (lhq->key.length == sizeof(nxt_pid_t) 1625 && *(nxt_pid_t *) lhq->key.start == process->pid) 1626 { 1627 return NXT_OK; 1628 } 1629 1630 return NXT_DECLINED; 1631} 1632 1633static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1634 NXT_LVLHSH_DEFAULT, 1635 nxt_runtime_lvlhsh_pid_test, 1636 nxt_lvlhsh_alloc, 1637 nxt_lvlhsh_free, 1638}; 1639 1640 1641nxt_inline void 1642nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1643{ 1644 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1645 lhq->key.length = sizeof(*pid); 1646 lhq->key.start = (u_char *) pid; 1647 lhq->proto = &lvlhsh_processes_proto; 1648} 1649 1650 1651nxt_process_t * 1652nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1653{ 1654 nxt_process_t *process; 1655 nxt_lvlhsh_query_t lhq; 1656 1657 process = NULL; 1658 1659 nxt_runtime_process_lhq_pid(&lhq, &pid); 1660 1661 nxt_thread_mutex_lock(&rt->processes_mutex); 1662 1663 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1664 process = lhq.value; 1665 1666 } else { 1667 nxt_thread_log_debug("process %PI not found", pid); 1668 } 1669 1670 nxt_thread_mutex_unlock(&rt->processes_mutex); 1671 1672 return process; 1673} 1674 1675 1676nxt_process_t * 1677nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1678{ 1679 nxt_process_t *process; 1680 nxt_lvlhsh_query_t lhq; 1681 1682 nxt_runtime_process_lhq_pid(&lhq, &pid); 1683 1684 nxt_thread_mutex_lock(&rt->processes_mutex); 1685 1686 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1687 nxt_thread_log_debug("process %PI found", pid); 1688 1689 nxt_thread_mutex_unlock(&rt->processes_mutex); 1690 1691 process = lhq.value; 1692 process->use_count++; 1693 1694 return process; 1695 } 1696 1697 process = nxt_runtime_process_new(rt); 1698 if (nxt_slow_path(process == NULL)) { 1699 return NULL; 1700 } 1701 1702 process->pid = pid; 1703 1704 lhq.replace = 0; 1705 lhq.value = process; 1706 lhq.pool = rt->mem_pool; 1707 1708 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1709 1710 case NXT_OK: 1711 if (rt->nprocesses == 0) { 1712 rt->mprocess = process; 1713 } 1714 1715 rt->nprocesses++; 1716 1717 process->registered = 1; 1718 1719 nxt_thread_log_debug("process %PI insert", pid); 1720 break; 1721 1722 default: 1723 nxt_thread_log_debug("process %PI insert failed", pid); 1724 break; 1725 } 1726 1727 nxt_thread_mutex_unlock(&rt->processes_mutex); 1728 1729 return process; 1730} 1731 1732 1733void 1734nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1735{ 1736 nxt_port_t *port; 1737 nxt_runtime_t *rt; 1738 nxt_lvlhsh_query_t lhq; 1739 1740 nxt_assert(process->registered == 0); 1741 1742 rt = task->thread->runtime; 1743 1744 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1745 1746 lhq.replace = 0; 1747 lhq.value = process; 1748 lhq.pool = rt->mem_pool; 1749 1750 nxt_thread_mutex_lock(&rt->processes_mutex); 1751 1752 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1753 1754 case NXT_OK: 1755 if (rt->nprocesses == 0) { 1756 rt->mprocess = process; 1757 } 1758 1759 rt->nprocesses++; 1760 1761 nxt_process_port_each(process, port) { 1762 1763 port->pid = process->pid; 1764 1765 nxt_runtime_port_add(task, port); 1766 1767 } nxt_process_port_loop; 1768 1769 process->registered = 1; 1770 1771 nxt_thread_log_debug("process %PI added", process->pid); 1772 break; 1773 1774 default: 1775 nxt_thread_log_debug("process %PI failed to add", process->pid); 1776 break; 1777 } 1778 1779 nxt_thread_mutex_unlock(&rt->processes_mutex); 1780} 1781 1782 1783static nxt_process_t * 1784nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) 1785{ 1786 nxt_process_t *process; 1787 nxt_lvlhsh_query_t lhq; 1788 1789 process = NULL; 1790 1791 nxt_runtime_process_lhq_pid(&lhq, &pid); 1792 1793 lhq.pool = rt->mem_pool; 1794 1795 nxt_thread_mutex_lock(&rt->processes_mutex); 1796 1797 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1798 1799 case NXT_OK: 1800 rt->nprocesses--; 1801 1802 process = lhq.value; 1803 1804 process->registered = 0; 1805 1806 nxt_thread_log_debug("process %PI removed", pid); 1807 break; 1808 1809 default: 1810 nxt_thread_log_debug("process %PI remove failed", pid); 1811 break; 1812 } 1813 1814 nxt_thread_mutex_unlock(&rt->processes_mutex); 1815 1816 return process; 1817} 1818 1819 1820void 1821nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) 1822{ 1823 nxt_runtime_t *rt; 1824 1825 process->use_count += i; 1826 1827 if (process->use_count == 0) { 1828 rt = task->thread->runtime; 1829 1830 if (process->registered == 1) { 1831 nxt_runtime_process_remove_pid(rt, process->pid); 1832 } 1833 1834 nxt_runtime_process_destroy(rt, process); 1835 } 1836} 1837 1838 1839nxt_process_t * 1840nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1841{ 1842 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); 1843 1844 lhe->proto = &lvlhsh_processes_proto; 1845 1846 return nxt_runtime_process_next(rt, lhe); 1847} 1848 1849 1850nxt_port_t * 1851nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1852{ 1853 return nxt_port_hash_first(&rt->ports, lhe); 1854} 1855 1856 1857void 1858nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1859{ 1860 nxt_int_t res; 1861 nxt_runtime_t *rt; 1862 1863 rt = task->thread->runtime; 1864 1865 res = nxt_port_hash_add(&rt->ports, port); 1866 1867 if (res != NXT_OK) { 1868 return; 1869 } 1870 1871 rt->port_by_type[port->type] = port; 1872 1873 nxt_port_use(task, port, 1); 1874} 1875 1876 1877void 1878nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1879{ 1880 nxt_int_t res; 1881 nxt_runtime_t *rt; 1882 1883 rt = task->thread->runtime; 1884 1885 res = nxt_port_hash_remove(&rt->ports, port); 1886 1887 if (res != NXT_OK) { 1888 return; 1889 } 1890 1891 if (rt->port_by_type[port->type] == port) { 1892 rt->port_by_type[port->type] = NULL; 1893 } 1894 1895 nxt_port_use(task, port, -1); 1896} 1897 1898 1899nxt_port_t * 1900nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1901 nxt_port_id_t port_id) 1902{ 1903 return nxt_port_hash_find(&rt->ports, pid, port_id); 1904}
|