1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_port.h> 11#include <nxt_master_process.h> 12#include <nxt_router.h> 13 14 15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 16 nxt_runtime_t *rt); 17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 18 nxt_runtime_t *rt); 19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 22static void nxt_runtime_initial_start(nxt_task_t *task); 23static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, 24 nxt_runtime_t *rt); 25static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 26static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task, 32 nxt_mp_t *mp, nxt_str_t *addr); 33static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, 34 nxt_mp_t *mp, nxt_str_t *addr); 35static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, 36 nxt_mp_t *mp, nxt_str_t *addr); 37static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, 38 nxt_mp_t *mp, nxt_str_t *addr); 39static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 40static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 41static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 42 nxt_runtime_t *rt); 43static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 44 nxt_file_name_t *pid_file); 45 46#if (NXT_THREADS) 47static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 48 nxt_runtime_cont_t cont); 49#endif 50 51static void nxt_runtime_process_destroy(nxt_runtime_t *rt, 52 nxt_process_t *process); 53static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, 54 nxt_pid_t pid); 55 56 57nxt_int_t 58nxt_runtime_create(nxt_task_t *task) 59{ 60 nxt_mp_t *mp; 61 nxt_int_t ret; 62 nxt_array_t *listen_sockets; 63 nxt_runtime_t *rt; 64 65 mp = nxt_mp_create(1024, 128, 256, 32); 66 67 if (nxt_slow_path(mp == NULL)) { 68 return NXT_ERROR; 69 } 70 71 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 72 if (nxt_slow_path(rt == NULL)) { 73 return NXT_ERROR; 74 } 75 76 task->thread->runtime = rt; 77 rt->mem_pool = mp; 78 79 nxt_thread_mutex_create(&rt->processes_mutex); 80 81 rt->prefix = nxt_current_directory(mp); 82 if (nxt_slow_path(rt->prefix == NULL)) { 83 goto fail; 84 } 85 86 rt->conf_prefix = rt->prefix; 87 88 rt->services = nxt_services_init(mp); 89 if (nxt_slow_path(rt->services == NULL)) { 90 goto fail; 91 } 92 93 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 94 if (nxt_slow_path(listen_sockets == NULL)) { 95 goto fail; 96 } 97 98 rt->listen_sockets = listen_sockets; 99 100 ret = nxt_runtime_inherited_listen_sockets(task, rt); 101 if (nxt_slow_path(ret != NXT_OK)) { 102 goto fail; 103 } 104 105 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 106 goto fail; 107 } 108 109 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 110 goto fail; 111 } 112 113 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 114 goto fail; 115 } 116 117 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 118 goto fail; 119 } 120 121 rt->start = nxt_runtime_initial_start; 122 123 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 124 nxt_runtime_start, task, rt, NULL); 125 126 return NXT_OK; 127 128fail: 129 130 nxt_mp_destroy(mp); 131 132 return NXT_ERROR; 133} 134 135 136static nxt_int_t 137nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 138{ 139 u_char *v, *p; 140 nxt_int_t type; 141 nxt_array_t *inherited_sockets; 142 nxt_socket_t s; 143 nxt_listen_socket_t *ls; 144 145 v = (u_char *) getenv("NGINX"); 146 147 if (v == NULL) { 148 return nxt_runtime_systemd_listen_sockets(task, rt); 149 } 150 151 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); 152 153 inherited_sockets = nxt_array_create(rt->mem_pool, 154 1, sizeof(nxt_listen_socket_t)); 155 if (inherited_sockets == NULL) { 156 return NXT_ERROR; 157 } 158 159 rt->inherited_sockets = inherited_sockets; 160 161 for (p = v; *p != '\0'; p++) { 162 163 if (*p == ';') { 164 s = nxt_int_parse(v, p - v); 165 166 if (nxt_slow_path(s < 0)) { 167 nxt_log(task, NXT_LOG_CRIT, "invalid socket number " 168 "\"%s\" in NGINX environment variable, " 169 "ignoring the rest of the variable", v); 170 return NXT_ERROR; 171 } 172 173 v = p + 1; 174 175 ls = nxt_array_zero_add(inherited_sockets); 176 if (nxt_slow_path(ls == NULL)) { 177 return NXT_ERROR; 178 } 179 180 ls->socket = s; 181 182 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 183 if (nxt_slow_path(ls->sockaddr == NULL)) { 184 return NXT_ERROR; 185 } 186 187 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 188 if (nxt_slow_path(type == -1)) { 189 return NXT_ERROR; 190 } 191 192 ls->sockaddr->type = (uint16_t) type; 193 } 194 } 195 196 return NXT_OK; 197} 198 199 200static nxt_int_t 201nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 202{ 203 u_char *nfd, *pid; 204 nxt_int_t n; 205 nxt_array_t *inherited_sockets; 206 nxt_socket_t s; 207 nxt_listen_socket_t *ls; 208 209 /* 210 * Number of listening sockets passed. The socket 211 * descriptors start from number 3 and are sequential. 212 */ 213 nfd = (u_char *) getenv("LISTEN_FDS"); 214 if (nfd == NULL) { 215 return NXT_OK; 216 } 217 218 /* The pid of the service process. */ 219 pid = (u_char *) getenv("LISTEN_PID"); 220 if (pid == NULL) { 221 return NXT_OK; 222 } 223 224 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 225 if (n < 0) { 226 return NXT_OK; 227 } 228 229 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 230 return NXT_OK; 231 } 232 233 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); 234 235 inherited_sockets = nxt_array_create(rt->mem_pool, 236 n, sizeof(nxt_listen_socket_t)); 237 if (inherited_sockets == NULL) { 238 return NXT_ERROR; 239 } 240 241 rt->inherited_sockets = inherited_sockets; 242 243 for (s = 3; s < n; s++) { 244 ls = nxt_array_zero_add(inherited_sockets); 245 if (nxt_slow_path(ls == NULL)) { 246 return NXT_ERROR; 247 } 248 249 ls->socket = s; 250 251 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 252 if (nxt_slow_path(ls->sockaddr == NULL)) { 253 return NXT_ERROR; 254 } 255 256 ls->sockaddr->type = SOCK_STREAM; 257 } 258 259 return NXT_OK; 260} 261 262 263static nxt_int_t 264nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 265{ 266 nxt_thread_t *thread; 267 nxt_event_engine_t *engine; 268 const nxt_event_interface_t *interface; 269 270 interface = nxt_service_get(rt->services, "engine", NULL); 271 272 if (nxt_slow_path(interface == NULL)) { 273 /* TODO: log */ 274 return NXT_ERROR; 275 } 276 277 engine = nxt_event_engine_create(task, interface, 278 nxt_master_process_signals, 0, 0); 279 280 if (nxt_slow_path(engine == NULL)) { 281 return NXT_ERROR; 282 } 283 284 thread = task->thread; 285 thread->engine = engine; 286 thread->fiber = &engine->fibers->fiber; 287 288 engine->id = rt->last_engine_id++; 289 290 nxt_queue_init(&rt->engines); 291 nxt_queue_insert_tail(&rt->engines, &engine->link); 292 293 return NXT_OK; 294} 295 296 297static nxt_int_t 298nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 299{ 300#if (NXT_THREADS) 301 nxt_int_t ret; 302 nxt_array_t *thread_pools; 303 304 thread_pools = nxt_array_create(rt->mem_pool, 1, 305 sizeof(nxt_thread_pool_t *)); 306 307 if (nxt_slow_path(thread_pools == NULL)) { 308 return NXT_ERROR; 309 } 310 311 rt->thread_pools = thread_pools; 312 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 313 314 if (nxt_slow_path(ret != NXT_OK)) { 315 return NXT_ERROR; 316 } 317 318#endif 319 320 return NXT_OK; 321} 322 323 324static void 325nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 326{ 327 nxt_uint_t i; 328 nxt_runtime_t *rt; 329 330 rt = obj; 331 332 nxt_debug(task, "rt conf done"); 333 334 task->thread->log->ctx_handler = NULL; 335 task->thread->log->ctx = NULL; 336 337 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 338 goto fail; 339 } 340 341 for (i = 0; i < nxt_init_modules_n; i++) { 342 if (nxt_init_modules[i](task->thread, rt) != NXT_OK) { 343 goto fail; 344 } 345 } 346 347 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 348 goto fail; 349 } 350 351 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 352 goto fail; 353 } 354 355#if (NXT_THREADS) 356 357 /* 358 * Thread pools should be destroyed before starting worker 359 * processes, because thread pool semaphores will stick in 360 * locked state in new processes after fork(). 361 */ 362 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 363 364#else 365 366 rt->start(task->thread, rt); 367 368#endif 369 370 return; 371 372fail: 373 374 nxt_runtime_quit(task); 375} 376 377 378static void 379nxt_runtime_initial_start(nxt_task_t *task) 380{ 381 nxt_int_t ret; 382 nxt_thread_t *thr; 383 nxt_runtime_t *rt; 384 const nxt_event_interface_t *interface; 385 386 thr = task->thread; 387 rt = thr->runtime; 388 389 if (rt->inherited_sockets == NULL && rt->daemon) { 390 391 if (nxt_process_daemon(task) != NXT_OK) { 392 goto fail; 393 } 394 395 /* 396 * An event engine should be updated after fork() 397 * even if an event facility was not changed because: 398 * 1) inherited kqueue descriptor is invalid, 399 * 2) the signal thread is not inherited. 400 */ 401 interface = nxt_service_get(rt->services, "engine", rt->engine); 402 if (interface == NULL) { 403 goto fail; 404 } 405 406 ret = nxt_event_engine_change(task->thread->engine, interface, 407 rt->batch); 408 if (ret != NXT_OK) { 409 goto fail; 410 } 411 } 412 413 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 414 if (ret != NXT_OK) { 415 goto fail; 416 } 417 418 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 419 goto fail; 420 } 421 422 thr->engine->max_connections = rt->engine_connections; 423 424 if (rt->master_process) { 425 if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) { 426 return; 427 } 428 429 } else { 430 nxt_single_process_start(thr, task, rt); 431 return; 432 } 433 434fail: 435 436 nxt_runtime_quit(task); 437} 438 439 440static void 441nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) 442{ 443#if (NXT_THREADS) 444 nxt_int_t ret; 445 446 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, 447 60000 * 1000000LL); 448 449 if (nxt_slow_path(ret != NXT_OK)) { 450 nxt_runtime_quit(task); 451 return; 452 } 453 454#endif 455 456 rt->types |= (1U << NXT_PROCESS_SINGLE); 457 458 nxt_runtime_listen_sockets_enable(task, rt); 459 460 return; 461} 462 463 464void 465nxt_runtime_quit(nxt_task_t *task) 466{ 467 nxt_bool_t done; 468 nxt_runtime_t *rt; 469 nxt_event_engine_t *engine; 470 471 rt = task->thread->runtime; 472 engine = task->thread->engine; 473 474 nxt_debug(task, "exiting"); 475 476 done = 1; 477 478 if (!engine->shutdown) { 479 engine->shutdown = 1; 480 481#if (NXT_THREADS) 482 483 if (!nxt_array_is_empty(rt->thread_pools)) { 484 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 485 done = 0; 486 } 487 488#endif 489 490 if (nxt_runtime_is_master(rt)) { 491 nxt_master_stop_worker_processes(task, rt); 492 done = 0; 493 } 494 } 495 496 nxt_runtime_close_idle_connections(engine); 497 498 if (done) { 499 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 500 task, rt, engine); 501 } 502} 503 504 505static void 506nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 507{ 508 nxt_conn_t *c; 509 nxt_queue_t *idle; 510 nxt_queue_link_t *link, *next; 511 512 nxt_debug(&engine->task, "close idle connections"); 513 514 idle = &engine->idle_connections; 515 516 for (link = nxt_queue_head(idle); 517 link != nxt_queue_tail(idle); 518 link = next) 519 { 520 next = nxt_queue_next(link); 521 c = nxt_queue_link_data(link, nxt_conn_t, link); 522 523 if (!c->socket.read_ready) { 524 nxt_queue_remove(link); 525 nxt_conn_close(engine, c); 526 } 527 } 528} 529 530 531static void 532nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 533{ 534 nxt_runtime_t *rt; 535 nxt_process_t *process; 536 nxt_event_engine_t *engine; 537 538 rt = obj; 539 engine = data; 540 541#if (NXT_THREADS) 542 543 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 544 545 if (!nxt_array_is_empty(rt->thread_pools)) { 546 return; 547 } 548 549#endif 550 551 if (nxt_runtime_is_master(rt)) { 552 if (rt->pid_file != NULL) { 553 nxt_file_delete(rt->pid_file); 554 } 555 } 556 557 if (!engine->event.signal_support) { 558 nxt_event_engine_signals_stop(engine); 559 } 560 561 nxt_runtime_process_each(rt, process) { 562 563 nxt_runtime_process_remove(rt, process); 564 565 } nxt_runtime_process_loop; 566 567 nxt_thread_mutex_destroy(&rt->processes_mutex); 568 569 nxt_mp_destroy(rt->mem_pool); 570 571 nxt_debug(task, "exit"); 572 573 exit(0); 574 nxt_unreachable(); 575} 576 577 578static nxt_int_t 579nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 580{ 581 nxt_event_engine_t *engine; 582 const nxt_event_interface_t *interface; 583 584 engine = task->thread->engine; 585 586 if (engine->batch == rt->batch 587 && nxt_strcmp(engine->event.name, rt->engine) == 0) 588 { 589 return NXT_OK; 590 } 591 592 interface = nxt_service_get(rt->services, "engine", rt->engine); 593 594 if (interface != NULL) { 595 return nxt_event_engine_change(engine, interface, rt->batch); 596 } 597 598 return NXT_ERROR; 599} 600 601 602void 603nxt_runtime_event_engine_free(nxt_runtime_t *rt) 604{ 605 nxt_queue_link_t *link; 606 nxt_event_engine_t *engine; 607 608 link = nxt_queue_first(&rt->engines); 609 nxt_queue_remove(link); 610 611 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 612 nxt_event_engine_free(engine); 613} 614 615 616#if (NXT_THREADS) 617 618static void nxt_runtime_thread_pool_init(void); 619static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 620 void *data); 621 622 623nxt_int_t 624nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 625 nxt_uint_t max_threads, nxt_nsec_t timeout) 626{ 627 nxt_thread_pool_t *thread_pool, **tp; 628 629 tp = nxt_array_add(rt->thread_pools); 630 if (tp == NULL) { 631 return NXT_ERROR; 632 } 633 634 thread_pool = nxt_thread_pool_create(max_threads, timeout, 635 nxt_runtime_thread_pool_init, 636 thr->engine, 637 nxt_runtime_thread_pool_exit); 638 639 if (nxt_fast_path(thread_pool != NULL)) { 640 *tp = thread_pool; 641 } 642 643 return NXT_OK; 644} 645 646 647static void 648nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 649 nxt_runtime_cont_t cont) 650{ 651 nxt_uint_t n; 652 nxt_thread_pool_t **tp; 653 654 rt->continuation = cont; 655 656 n = rt->thread_pools->nelts; 657 658 if (n == 0) { 659 cont(task); 660 return; 661 } 662 663 tp = rt->thread_pools->elts; 664 665 do { 666 nxt_thread_pool_destroy(*tp); 667 668 tp++; 669 n--; 670 } while (n != 0); 671} 672 673 674static void 675nxt_runtime_thread_pool_init(void) 676{ 677#if (NXT_REGEX) 678 nxt_regex_init(0); 679#endif 680} 681 682 683static void 684nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 685{ 686 nxt_uint_t i, n; 687 nxt_runtime_t *rt; 688 nxt_thread_pool_t *tp, **thread_pools; 689 nxt_thread_handle_t handle; 690 691 tp = obj; 692 693 if (data != NULL) { 694 handle = (nxt_thread_handle_t) (uintptr_t) data; 695 nxt_thread_wait(handle); 696 } 697 698 rt = task->thread->runtime; 699 700 thread_pools = rt->thread_pools->elts; 701 n = rt->thread_pools->nelts; 702 703 nxt_debug(task, "thread pools: %ui", n); 704 705 for (i = 0; i < n; i++) { 706 707 if (tp == thread_pools[i]) { 708 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 709 710 if (n == 1) { 711 /* The last thread pool. */ 712 rt->continuation(task); 713 } 714 715 return; 716 } 717 } 718} 719 720#endif 721 722 723static nxt_int_t 724nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 725{ 726 nxt_int_t ret; 727 nxt_str_t *prefix; 728 nxt_file_t *file; 729 nxt_file_name_str_t file_name; 730 const nxt_event_interface_t *interface; 731 732 rt->daemon = 1; 733 rt->master_process = 1; 734 rt->engine_connections = 256; 735 rt->worker_processes = 1; 736 rt->auxiliary_threads = 2; 737 rt->user_cred.user = "nobody"; 738 rt->group = NULL; 739 rt->pid = "nginext.pid"; 740 rt->error_log = "error.log"; 741 742 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 743 return NXT_ERROR; 744 } 745 746 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 747 return NXT_ERROR; 748 } 749 750 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { 751 return NXT_ERROR; 752 } 753 754 /* An engine's parameters. */ 755 756 interface = nxt_service_get(rt->services, "engine", rt->engine); 757 if (interface == NULL) { 758 return NXT_ERROR; 759 } 760 761 rt->engine = interface->name; 762 763 prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix; 764 765 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", 766 prefix, rt->pid); 767 if (nxt_slow_path(ret != NXT_OK)) { 768 return NXT_ERROR; 769 } 770 771 rt->pid_file = file_name.start; 772 773 prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix; 774 775 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", 776 prefix, rt->error_log); 777 if (nxt_slow_path(ret != NXT_OK)) { 778 return NXT_ERROR; 779 } 780 781 file = nxt_list_first(rt->log_files); 782 file->name = file_name.start; 783 784 return NXT_OK; 785} 786 787 788static nxt_int_t 789nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 790{ 791 char *p, **argv; 792 nxt_int_t n; 793 nxt_str_t addr; 794 nxt_sockaddr_t *sa; 795 796 argv = nxt_process_argv; 797 798 while (*argv != NULL) { 799 p = *argv++; 800 801 if (nxt_strcmp(p, "--listen") == 0) { 802 if (*argv == NULL) { 803 nxt_log(task, NXT_LOG_CRIT, 804 "no argument for option \"--listen\""); 805 return NXT_ERROR; 806 } 807 808 p = *argv++; 809 810 addr.length = nxt_strlen(p); 811 addr.start = (u_char *) p; 812 813 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr); 814 815 if (sa == NULL) { 816 return NXT_ERROR; 817 } 818 819 rt->controller_listen = sa; 820 821 continue; 822 } 823 824 if (nxt_strcmp(p, "--upstream") == 0) { 825 if (*argv == NULL) { 826 nxt_log(task, NXT_LOG_CRIT, 827 "no argument for option \"--upstream\""); 828 return NXT_ERROR; 829 } 830 831 p = *argv++; 832 833 rt->upstream.length = nxt_strlen(p); 834 rt->upstream.start = (u_char *) p; 835 836 continue; 837 } 838 839 if (nxt_strcmp(p, "--workers") == 0) { 840 if (*argv == NULL) { 841 nxt_log(task, NXT_LOG_CRIT, 842 "no argument for option \"--workers\""); 843 return NXT_ERROR; 844 } 845 846 p = *argv++; 847 n = nxt_int_parse((u_char *) p, nxt_strlen(p)); 848 849 if (n < 1) { 850 nxt_log(task, NXT_LOG_CRIT, 851 "invalid number of workers: \"%s\"", p); 852 return NXT_ERROR; 853 } 854 855 rt->worker_processes = n; 856 857 continue; 858 } 859 860 if (nxt_strcmp(p, "--user") == 0) { 861 if (*argv == NULL) { 862 nxt_log(task, NXT_LOG_CRIT, 863 "no argument for option \"--user\""); 864 return NXT_ERROR; 865 } 866 867 p = *argv++; 868 869 rt->user_cred.user = p; 870 871 continue; 872 } 873 874 if (nxt_strcmp(p, "--group") == 0) { 875 if (*argv == NULL) { 876 nxt_log(task, NXT_LOG_CRIT, 877 "no argument for option \"--group\""); 878 return NXT_ERROR; 879 } 880 881 p = *argv++; 882 883 rt->group = p; 884 885 continue; 886 } 887 888 if (nxt_strcmp(p, "--pid") == 0) { 889 if (*argv == NULL) { 890 nxt_log(task, NXT_LOG_CRIT, 891 "no argument for option \"--pid\""); 892 return NXT_ERROR; 893 } 894 895 p = *argv++; 896 897 rt->pid = p; 898 899 continue; 900 } 901 902 if (nxt_strcmp(p, "--log") == 0) { 903 if (*argv == NULL) { 904 nxt_log(task, NXT_LOG_CRIT, 905 "no argument for option \"--log\""); 906 return NXT_ERROR; 907 } 908 909 p = *argv++; 910 911 rt->error_log = p; 912 913 continue; 914 } 915 916 if (nxt_strcmp(p, "--no-daemonize") == 0) { 917 rt->daemon = 0; 918 continue; 919 } 920 } 921 922 return NXT_OK; 923} 924 925 926static nxt_sockaddr_t * 927nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 928{ 929 u_char *p; 930 size_t length; 931 932 length = addr->length; 933 p = addr->start; 934 935 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) { 936 return nxt_runtime_sockaddr_unix_parse(task, mp, addr); 937 } 938 939 if (length != 0 && *p == '[') { 940 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); 941 } 942 943 return nxt_runtime_sockaddr_inet_parse(task, mp, addr); 944} 945 946 947static nxt_sockaddr_t * 948nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 949{ 950#if (NXT_HAVE_UNIX_DOMAIN) 951 u_char *p; 952 size_t length, socklen; 953 nxt_sockaddr_t *sa; 954 955 /* 956 * Actual sockaddr_un length can be lesser or even larger than defined
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_port.h> 11#include <nxt_master_process.h> 12#include <nxt_router.h> 13 14 15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 16 nxt_runtime_t *rt); 17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 18 nxt_runtime_t *rt); 19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 22static void nxt_runtime_initial_start(nxt_task_t *task); 23static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, 24 nxt_runtime_t *rt); 25static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 26static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task, 32 nxt_mp_t *mp, nxt_str_t *addr); 33static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, 34 nxt_mp_t *mp, nxt_str_t *addr); 35static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, 36 nxt_mp_t *mp, nxt_str_t *addr); 37static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, 38 nxt_mp_t *mp, nxt_str_t *addr); 39static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 40static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 41static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 42 nxt_runtime_t *rt); 43static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 44 nxt_file_name_t *pid_file); 45 46#if (NXT_THREADS) 47static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 48 nxt_runtime_cont_t cont); 49#endif 50 51static void nxt_runtime_process_destroy(nxt_runtime_t *rt, 52 nxt_process_t *process); 53static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, 54 nxt_pid_t pid); 55 56 57nxt_int_t 58nxt_runtime_create(nxt_task_t *task) 59{ 60 nxt_mp_t *mp; 61 nxt_int_t ret; 62 nxt_array_t *listen_sockets; 63 nxt_runtime_t *rt; 64 65 mp = nxt_mp_create(1024, 128, 256, 32); 66 67 if (nxt_slow_path(mp == NULL)) { 68 return NXT_ERROR; 69 } 70 71 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 72 if (nxt_slow_path(rt == NULL)) { 73 return NXT_ERROR; 74 } 75 76 task->thread->runtime = rt; 77 rt->mem_pool = mp; 78 79 nxt_thread_mutex_create(&rt->processes_mutex); 80 81 rt->prefix = nxt_current_directory(mp); 82 if (nxt_slow_path(rt->prefix == NULL)) { 83 goto fail; 84 } 85 86 rt->conf_prefix = rt->prefix; 87 88 rt->services = nxt_services_init(mp); 89 if (nxt_slow_path(rt->services == NULL)) { 90 goto fail; 91 } 92 93 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 94 if (nxt_slow_path(listen_sockets == NULL)) { 95 goto fail; 96 } 97 98 rt->listen_sockets = listen_sockets; 99 100 ret = nxt_runtime_inherited_listen_sockets(task, rt); 101 if (nxt_slow_path(ret != NXT_OK)) { 102 goto fail; 103 } 104 105 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 106 goto fail; 107 } 108 109 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 110 goto fail; 111 } 112 113 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 114 goto fail; 115 } 116 117 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 118 goto fail; 119 } 120 121 rt->start = nxt_runtime_initial_start; 122 123 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 124 nxt_runtime_start, task, rt, NULL); 125 126 return NXT_OK; 127 128fail: 129 130 nxt_mp_destroy(mp); 131 132 return NXT_ERROR; 133} 134 135 136static nxt_int_t 137nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 138{ 139 u_char *v, *p; 140 nxt_int_t type; 141 nxt_array_t *inherited_sockets; 142 nxt_socket_t s; 143 nxt_listen_socket_t *ls; 144 145 v = (u_char *) getenv("NGINX"); 146 147 if (v == NULL) { 148 return nxt_runtime_systemd_listen_sockets(task, rt); 149 } 150 151 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); 152 153 inherited_sockets = nxt_array_create(rt->mem_pool, 154 1, sizeof(nxt_listen_socket_t)); 155 if (inherited_sockets == NULL) { 156 return NXT_ERROR; 157 } 158 159 rt->inherited_sockets = inherited_sockets; 160 161 for (p = v; *p != '\0'; p++) { 162 163 if (*p == ';') { 164 s = nxt_int_parse(v, p - v); 165 166 if (nxt_slow_path(s < 0)) { 167 nxt_log(task, NXT_LOG_CRIT, "invalid socket number " 168 "\"%s\" in NGINX environment variable, " 169 "ignoring the rest of the variable", v); 170 return NXT_ERROR; 171 } 172 173 v = p + 1; 174 175 ls = nxt_array_zero_add(inherited_sockets); 176 if (nxt_slow_path(ls == NULL)) { 177 return NXT_ERROR; 178 } 179 180 ls->socket = s; 181 182 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 183 if (nxt_slow_path(ls->sockaddr == NULL)) { 184 return NXT_ERROR; 185 } 186 187 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 188 if (nxt_slow_path(type == -1)) { 189 return NXT_ERROR; 190 } 191 192 ls->sockaddr->type = (uint16_t) type; 193 } 194 } 195 196 return NXT_OK; 197} 198 199 200static nxt_int_t 201nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 202{ 203 u_char *nfd, *pid; 204 nxt_int_t n; 205 nxt_array_t *inherited_sockets; 206 nxt_socket_t s; 207 nxt_listen_socket_t *ls; 208 209 /* 210 * Number of listening sockets passed. The socket 211 * descriptors start from number 3 and are sequential. 212 */ 213 nfd = (u_char *) getenv("LISTEN_FDS"); 214 if (nfd == NULL) { 215 return NXT_OK; 216 } 217 218 /* The pid of the service process. */ 219 pid = (u_char *) getenv("LISTEN_PID"); 220 if (pid == NULL) { 221 return NXT_OK; 222 } 223 224 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 225 if (n < 0) { 226 return NXT_OK; 227 } 228 229 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 230 return NXT_OK; 231 } 232 233 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); 234 235 inherited_sockets = nxt_array_create(rt->mem_pool, 236 n, sizeof(nxt_listen_socket_t)); 237 if (inherited_sockets == NULL) { 238 return NXT_ERROR; 239 } 240 241 rt->inherited_sockets = inherited_sockets; 242 243 for (s = 3; s < n; s++) { 244 ls = nxt_array_zero_add(inherited_sockets); 245 if (nxt_slow_path(ls == NULL)) { 246 return NXT_ERROR; 247 } 248 249 ls->socket = s; 250 251 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 252 if (nxt_slow_path(ls->sockaddr == NULL)) { 253 return NXT_ERROR; 254 } 255 256 ls->sockaddr->type = SOCK_STREAM; 257 } 258 259 return NXT_OK; 260} 261 262 263static nxt_int_t 264nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 265{ 266 nxt_thread_t *thread; 267 nxt_event_engine_t *engine; 268 const nxt_event_interface_t *interface; 269 270 interface = nxt_service_get(rt->services, "engine", NULL); 271 272 if (nxt_slow_path(interface == NULL)) { 273 /* TODO: log */ 274 return NXT_ERROR; 275 } 276 277 engine = nxt_event_engine_create(task, interface, 278 nxt_master_process_signals, 0, 0); 279 280 if (nxt_slow_path(engine == NULL)) { 281 return NXT_ERROR; 282 } 283 284 thread = task->thread; 285 thread->engine = engine; 286 thread->fiber = &engine->fibers->fiber; 287 288 engine->id = rt->last_engine_id++; 289 290 nxt_queue_init(&rt->engines); 291 nxt_queue_insert_tail(&rt->engines, &engine->link); 292 293 return NXT_OK; 294} 295 296 297static nxt_int_t 298nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 299{ 300#if (NXT_THREADS) 301 nxt_int_t ret; 302 nxt_array_t *thread_pools; 303 304 thread_pools = nxt_array_create(rt->mem_pool, 1, 305 sizeof(nxt_thread_pool_t *)); 306 307 if (nxt_slow_path(thread_pools == NULL)) { 308 return NXT_ERROR; 309 } 310 311 rt->thread_pools = thread_pools; 312 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 313 314 if (nxt_slow_path(ret != NXT_OK)) { 315 return NXT_ERROR; 316 } 317 318#endif 319 320 return NXT_OK; 321} 322 323 324static void 325nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 326{ 327 nxt_uint_t i; 328 nxt_runtime_t *rt; 329 330 rt = obj; 331 332 nxt_debug(task, "rt conf done"); 333 334 task->thread->log->ctx_handler = NULL; 335 task->thread->log->ctx = NULL; 336 337 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 338 goto fail; 339 } 340 341 for (i = 0; i < nxt_init_modules_n; i++) { 342 if (nxt_init_modules[i](task->thread, rt) != NXT_OK) { 343 goto fail; 344 } 345 } 346 347 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 348 goto fail; 349 } 350 351 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 352 goto fail; 353 } 354 355#if (NXT_THREADS) 356 357 /* 358 * Thread pools should be destroyed before starting worker 359 * processes, because thread pool semaphores will stick in 360 * locked state in new processes after fork(). 361 */ 362 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 363 364#else 365 366 rt->start(task->thread, rt); 367 368#endif 369 370 return; 371 372fail: 373 374 nxt_runtime_quit(task); 375} 376 377 378static void 379nxt_runtime_initial_start(nxt_task_t *task) 380{ 381 nxt_int_t ret; 382 nxt_thread_t *thr; 383 nxt_runtime_t *rt; 384 const nxt_event_interface_t *interface; 385 386 thr = task->thread; 387 rt = thr->runtime; 388 389 if (rt->inherited_sockets == NULL && rt->daemon) { 390 391 if (nxt_process_daemon(task) != NXT_OK) { 392 goto fail; 393 } 394 395 /* 396 * An event engine should be updated after fork() 397 * even if an event facility was not changed because: 398 * 1) inherited kqueue descriptor is invalid, 399 * 2) the signal thread is not inherited. 400 */ 401 interface = nxt_service_get(rt->services, "engine", rt->engine); 402 if (interface == NULL) { 403 goto fail; 404 } 405 406 ret = nxt_event_engine_change(task->thread->engine, interface, 407 rt->batch); 408 if (ret != NXT_OK) { 409 goto fail; 410 } 411 } 412 413 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 414 if (ret != NXT_OK) { 415 goto fail; 416 } 417 418 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 419 goto fail; 420 } 421 422 thr->engine->max_connections = rt->engine_connections; 423 424 if (rt->master_process) { 425 if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) { 426 return; 427 } 428 429 } else { 430 nxt_single_process_start(thr, task, rt); 431 return; 432 } 433 434fail: 435 436 nxt_runtime_quit(task); 437} 438 439 440static void 441nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) 442{ 443#if (NXT_THREADS) 444 nxt_int_t ret; 445 446 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, 447 60000 * 1000000LL); 448 449 if (nxt_slow_path(ret != NXT_OK)) { 450 nxt_runtime_quit(task); 451 return; 452 } 453 454#endif 455 456 rt->types |= (1U << NXT_PROCESS_SINGLE); 457 458 nxt_runtime_listen_sockets_enable(task, rt); 459 460 return; 461} 462 463 464void 465nxt_runtime_quit(nxt_task_t *task) 466{ 467 nxt_bool_t done; 468 nxt_runtime_t *rt; 469 nxt_event_engine_t *engine; 470 471 rt = task->thread->runtime; 472 engine = task->thread->engine; 473 474 nxt_debug(task, "exiting"); 475 476 done = 1; 477 478 if (!engine->shutdown) { 479 engine->shutdown = 1; 480 481#if (NXT_THREADS) 482 483 if (!nxt_array_is_empty(rt->thread_pools)) { 484 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 485 done = 0; 486 } 487 488#endif 489 490 if (nxt_runtime_is_master(rt)) { 491 nxt_master_stop_worker_processes(task, rt); 492 done = 0; 493 } 494 } 495 496 nxt_runtime_close_idle_connections(engine); 497 498 if (done) { 499 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 500 task, rt, engine); 501 } 502} 503 504 505static void 506nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 507{ 508 nxt_conn_t *c; 509 nxt_queue_t *idle; 510 nxt_queue_link_t *link, *next; 511 512 nxt_debug(&engine->task, "close idle connections"); 513 514 idle = &engine->idle_connections; 515 516 for (link = nxt_queue_head(idle); 517 link != nxt_queue_tail(idle); 518 link = next) 519 { 520 next = nxt_queue_next(link); 521 c = nxt_queue_link_data(link, nxt_conn_t, link); 522 523 if (!c->socket.read_ready) { 524 nxt_queue_remove(link); 525 nxt_conn_close(engine, c); 526 } 527 } 528} 529 530 531static void 532nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 533{ 534 nxt_runtime_t *rt; 535 nxt_process_t *process; 536 nxt_event_engine_t *engine; 537 538 rt = obj; 539 engine = data; 540 541#if (NXT_THREADS) 542 543 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 544 545 if (!nxt_array_is_empty(rt->thread_pools)) { 546 return; 547 } 548 549#endif 550 551 if (nxt_runtime_is_master(rt)) { 552 if (rt->pid_file != NULL) { 553 nxt_file_delete(rt->pid_file); 554 } 555 } 556 557 if (!engine->event.signal_support) { 558 nxt_event_engine_signals_stop(engine); 559 } 560 561 nxt_runtime_process_each(rt, process) { 562 563 nxt_runtime_process_remove(rt, process); 564 565 } nxt_runtime_process_loop; 566 567 nxt_thread_mutex_destroy(&rt->processes_mutex); 568 569 nxt_mp_destroy(rt->mem_pool); 570 571 nxt_debug(task, "exit"); 572 573 exit(0); 574 nxt_unreachable(); 575} 576 577 578static nxt_int_t 579nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 580{ 581 nxt_event_engine_t *engine; 582 const nxt_event_interface_t *interface; 583 584 engine = task->thread->engine; 585 586 if (engine->batch == rt->batch 587 && nxt_strcmp(engine->event.name, rt->engine) == 0) 588 { 589 return NXT_OK; 590 } 591 592 interface = nxt_service_get(rt->services, "engine", rt->engine); 593 594 if (interface != NULL) { 595 return nxt_event_engine_change(engine, interface, rt->batch); 596 } 597 598 return NXT_ERROR; 599} 600 601 602void 603nxt_runtime_event_engine_free(nxt_runtime_t *rt) 604{ 605 nxt_queue_link_t *link; 606 nxt_event_engine_t *engine; 607 608 link = nxt_queue_first(&rt->engines); 609 nxt_queue_remove(link); 610 611 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 612 nxt_event_engine_free(engine); 613} 614 615 616#if (NXT_THREADS) 617 618static void nxt_runtime_thread_pool_init(void); 619static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 620 void *data); 621 622 623nxt_int_t 624nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 625 nxt_uint_t max_threads, nxt_nsec_t timeout) 626{ 627 nxt_thread_pool_t *thread_pool, **tp; 628 629 tp = nxt_array_add(rt->thread_pools); 630 if (tp == NULL) { 631 return NXT_ERROR; 632 } 633 634 thread_pool = nxt_thread_pool_create(max_threads, timeout, 635 nxt_runtime_thread_pool_init, 636 thr->engine, 637 nxt_runtime_thread_pool_exit); 638 639 if (nxt_fast_path(thread_pool != NULL)) { 640 *tp = thread_pool; 641 } 642 643 return NXT_OK; 644} 645 646 647static void 648nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 649 nxt_runtime_cont_t cont) 650{ 651 nxt_uint_t n; 652 nxt_thread_pool_t **tp; 653 654 rt->continuation = cont; 655 656 n = rt->thread_pools->nelts; 657 658 if (n == 0) { 659 cont(task); 660 return; 661 } 662 663 tp = rt->thread_pools->elts; 664 665 do { 666 nxt_thread_pool_destroy(*tp); 667 668 tp++; 669 n--; 670 } while (n != 0); 671} 672 673 674static void 675nxt_runtime_thread_pool_init(void) 676{ 677#if (NXT_REGEX) 678 nxt_regex_init(0); 679#endif 680} 681 682 683static void 684nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 685{ 686 nxt_uint_t i, n; 687 nxt_runtime_t *rt; 688 nxt_thread_pool_t *tp, **thread_pools; 689 nxt_thread_handle_t handle; 690 691 tp = obj; 692 693 if (data != NULL) { 694 handle = (nxt_thread_handle_t) (uintptr_t) data; 695 nxt_thread_wait(handle); 696 } 697 698 rt = task->thread->runtime; 699 700 thread_pools = rt->thread_pools->elts; 701 n = rt->thread_pools->nelts; 702 703 nxt_debug(task, "thread pools: %ui", n); 704 705 for (i = 0; i < n; i++) { 706 707 if (tp == thread_pools[i]) { 708 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 709 710 if (n == 1) { 711 /* The last thread pool. */ 712 rt->continuation(task); 713 } 714 715 return; 716 } 717 } 718} 719 720#endif 721 722 723static nxt_int_t 724nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 725{ 726 nxt_int_t ret; 727 nxt_str_t *prefix; 728 nxt_file_t *file; 729 nxt_file_name_str_t file_name; 730 const nxt_event_interface_t *interface; 731 732 rt->daemon = 1; 733 rt->master_process = 1; 734 rt->engine_connections = 256; 735 rt->worker_processes = 1; 736 rt->auxiliary_threads = 2; 737 rt->user_cred.user = "nobody"; 738 rt->group = NULL; 739 rt->pid = "nginext.pid"; 740 rt->error_log = "error.log"; 741 742 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 743 return NXT_ERROR; 744 } 745 746 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 747 return NXT_ERROR; 748 } 749 750 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { 751 return NXT_ERROR; 752 } 753 754 /* An engine's parameters. */ 755 756 interface = nxt_service_get(rt->services, "engine", rt->engine); 757 if (interface == NULL) { 758 return NXT_ERROR; 759 } 760 761 rt->engine = interface->name; 762 763 prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix; 764 765 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", 766 prefix, rt->pid); 767 if (nxt_slow_path(ret != NXT_OK)) { 768 return NXT_ERROR; 769 } 770 771 rt->pid_file = file_name.start; 772 773 prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix; 774 775 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", 776 prefix, rt->error_log); 777 if (nxt_slow_path(ret != NXT_OK)) { 778 return NXT_ERROR; 779 } 780 781 file = nxt_list_first(rt->log_files); 782 file->name = file_name.start; 783 784 return NXT_OK; 785} 786 787 788static nxt_int_t 789nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 790{ 791 char *p, **argv; 792 nxt_int_t n; 793 nxt_str_t addr; 794 nxt_sockaddr_t *sa; 795 796 argv = nxt_process_argv; 797 798 while (*argv != NULL) { 799 p = *argv++; 800 801 if (nxt_strcmp(p, "--listen") == 0) { 802 if (*argv == NULL) { 803 nxt_log(task, NXT_LOG_CRIT, 804 "no argument for option \"--listen\""); 805 return NXT_ERROR; 806 } 807 808 p = *argv++; 809 810 addr.length = nxt_strlen(p); 811 addr.start = (u_char *) p; 812 813 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr); 814 815 if (sa == NULL) { 816 return NXT_ERROR; 817 } 818 819 rt->controller_listen = sa; 820 821 continue; 822 } 823 824 if (nxt_strcmp(p, "--upstream") == 0) { 825 if (*argv == NULL) { 826 nxt_log(task, NXT_LOG_CRIT, 827 "no argument for option \"--upstream\""); 828 return NXT_ERROR; 829 } 830 831 p = *argv++; 832 833 rt->upstream.length = nxt_strlen(p); 834 rt->upstream.start = (u_char *) p; 835 836 continue; 837 } 838 839 if (nxt_strcmp(p, "--workers") == 0) { 840 if (*argv == NULL) { 841 nxt_log(task, NXT_LOG_CRIT, 842 "no argument for option \"--workers\""); 843 return NXT_ERROR; 844 } 845 846 p = *argv++; 847 n = nxt_int_parse((u_char *) p, nxt_strlen(p)); 848 849 if (n < 1) { 850 nxt_log(task, NXT_LOG_CRIT, 851 "invalid number of workers: \"%s\"", p); 852 return NXT_ERROR; 853 } 854 855 rt->worker_processes = n; 856 857 continue; 858 } 859 860 if (nxt_strcmp(p, "--user") == 0) { 861 if (*argv == NULL) { 862 nxt_log(task, NXT_LOG_CRIT, 863 "no argument for option \"--user\""); 864 return NXT_ERROR; 865 } 866 867 p = *argv++; 868 869 rt->user_cred.user = p; 870 871 continue; 872 } 873 874 if (nxt_strcmp(p, "--group") == 0) { 875 if (*argv == NULL) { 876 nxt_log(task, NXT_LOG_CRIT, 877 "no argument for option \"--group\""); 878 return NXT_ERROR; 879 } 880 881 p = *argv++; 882 883 rt->group = p; 884 885 continue; 886 } 887 888 if (nxt_strcmp(p, "--pid") == 0) { 889 if (*argv == NULL) { 890 nxt_log(task, NXT_LOG_CRIT, 891 "no argument for option \"--pid\""); 892 return NXT_ERROR; 893 } 894 895 p = *argv++; 896 897 rt->pid = p; 898 899 continue; 900 } 901 902 if (nxt_strcmp(p, "--log") == 0) { 903 if (*argv == NULL) { 904 nxt_log(task, NXT_LOG_CRIT, 905 "no argument for option \"--log\""); 906 return NXT_ERROR; 907 } 908 909 p = *argv++; 910 911 rt->error_log = p; 912 913 continue; 914 } 915 916 if (nxt_strcmp(p, "--no-daemonize") == 0) { 917 rt->daemon = 0; 918 continue; 919 } 920 } 921 922 return NXT_OK; 923} 924 925 926static nxt_sockaddr_t * 927nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 928{ 929 u_char *p; 930 size_t length; 931 932 length = addr->length; 933 p = addr->start; 934 935 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) { 936 return nxt_runtime_sockaddr_unix_parse(task, mp, addr); 937 } 938 939 if (length != 0 && *p == '[') { 940 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); 941 } 942 943 return nxt_runtime_sockaddr_inet_parse(task, mp, addr); 944} 945 946 947static nxt_sockaddr_t * 948nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 949{ 950#if (NXT_HAVE_UNIX_DOMAIN) 951 u_char *p; 952 size_t length, socklen; 953 nxt_sockaddr_t *sa; 954 955 /* 956 * Actual sockaddr_un length can be lesser or even larger than defined
|
958 * limit maximum Unix domain socket address length by defined sun_path[] 959 * length because some OSes accept addresses twice larger than defined 960 * struct sockaddr_un. Also reserve space for a trailing zero to avoid 961 * ambiguity, since many OSes accept Unix domain socket addresses 962 * without a trailing zero. 963 */ 964 const size_t max_len = sizeof(struct sockaddr_un) 965 - offsetof(struct sockaddr_un, sun_path) - 1; 966 967 /* cutting "unix:" */ 968 length = addr->length - 5; 969 p = addr->start + 5; 970 971 if (length == 0) { 972 nxt_log(task, NXT_LOG_CRIT, 973 "unix domain socket \"%V\" name is invalid", addr); 974 return NULL; 975 } 976 977 if (length > max_len) { 978 nxt_log(task, NXT_LOG_CRIT, 979 "unix domain socket \"%V\" name is too long", addr); 980 return NULL; 981 } 982 983 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; 984 985#if (NXT_LINUX) 986 987 /* 988 * Linux unix(7): 989 * 990 * abstract: an abstract socket address is distinguished by the fact 991 * that sun_path[0] is a null byte ('\0'). The socket's address in 992 * this namespace is given by the additional bytes in sun_path that 993 * are covered by the specified length of the address structure. 994 * (Null bytes in the name have no special significance.) 995 */ 996 if (p[0] == '@') { 997 p[0] = '\0'; 998 socklen--; 999 } 1000 1001#endif 1002 1003 sa = nxt_sockaddr_alloc(mp, socklen, addr->length); 1004 1005 if (nxt_slow_path(sa == NULL)) { 1006 return NULL; 1007 } 1008 1009 sa->type = SOCK_STREAM; 1010 1011 sa->u.sockaddr_un.sun_family = AF_UNIX; 1012 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); 1013 1014 return sa; 1015 1016#else /* !(NXT_HAVE_UNIX_DOMAIN) */ 1017 1018 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", 1019 addr); 1020 1021 return NULL; 1022 1023#endif 1024} 1025 1026 1027static nxt_sockaddr_t * 1028nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp, 1029 nxt_str_t *addr) 1030{ 1031#if (NXT_INET6) 1032 u_char *p, *addr_end; 1033 size_t length; 1034 nxt_int_t port; 1035 nxt_sockaddr_t *sa; 1036 struct in6_addr *in6_addr; 1037 1038 length = addr->length - 1; 1039 p = addr->start + 1; 1040 1041 addr_end = nxt_memchr(p, ']', length); 1042 1043 if (addr_end == NULL) { 1044 goto invalid_address; 1045 } 1046 1047 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6), 1048 NXT_INET6_ADDR_STR_LEN); 1049 if (nxt_slow_path(sa == NULL)) { 1050 return NULL; 1051 } 1052 1053 in6_addr = &sa->u.sockaddr_in6.sin6_addr; 1054 1055 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { 1056 goto invalid_address; 1057 } 1058 1059 port = 0; 1060 p = addr_end + 1; 1061 length = (p + length) - p; 1062 1063 if (length == 0) { 1064 goto found; 1065 } 1066 1067 if (*p == ':') { 1068 port = nxt_int_parse(p + 1, length - 1); 1069 1070 if (port >= 1 && port <= 65535) { 1071 goto found; 1072 } 1073 } 1074 1075 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); 1076 1077 return NULL; 1078 1079found: 1080 1081 sa->type = SOCK_STREAM; 1082 1083 sa->u.sockaddr_in6.sin6_family = AF_INET6; 1084 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); 1085 1086 return sa; 1087 1088invalid_address: 1089 1090 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); 1091 1092 return NULL; 1093 1094#else 1095 1096 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); 1097 1098 return NULL; 1099 1100#endif 1101} 1102 1103 1104static nxt_sockaddr_t * 1105nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp, 1106 nxt_str_t *string) 1107{ 1108 u_char *p, *ip; 1109 size_t length; 1110 in_addr_t addr; 1111 nxt_int_t port; 1112 nxt_sockaddr_t *sa; 1113 1114 addr = INADDR_ANY; 1115 1116 length = string->length; 1117 ip = string->start; 1118 1119 p = nxt_memchr(ip, ':', length); 1120 1121 if (p == NULL) { 1122 1123 /* single value port, or address */ 1124 1125 port = nxt_int_parse(ip, length); 1126 1127 if (port > 0) { 1128 /* "*:XX" */ 1129 1130 if (port < 1 || port > 65535) { 1131 goto invalid_port; 1132 } 1133 1134 } else { 1135 /* "x.x.x.x" */ 1136 1137 addr = nxt_inet_addr(ip, length); 1138 1139 if (addr == INADDR_NONE) { 1140 goto invalid_port; 1141 } 1142 1143 port = 8080; 1144 } 1145 1146 } else { 1147 1148 /* x.x.x.x:XX */ 1149 1150 p++; 1151 length = (ip + length) - p; 1152 port = nxt_int_parse(p, length); 1153 1154 if (port < 1 || port > 65535) { 1155 goto invalid_port; 1156 } 1157 1158 length = (p - 1) - ip; 1159 1160 if (length != 1 || ip[0] != '*') { 1161 addr = nxt_inet_addr(ip, length); 1162 1163 if (addr == INADDR_NONE) { 1164 goto invalid_addr; 1165 } 1166 1167 /* "x.x.x.x:XX" */ 1168 } 1169 } 1170 1171 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), 1172 NXT_INET_ADDR_STR_LEN); 1173 if (nxt_slow_path(sa == NULL)) { 1174 return NULL; 1175 } 1176 1177 sa->type = SOCK_STREAM; 1178 1179 sa->u.sockaddr_in.sin_family = AF_INET; 1180 sa->u.sockaddr_in.sin_port = htons((in_port_t) port); 1181 sa->u.sockaddr_in.sin_addr.s_addr = addr; 1182 1183 return sa; 1184 1185invalid_port: 1186 1187 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string); 1188 1189 return NULL; 1190 1191invalid_addr: 1192 1193 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string); 1194 1195 return NULL; 1196} 1197 1198 1199nxt_listen_socket_t * 1200nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1201{ 1202 nxt_mp_t *mp; 1203 nxt_listen_socket_t *ls; 1204 1205 ls = nxt_array_zero_add(rt->listen_sockets); 1206 if (ls == NULL) { 1207 return NULL; 1208 } 1209 1210 mp = rt->mem_pool; 1211 1212 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1213 sa->length); 1214 if (ls->sockaddr == NULL) { 1215 return NULL; 1216 } 1217 1218 ls->sockaddr->type = sa->type; 1219 1220 nxt_sockaddr_text(ls->sockaddr); 1221 1222 ls->socket = -1; 1223 ls->backlog = NXT_LISTEN_BACKLOG; 1224 1225 return ls; 1226} 1227 1228 1229static nxt_int_t 1230nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1231{ 1232 size_t length; 1233 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1234 1235 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1236 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); 1237 return NXT_ERROR; 1238 } 1239 1240 /* 1241 * Linux gethostname(2): 1242 * 1243 * If the null-terminated hostname is too large to fit, 1244 * then the name is truncated, and no error is returned. 1245 * 1246 * For this reason an additional byte is reserved in the buffer. 1247 */ 1248 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1249 1250 length = nxt_strlen(hostname); 1251 rt->hostname.length = length; 1252 1253 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1254 1255 if (rt->hostname.start != NULL) { 1256 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1257 return NXT_OK; 1258 } 1259 1260 return NXT_ERROR; 1261} 1262 1263 1264static nxt_int_t 1265nxt_runtime_log_files_init(nxt_runtime_t *rt) 1266{ 1267 nxt_file_t *file; 1268 nxt_list_t *log_files; 1269 1270 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1271 1272 if (nxt_fast_path(log_files != NULL)) { 1273 rt->log_files = log_files; 1274 1275 /* Preallocate the main error_log. This allocation cannot fail. */ 1276 file = nxt_list_zero_add(log_files); 1277 1278 file->fd = NXT_FILE_INVALID; 1279 file->log_level = NXT_LOG_CRIT; 1280 1281 return NXT_OK; 1282 } 1283 1284 return NXT_ERROR; 1285} 1286 1287 1288nxt_file_t * 1289nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1290{ 1291 nxt_int_t ret; 1292 nxt_str_t *prefix; 1293 nxt_file_t *file; 1294 nxt_file_name_str_t file_name; 1295 1296 prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix; 1297 1298 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z", 1299 prefix, name); 1300 1301 if (nxt_slow_path(ret != NXT_OK)) { 1302 return NULL; 1303 } 1304 1305 nxt_list_each(file, rt->log_files) { 1306 1307 /* STUB: hardecoded case sensitive/insensitive. */ 1308 1309 if (file->name != NULL 1310 && nxt_file_name_eq(file->name, file_name.start)) 1311 { 1312 return file; 1313 } 1314 1315 } nxt_list_loop; 1316 1317 file = nxt_list_zero_add(rt->log_files); 1318 1319 if (nxt_slow_path(file == NULL)) { 1320 return NULL; 1321 } 1322 1323 file->fd = NXT_FILE_INVALID; 1324 file->log_level = NXT_LOG_CRIT; 1325 file->name = file_name.start; 1326 1327 return file; 1328} 1329 1330 1331static nxt_int_t 1332nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1333{ 1334 nxt_int_t ret; 1335 nxt_file_t *file; 1336 1337 nxt_list_each(file, rt->log_files) { 1338 1339 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1340 NXT_FILE_OWNER_ACCESS); 1341 1342 if (ret != NXT_OK) { 1343 return NXT_ERROR; 1344 } 1345 1346 } nxt_list_loop; 1347 1348 file = nxt_list_first(rt->log_files); 1349 1350 return nxt_file_stderr(file); 1351} 1352 1353 1354nxt_int_t 1355nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1356{ 1357 nxt_int_t ret; 1358 nxt_uint_t c, p, ncurr, nprev; 1359 nxt_listen_socket_t *curr, *prev; 1360 1361 curr = rt->listen_sockets->elts; 1362 ncurr = rt->listen_sockets->nelts; 1363 1364 if (rt->inherited_sockets != NULL) { 1365 prev = rt->inherited_sockets->elts; 1366 nprev = rt->inherited_sockets->nelts; 1367 1368 } else { 1369 prev = NULL; 1370 nprev = 0; 1371 } 1372 1373 for (c = 0; c < ncurr; c++) { 1374 1375 for (p = 0; p < nprev; p++) { 1376 1377 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1378 1379 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1380 if (ret != NXT_OK) { 1381 return NXT_ERROR; 1382 } 1383 1384 goto next; 1385 } 1386 } 1387 1388 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { 1389 return NXT_ERROR; 1390 } 1391 1392 next: 1393 1394 continue; 1395 } 1396 1397 return NXT_OK; 1398} 1399 1400 1401nxt_int_t 1402nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1403{ 1404 nxt_uint_t i, n; 1405 nxt_listen_socket_t *ls; 1406 1407 ls = rt->listen_sockets->elts; 1408 n = rt->listen_sockets->nelts; 1409 1410 for (i = 0; i < n; i++) { 1411 if (ls[i].flags == NXT_NONBLOCK) { 1412 if (nxt_listen_event(task, &ls[i]) == NULL) { 1413 return NXT_ERROR; 1414 } 1415 } 1416 } 1417 1418 return NXT_OK; 1419} 1420 1421 1422nxt_str_t * 1423nxt_current_directory(nxt_mp_t *mp) 1424{ 1425 size_t length; 1426 u_char *p; 1427 nxt_str_t *name; 1428 char buf[NXT_MAX_PATH_LEN]; 1429 1430 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1431 1432 if (nxt_fast_path(length != 0)) { 1433 name = nxt_str_alloc(mp, length + 1); 1434 1435 if (nxt_fast_path(name != NULL)) { 1436 p = nxt_cpymem(name->start, buf, length); 1437 *p = '/'; 1438 1439 return name; 1440 } 1441 } 1442 1443 return NULL; 1444} 1445 1446 1447static nxt_int_t 1448nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1449{ 1450 ssize_t length; 1451 nxt_int_t n; 1452 nxt_file_t file; 1453 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; 1454 1455 nxt_memzero(&file, sizeof(nxt_file_t)); 1456 1457 file.name = pid_file; 1458 1459 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1460 NXT_FILE_DEFAULT_ACCESS); 1461 1462 if (n != NXT_OK) { 1463 return NXT_ERROR; 1464 } 1465 1466 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1467 1468 if (nxt_file_write(&file, pid, length, 0) != length) { 1469 return NXT_ERROR; 1470 } 1471 1472 nxt_file_close(task, &file); 1473 1474 return NXT_OK; 1475} 1476 1477 1478nxt_process_t * 1479nxt_runtime_process_new(nxt_runtime_t *rt) 1480{ 1481 nxt_process_t *process; 1482 1483 /* TODO: memory failures. */ 1484 1485 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); 1486 if (nxt_slow_path(process == NULL)) { 1487 return NULL; 1488 } 1489 1490 nxt_queue_init(&process->ports); 1491 1492 nxt_thread_mutex_create(&process->incoming_mutex); 1493 nxt_thread_mutex_create(&process->outgoing_mutex); 1494 nxt_thread_mutex_create(&process->cp_mutex); 1495 1496 return process; 1497} 1498 1499 1500static void 1501nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) 1502{ 1503 nxt_assert(process->port_cleanups == 0); 1504 nxt_assert(process->registered == 0); 1505 1506 nxt_port_mmaps_destroy(process->incoming, 1); 1507 nxt_port_mmaps_destroy(process->outgoing, 1); 1508 1509 if (process->cp_mem_pool != NULL) { 1510 nxt_mp_thread_adopt(process->cp_mem_pool); 1511 1512 nxt_mp_destroy(process->cp_mem_pool); 1513 } 1514 1515 nxt_thread_mutex_destroy(&process->incoming_mutex); 1516 nxt_thread_mutex_destroy(&process->outgoing_mutex); 1517 nxt_thread_mutex_destroy(&process->cp_mutex); 1518 1519 nxt_mp_free(rt->mem_pool, process); 1520} 1521 1522 1523static nxt_int_t 1524nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1525{ 1526 nxt_process_t *process; 1527 1528 process = data; 1529 1530 if (lhq->key.length == sizeof(nxt_pid_t) && 1531 *(nxt_pid_t *) lhq->key.start == process->pid) { 1532 return NXT_OK; 1533 } 1534 1535 return NXT_DECLINED; 1536} 1537 1538static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1539 NXT_LVLHSH_DEFAULT, 1540 nxt_runtime_lvlhsh_pid_test, 1541 nxt_lvlhsh_alloc, 1542 nxt_lvlhsh_free, 1543}; 1544 1545 1546nxt_inline void 1547nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1548{ 1549 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1550 lhq->key.length = sizeof(*pid); 1551 lhq->key.start = (u_char *) pid; 1552 lhq->proto = &lvlhsh_processes_proto; 1553} 1554 1555 1556nxt_process_t * 1557nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1558{ 1559 nxt_process_t *process; 1560 nxt_lvlhsh_query_t lhq; 1561 1562 process = NULL; 1563 1564 nxt_runtime_process_lhq_pid(&lhq, &pid); 1565 1566 nxt_thread_mutex_lock(&rt->processes_mutex); 1567 1568 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1569 process = lhq.value; 1570 1571 } else { 1572 nxt_thread_log_debug("process %PI not found", pid); 1573 } 1574 1575 nxt_thread_mutex_unlock(&rt->processes_mutex); 1576 1577 return process; 1578} 1579 1580 1581nxt_process_t * 1582nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1583{ 1584 nxt_process_t *process; 1585 nxt_lvlhsh_query_t lhq; 1586 1587 nxt_runtime_process_lhq_pid(&lhq, &pid); 1588 1589 nxt_thread_mutex_lock(&rt->processes_mutex); 1590 1591 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1592 nxt_thread_log_debug("process %PI found", pid); 1593 1594 nxt_thread_mutex_unlock(&rt->processes_mutex); 1595 return lhq.value; 1596 } 1597 1598 process = nxt_runtime_process_new(rt); 1599 if (nxt_slow_path(process == NULL)) { 1600 return NULL; 1601 } 1602 1603 process->pid = pid; 1604 1605 lhq.replace = 0; 1606 lhq.value = process; 1607 lhq.pool = rt->mem_pool; 1608 1609 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1610 1611 case NXT_OK: 1612 if (rt->nprocesses == 0) { 1613 rt->mprocess = process; 1614 } 1615 1616 rt->nprocesses++; 1617 1618 process->registered = 1; 1619 1620 nxt_thread_log_debug("process %PI insert", pid); 1621 break; 1622 1623 default: 1624 nxt_thread_log_debug("process %PI insert failed", pid); 1625 break; 1626 } 1627 1628 nxt_thread_mutex_unlock(&rt->processes_mutex); 1629 1630 return process; 1631} 1632 1633 1634void 1635nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) 1636{ 1637 nxt_port_t *port; 1638 nxt_lvlhsh_query_t lhq; 1639 1640 nxt_assert(process->registered == 0); 1641 1642 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1643 1644 lhq.replace = 0; 1645 lhq.value = process; 1646 lhq.pool = rt->mem_pool; 1647 1648 nxt_thread_mutex_lock(&rt->processes_mutex); 1649 1650 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1651 1652 case NXT_OK: 1653 if (rt->nprocesses == 0) { 1654 rt->mprocess = process; 1655 } 1656 1657 rt->nprocesses++; 1658 1659 nxt_process_port_each(process, port) { 1660 1661 port->pid = process->pid; 1662 1663 nxt_runtime_port_add(rt, port); 1664 1665 } nxt_process_port_loop; 1666 1667 process->registered = 1; 1668 1669 nxt_thread_log_debug("process %PI added", process->pid); 1670 break; 1671 1672 default: 1673 nxt_thread_log_debug("process %PI failed to add", process->pid); 1674 break; 1675 } 1676 1677 nxt_thread_mutex_unlock(&rt->processes_mutex); 1678} 1679 1680 1681static nxt_process_t * 1682nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) 1683{ 1684 nxt_process_t *process; 1685 nxt_lvlhsh_query_t lhq; 1686 1687 process = NULL; 1688 1689 nxt_runtime_process_lhq_pid(&lhq, &pid); 1690 1691 lhq.pool = rt->mem_pool; 1692 1693 nxt_thread_mutex_lock(&rt->processes_mutex); 1694 1695 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1696 1697 case NXT_OK: 1698 rt->nprocesses--; 1699 1700 process = lhq.value; 1701 1702 process->registered = 0; 1703 1704 nxt_thread_log_debug("process %PI removed", pid); 1705 break; 1706 1707 default: 1708 nxt_thread_log_debug("process %PI remove failed", pid); 1709 break; 1710 } 1711 1712 nxt_thread_mutex_unlock(&rt->processes_mutex); 1713 1714 return process; 1715} 1716 1717 1718void 1719nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1720{ 1721 nxt_port_t *port; 1722 1723 if (process->port_cleanups == 0) { 1724 if (process->registered == 1) { 1725 nxt_runtime_process_remove_pid(rt, process->pid); 1726 } 1727 1728 nxt_runtime_process_destroy(rt, process); 1729 1730 } else { 1731 nxt_process_port_each(process, port) { 1732 1733 nxt_runtime_port_remove(rt, port); 1734 1735 nxt_port_release(port); 1736 1737 } nxt_process_port_loop; 1738 } 1739} 1740 1741 1742nxt_process_t * 1743nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1744{ 1745 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); 1746 1747 lhe->proto = &lvlhsh_processes_proto; 1748 1749 return nxt_runtime_process_next(rt, lhe); 1750} 1751 1752 1753nxt_port_t * 1754nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1755{ 1756 return nxt_port_hash_first(&rt->ports, lhe); 1757} 1758 1759 1760void 1761nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) 1762{ 1763 nxt_port_hash_add(&rt->ports, rt->mem_pool, port); 1764 1765 rt->port_by_type[port->type] = port; 1766} 1767 1768 1769void 1770nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) 1771{ 1772 nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); 1773 1774 if (rt->port_by_type[port->type] == port) { 1775 rt->port_by_type[port->type] = NULL; 1776 } 1777} 1778 1779 1780nxt_port_t * 1781nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1782 nxt_port_id_t port_id) 1783{ 1784 return nxt_port_hash_find(&rt->ports, pid, port_id); 1785}
| 958 * limit maximum Unix domain socket address length by defined sun_path[] 959 * length because some OSes accept addresses twice larger than defined 960 * struct sockaddr_un. Also reserve space for a trailing zero to avoid 961 * ambiguity, since many OSes accept Unix domain socket addresses 962 * without a trailing zero. 963 */ 964 const size_t max_len = sizeof(struct sockaddr_un) 965 - offsetof(struct sockaddr_un, sun_path) - 1; 966 967 /* cutting "unix:" */ 968 length = addr->length - 5; 969 p = addr->start + 5; 970 971 if (length == 0) { 972 nxt_log(task, NXT_LOG_CRIT, 973 "unix domain socket \"%V\" name is invalid", addr); 974 return NULL; 975 } 976 977 if (length > max_len) { 978 nxt_log(task, NXT_LOG_CRIT, 979 "unix domain socket \"%V\" name is too long", addr); 980 return NULL; 981 } 982 983 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; 984 985#if (NXT_LINUX) 986 987 /* 988 * Linux unix(7): 989 * 990 * abstract: an abstract socket address is distinguished by the fact 991 * that sun_path[0] is a null byte ('\0'). The socket's address in 992 * this namespace is given by the additional bytes in sun_path that 993 * are covered by the specified length of the address structure. 994 * (Null bytes in the name have no special significance.) 995 */ 996 if (p[0] == '@') { 997 p[0] = '\0'; 998 socklen--; 999 } 1000 1001#endif 1002 1003 sa = nxt_sockaddr_alloc(mp, socklen, addr->length); 1004 1005 if (nxt_slow_path(sa == NULL)) { 1006 return NULL; 1007 } 1008 1009 sa->type = SOCK_STREAM; 1010 1011 sa->u.sockaddr_un.sun_family = AF_UNIX; 1012 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); 1013 1014 return sa; 1015 1016#else /* !(NXT_HAVE_UNIX_DOMAIN) */ 1017 1018 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", 1019 addr); 1020 1021 return NULL; 1022 1023#endif 1024} 1025 1026 1027static nxt_sockaddr_t * 1028nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp, 1029 nxt_str_t *addr) 1030{ 1031#if (NXT_INET6) 1032 u_char *p, *addr_end; 1033 size_t length; 1034 nxt_int_t port; 1035 nxt_sockaddr_t *sa; 1036 struct in6_addr *in6_addr; 1037 1038 length = addr->length - 1; 1039 p = addr->start + 1; 1040 1041 addr_end = nxt_memchr(p, ']', length); 1042 1043 if (addr_end == NULL) { 1044 goto invalid_address; 1045 } 1046 1047 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6), 1048 NXT_INET6_ADDR_STR_LEN); 1049 if (nxt_slow_path(sa == NULL)) { 1050 return NULL; 1051 } 1052 1053 in6_addr = &sa->u.sockaddr_in6.sin6_addr; 1054 1055 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { 1056 goto invalid_address; 1057 } 1058 1059 port = 0; 1060 p = addr_end + 1; 1061 length = (p + length) - p; 1062 1063 if (length == 0) { 1064 goto found; 1065 } 1066 1067 if (*p == ':') { 1068 port = nxt_int_parse(p + 1, length - 1); 1069 1070 if (port >= 1 && port <= 65535) { 1071 goto found; 1072 } 1073 } 1074 1075 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); 1076 1077 return NULL; 1078 1079found: 1080 1081 sa->type = SOCK_STREAM; 1082 1083 sa->u.sockaddr_in6.sin6_family = AF_INET6; 1084 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); 1085 1086 return sa; 1087 1088invalid_address: 1089 1090 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); 1091 1092 return NULL; 1093 1094#else 1095 1096 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); 1097 1098 return NULL; 1099 1100#endif 1101} 1102 1103 1104static nxt_sockaddr_t * 1105nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp, 1106 nxt_str_t *string) 1107{ 1108 u_char *p, *ip; 1109 size_t length; 1110 in_addr_t addr; 1111 nxt_int_t port; 1112 nxt_sockaddr_t *sa; 1113 1114 addr = INADDR_ANY; 1115 1116 length = string->length; 1117 ip = string->start; 1118 1119 p = nxt_memchr(ip, ':', length); 1120 1121 if (p == NULL) { 1122 1123 /* single value port, or address */ 1124 1125 port = nxt_int_parse(ip, length); 1126 1127 if (port > 0) { 1128 /* "*:XX" */ 1129 1130 if (port < 1 || port > 65535) { 1131 goto invalid_port; 1132 } 1133 1134 } else { 1135 /* "x.x.x.x" */ 1136 1137 addr = nxt_inet_addr(ip, length); 1138 1139 if (addr == INADDR_NONE) { 1140 goto invalid_port; 1141 } 1142 1143 port = 8080; 1144 } 1145 1146 } else { 1147 1148 /* x.x.x.x:XX */ 1149 1150 p++; 1151 length = (ip + length) - p; 1152 port = nxt_int_parse(p, length); 1153 1154 if (port < 1 || port > 65535) { 1155 goto invalid_port; 1156 } 1157 1158 length = (p - 1) - ip; 1159 1160 if (length != 1 || ip[0] != '*') { 1161 addr = nxt_inet_addr(ip, length); 1162 1163 if (addr == INADDR_NONE) { 1164 goto invalid_addr; 1165 } 1166 1167 /* "x.x.x.x:XX" */ 1168 } 1169 } 1170 1171 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), 1172 NXT_INET_ADDR_STR_LEN); 1173 if (nxt_slow_path(sa == NULL)) { 1174 return NULL; 1175 } 1176 1177 sa->type = SOCK_STREAM; 1178 1179 sa->u.sockaddr_in.sin_family = AF_INET; 1180 sa->u.sockaddr_in.sin_port = htons((in_port_t) port); 1181 sa->u.sockaddr_in.sin_addr.s_addr = addr; 1182 1183 return sa; 1184 1185invalid_port: 1186 1187 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string); 1188 1189 return NULL; 1190 1191invalid_addr: 1192 1193 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string); 1194 1195 return NULL; 1196} 1197 1198 1199nxt_listen_socket_t * 1200nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1201{ 1202 nxt_mp_t *mp; 1203 nxt_listen_socket_t *ls; 1204 1205 ls = nxt_array_zero_add(rt->listen_sockets); 1206 if (ls == NULL) { 1207 return NULL; 1208 } 1209 1210 mp = rt->mem_pool; 1211 1212 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1213 sa->length); 1214 if (ls->sockaddr == NULL) { 1215 return NULL; 1216 } 1217 1218 ls->sockaddr->type = sa->type; 1219 1220 nxt_sockaddr_text(ls->sockaddr); 1221 1222 ls->socket = -1; 1223 ls->backlog = NXT_LISTEN_BACKLOG; 1224 1225 return ls; 1226} 1227 1228 1229static nxt_int_t 1230nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1231{ 1232 size_t length; 1233 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1234 1235 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1236 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); 1237 return NXT_ERROR; 1238 } 1239 1240 /* 1241 * Linux gethostname(2): 1242 * 1243 * If the null-terminated hostname is too large to fit, 1244 * then the name is truncated, and no error is returned. 1245 * 1246 * For this reason an additional byte is reserved in the buffer. 1247 */ 1248 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1249 1250 length = nxt_strlen(hostname); 1251 rt->hostname.length = length; 1252 1253 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1254 1255 if (rt->hostname.start != NULL) { 1256 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1257 return NXT_OK; 1258 } 1259 1260 return NXT_ERROR; 1261} 1262 1263 1264static nxt_int_t 1265nxt_runtime_log_files_init(nxt_runtime_t *rt) 1266{ 1267 nxt_file_t *file; 1268 nxt_list_t *log_files; 1269 1270 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1271 1272 if (nxt_fast_path(log_files != NULL)) { 1273 rt->log_files = log_files; 1274 1275 /* Preallocate the main error_log. This allocation cannot fail. */ 1276 file = nxt_list_zero_add(log_files); 1277 1278 file->fd = NXT_FILE_INVALID; 1279 file->log_level = NXT_LOG_CRIT; 1280 1281 return NXT_OK; 1282 } 1283 1284 return NXT_ERROR; 1285} 1286 1287 1288nxt_file_t * 1289nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1290{ 1291 nxt_int_t ret; 1292 nxt_str_t *prefix; 1293 nxt_file_t *file; 1294 nxt_file_name_str_t file_name; 1295 1296 prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix; 1297 1298 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z", 1299 prefix, name); 1300 1301 if (nxt_slow_path(ret != NXT_OK)) { 1302 return NULL; 1303 } 1304 1305 nxt_list_each(file, rt->log_files) { 1306 1307 /* STUB: hardecoded case sensitive/insensitive. */ 1308 1309 if (file->name != NULL 1310 && nxt_file_name_eq(file->name, file_name.start)) 1311 { 1312 return file; 1313 } 1314 1315 } nxt_list_loop; 1316 1317 file = nxt_list_zero_add(rt->log_files); 1318 1319 if (nxt_slow_path(file == NULL)) { 1320 return NULL; 1321 } 1322 1323 file->fd = NXT_FILE_INVALID; 1324 file->log_level = NXT_LOG_CRIT; 1325 file->name = file_name.start; 1326 1327 return file; 1328} 1329 1330 1331static nxt_int_t 1332nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1333{ 1334 nxt_int_t ret; 1335 nxt_file_t *file; 1336 1337 nxt_list_each(file, rt->log_files) { 1338 1339 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1340 NXT_FILE_OWNER_ACCESS); 1341 1342 if (ret != NXT_OK) { 1343 return NXT_ERROR; 1344 } 1345 1346 } nxt_list_loop; 1347 1348 file = nxt_list_first(rt->log_files); 1349 1350 return nxt_file_stderr(file); 1351} 1352 1353 1354nxt_int_t 1355nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1356{ 1357 nxt_int_t ret; 1358 nxt_uint_t c, p, ncurr, nprev; 1359 nxt_listen_socket_t *curr, *prev; 1360 1361 curr = rt->listen_sockets->elts; 1362 ncurr = rt->listen_sockets->nelts; 1363 1364 if (rt->inherited_sockets != NULL) { 1365 prev = rt->inherited_sockets->elts; 1366 nprev = rt->inherited_sockets->nelts; 1367 1368 } else { 1369 prev = NULL; 1370 nprev = 0; 1371 } 1372 1373 for (c = 0; c < ncurr; c++) { 1374 1375 for (p = 0; p < nprev; p++) { 1376 1377 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1378 1379 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1380 if (ret != NXT_OK) { 1381 return NXT_ERROR; 1382 } 1383 1384 goto next; 1385 } 1386 } 1387 1388 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { 1389 return NXT_ERROR; 1390 } 1391 1392 next: 1393 1394 continue; 1395 } 1396 1397 return NXT_OK; 1398} 1399 1400 1401nxt_int_t 1402nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1403{ 1404 nxt_uint_t i, n; 1405 nxt_listen_socket_t *ls; 1406 1407 ls = rt->listen_sockets->elts; 1408 n = rt->listen_sockets->nelts; 1409 1410 for (i = 0; i < n; i++) { 1411 if (ls[i].flags == NXT_NONBLOCK) { 1412 if (nxt_listen_event(task, &ls[i]) == NULL) { 1413 return NXT_ERROR; 1414 } 1415 } 1416 } 1417 1418 return NXT_OK; 1419} 1420 1421 1422nxt_str_t * 1423nxt_current_directory(nxt_mp_t *mp) 1424{ 1425 size_t length; 1426 u_char *p; 1427 nxt_str_t *name; 1428 char buf[NXT_MAX_PATH_LEN]; 1429 1430 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1431 1432 if (nxt_fast_path(length != 0)) { 1433 name = nxt_str_alloc(mp, length + 1); 1434 1435 if (nxt_fast_path(name != NULL)) { 1436 p = nxt_cpymem(name->start, buf, length); 1437 *p = '/'; 1438 1439 return name; 1440 } 1441 } 1442 1443 return NULL; 1444} 1445 1446 1447static nxt_int_t 1448nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1449{ 1450 ssize_t length; 1451 nxt_int_t n; 1452 nxt_file_t file; 1453 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; 1454 1455 nxt_memzero(&file, sizeof(nxt_file_t)); 1456 1457 file.name = pid_file; 1458 1459 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1460 NXT_FILE_DEFAULT_ACCESS); 1461 1462 if (n != NXT_OK) { 1463 return NXT_ERROR; 1464 } 1465 1466 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1467 1468 if (nxt_file_write(&file, pid, length, 0) != length) { 1469 return NXT_ERROR; 1470 } 1471 1472 nxt_file_close(task, &file); 1473 1474 return NXT_OK; 1475} 1476 1477 1478nxt_process_t * 1479nxt_runtime_process_new(nxt_runtime_t *rt) 1480{ 1481 nxt_process_t *process; 1482 1483 /* TODO: memory failures. */ 1484 1485 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); 1486 if (nxt_slow_path(process == NULL)) { 1487 return NULL; 1488 } 1489 1490 nxt_queue_init(&process->ports); 1491 1492 nxt_thread_mutex_create(&process->incoming_mutex); 1493 nxt_thread_mutex_create(&process->outgoing_mutex); 1494 nxt_thread_mutex_create(&process->cp_mutex); 1495 1496 return process; 1497} 1498 1499 1500static void 1501nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) 1502{ 1503 nxt_assert(process->port_cleanups == 0); 1504 nxt_assert(process->registered == 0); 1505 1506 nxt_port_mmaps_destroy(process->incoming, 1); 1507 nxt_port_mmaps_destroy(process->outgoing, 1); 1508 1509 if (process->cp_mem_pool != NULL) { 1510 nxt_mp_thread_adopt(process->cp_mem_pool); 1511 1512 nxt_mp_destroy(process->cp_mem_pool); 1513 } 1514 1515 nxt_thread_mutex_destroy(&process->incoming_mutex); 1516 nxt_thread_mutex_destroy(&process->outgoing_mutex); 1517 nxt_thread_mutex_destroy(&process->cp_mutex); 1518 1519 nxt_mp_free(rt->mem_pool, process); 1520} 1521 1522 1523static nxt_int_t 1524nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1525{ 1526 nxt_process_t *process; 1527 1528 process = data; 1529 1530 if (lhq->key.length == sizeof(nxt_pid_t) && 1531 *(nxt_pid_t *) lhq->key.start == process->pid) { 1532 return NXT_OK; 1533 } 1534 1535 return NXT_DECLINED; 1536} 1537 1538static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1539 NXT_LVLHSH_DEFAULT, 1540 nxt_runtime_lvlhsh_pid_test, 1541 nxt_lvlhsh_alloc, 1542 nxt_lvlhsh_free, 1543}; 1544 1545 1546nxt_inline void 1547nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1548{ 1549 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1550 lhq->key.length = sizeof(*pid); 1551 lhq->key.start = (u_char *) pid; 1552 lhq->proto = &lvlhsh_processes_proto; 1553} 1554 1555 1556nxt_process_t * 1557nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1558{ 1559 nxt_process_t *process; 1560 nxt_lvlhsh_query_t lhq; 1561 1562 process = NULL; 1563 1564 nxt_runtime_process_lhq_pid(&lhq, &pid); 1565 1566 nxt_thread_mutex_lock(&rt->processes_mutex); 1567 1568 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1569 process = lhq.value; 1570 1571 } else { 1572 nxt_thread_log_debug("process %PI not found", pid); 1573 } 1574 1575 nxt_thread_mutex_unlock(&rt->processes_mutex); 1576 1577 return process; 1578} 1579 1580 1581nxt_process_t * 1582nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1583{ 1584 nxt_process_t *process; 1585 nxt_lvlhsh_query_t lhq; 1586 1587 nxt_runtime_process_lhq_pid(&lhq, &pid); 1588 1589 nxt_thread_mutex_lock(&rt->processes_mutex); 1590 1591 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1592 nxt_thread_log_debug("process %PI found", pid); 1593 1594 nxt_thread_mutex_unlock(&rt->processes_mutex); 1595 return lhq.value; 1596 } 1597 1598 process = nxt_runtime_process_new(rt); 1599 if (nxt_slow_path(process == NULL)) { 1600 return NULL; 1601 } 1602 1603 process->pid = pid; 1604 1605 lhq.replace = 0; 1606 lhq.value = process; 1607 lhq.pool = rt->mem_pool; 1608 1609 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1610 1611 case NXT_OK: 1612 if (rt->nprocesses == 0) { 1613 rt->mprocess = process; 1614 } 1615 1616 rt->nprocesses++; 1617 1618 process->registered = 1; 1619 1620 nxt_thread_log_debug("process %PI insert", pid); 1621 break; 1622 1623 default: 1624 nxt_thread_log_debug("process %PI insert failed", pid); 1625 break; 1626 } 1627 1628 nxt_thread_mutex_unlock(&rt->processes_mutex); 1629 1630 return process; 1631} 1632 1633 1634void 1635nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) 1636{ 1637 nxt_port_t *port; 1638 nxt_lvlhsh_query_t lhq; 1639 1640 nxt_assert(process->registered == 0); 1641 1642 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1643 1644 lhq.replace = 0; 1645 lhq.value = process; 1646 lhq.pool = rt->mem_pool; 1647 1648 nxt_thread_mutex_lock(&rt->processes_mutex); 1649 1650 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1651 1652 case NXT_OK: 1653 if (rt->nprocesses == 0) { 1654 rt->mprocess = process; 1655 } 1656 1657 rt->nprocesses++; 1658 1659 nxt_process_port_each(process, port) { 1660 1661 port->pid = process->pid; 1662 1663 nxt_runtime_port_add(rt, port); 1664 1665 } nxt_process_port_loop; 1666 1667 process->registered = 1; 1668 1669 nxt_thread_log_debug("process %PI added", process->pid); 1670 break; 1671 1672 default: 1673 nxt_thread_log_debug("process %PI failed to add", process->pid); 1674 break; 1675 } 1676 1677 nxt_thread_mutex_unlock(&rt->processes_mutex); 1678} 1679 1680 1681static nxt_process_t * 1682nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) 1683{ 1684 nxt_process_t *process; 1685 nxt_lvlhsh_query_t lhq; 1686 1687 process = NULL; 1688 1689 nxt_runtime_process_lhq_pid(&lhq, &pid); 1690 1691 lhq.pool = rt->mem_pool; 1692 1693 nxt_thread_mutex_lock(&rt->processes_mutex); 1694 1695 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1696 1697 case NXT_OK: 1698 rt->nprocesses--; 1699 1700 process = lhq.value; 1701 1702 process->registered = 0; 1703 1704 nxt_thread_log_debug("process %PI removed", pid); 1705 break; 1706 1707 default: 1708 nxt_thread_log_debug("process %PI remove failed", pid); 1709 break; 1710 } 1711 1712 nxt_thread_mutex_unlock(&rt->processes_mutex); 1713 1714 return process; 1715} 1716 1717 1718void 1719nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1720{ 1721 nxt_port_t *port; 1722 1723 if (process->port_cleanups == 0) { 1724 if (process->registered == 1) { 1725 nxt_runtime_process_remove_pid(rt, process->pid); 1726 } 1727 1728 nxt_runtime_process_destroy(rt, process); 1729 1730 } else { 1731 nxt_process_port_each(process, port) { 1732 1733 nxt_runtime_port_remove(rt, port); 1734 1735 nxt_port_release(port); 1736 1737 } nxt_process_port_loop; 1738 } 1739} 1740 1741 1742nxt_process_t * 1743nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1744{ 1745 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); 1746 1747 lhe->proto = &lvlhsh_processes_proto; 1748 1749 return nxt_runtime_process_next(rt, lhe); 1750} 1751 1752 1753nxt_port_t * 1754nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1755{ 1756 return nxt_port_hash_first(&rt->ports, lhe); 1757} 1758 1759 1760void 1761nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) 1762{ 1763 nxt_port_hash_add(&rt->ports, rt->mem_pool, port); 1764 1765 rt->port_by_type[port->type] = port; 1766} 1767 1768 1769void 1770nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) 1771{ 1772 nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); 1773 1774 if (rt->port_by_type[port->type] == port) { 1775 rt->port_by_type[port->type] = NULL; 1776 } 1777} 1778 1779 1780nxt_port_t * 1781nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1782 nxt_port_id_t port_id) 1783{ 1784 return nxt_port_hash_find(&rt->ports, pid, port_id); 1785}
|