1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_port.h> 11 #include <nxt_main_process.h> 12 #include <nxt_router.h> 13 14 15 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 16 nxt_runtime_t *rt); 17 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 18 nxt_runtime_t *rt); 19 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 20 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 21 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 22 static void nxt_runtime_initial_start(nxt_task_t *task); 23 static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, 24 nxt_runtime_t *rt); 25 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31 static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task, 32 nxt_mp_t *mp, nxt_str_t *addr); 33 static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, 34 nxt_mp_t *mp, nxt_str_t *addr); 35 static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, 36 nxt_mp_t *mp, nxt_str_t *addr); 37 static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, 38 nxt_mp_t *mp, nxt_str_t *addr); 39 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 40 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 41 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 42 nxt_runtime_t *rt); 43 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 44 nxt_file_name_t *pid_file); 45 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 46 nxt_runtime_cont_t cont); 47 static void nxt_runtime_thread_pool_init(void); 48 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 49 void *data); 50 static void nxt_runtime_process_destroy(nxt_runtime_t *rt, 51 nxt_process_t *process); 52 static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, 53 nxt_pid_t pid); 54 55 56 nxt_int_t 57 nxt_runtime_create(nxt_task_t *task) 58 { 59 nxt_mp_t *mp; 60 nxt_int_t ret; 61 nxt_array_t *listen_sockets; 62 nxt_runtime_t *rt; 63 nxt_app_lang_module_t *lang; 64 65 mp = nxt_mp_create(1024, 128, 256, 32); 66 67 if (nxt_slow_path(mp == NULL)) { 68 return NXT_ERROR; 69 } 70 71 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 72 if (nxt_slow_path(rt == NULL)) { 73 return NXT_ERROR; 74 } 75 76 task->thread->runtime = rt; 77 rt->mem_pool = mp; 78 79 nxt_thread_mutex_create(&rt->processes_mutex); 80 81 rt->services = nxt_services_init(mp); 82 if (nxt_slow_path(rt->services == NULL)) { 83 goto fail; 84 } 85 86 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 87 if (nxt_slow_path(rt->languages == NULL)) { 88 goto fail; 89 } 90 91 /* Should not fail. */ 92 lang = nxt_array_add(rt->languages); 93 lang->type = (nxt_str_t) nxt_string("go"); 94 lang->version = (nxt_str_t) nxt_null_string; 95 lang->file = NULL; 96 lang->module = &nxt_go_module; 97 98 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 99 if (nxt_slow_path(listen_sockets == NULL)) { 100 goto fail; 101 } 102 103 rt->listen_sockets = listen_sockets; 104 105 ret = nxt_runtime_inherited_listen_sockets(task, rt); 106 if (nxt_slow_path(ret != NXT_OK)) { 107 goto fail; 108 } 109 110 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 111 goto fail; 112 } 113 114 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 115 goto fail; 116 } 117 118 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 119 goto fail; 120 } 121 122 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 123 goto fail; 124 } 125 126 rt->start = nxt_runtime_initial_start; 127 128 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 129 goto fail; 130 } 131 132 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 133 nxt_runtime_start, task, rt, NULL); 134 135 return NXT_OK; 136 137 fail: 138 139 nxt_mp_destroy(mp); 140 141 return NXT_ERROR; 142 } 143 144 145 static nxt_int_t 146 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 147 { 148 u_char *v, *p; 149 nxt_int_t type; 150 nxt_array_t *inherited_sockets; 151 nxt_socket_t s; 152 nxt_listen_socket_t *ls; 153 154 v = (u_char *) getenv("NGINX"); 155 156 if (v == NULL) { 157 return nxt_runtime_systemd_listen_sockets(task, rt); 158 } 159 160 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); 161 162 inherited_sockets = nxt_array_create(rt->mem_pool, 163 1, sizeof(nxt_listen_socket_t)); 164 if (inherited_sockets == NULL) { 165 return NXT_ERROR; 166 } 167 168 rt->inherited_sockets = inherited_sockets; 169 170 for (p = v; *p != '\0'; p++) { 171 172 if (*p == ';') { 173 s = nxt_int_parse(v, p - v); 174 175 if (nxt_slow_path(s < 0)) { 176 nxt_log(task, NXT_LOG_CRIT, "invalid socket number " 177 "\"%s\" in NGINX environment variable, " 178 "ignoring the rest of the variable", v); 179 return NXT_ERROR; 180 } 181 182 v = p + 1; 183 184 ls = nxt_array_zero_add(inherited_sockets); 185 if (nxt_slow_path(ls == NULL)) { 186 return NXT_ERROR; 187 } 188 189 ls->socket = s; 190 191 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 192 if (nxt_slow_path(ls->sockaddr == NULL)) { 193 return NXT_ERROR; 194 } 195 196 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 197 if (nxt_slow_path(type == -1)) { 198 return NXT_ERROR; 199 } 200 201 ls->sockaddr->type = (uint16_t) type; 202 } 203 } 204 205 return NXT_OK; 206 } 207 208 209 static nxt_int_t 210 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 211 { 212 u_char *nfd, *pid; 213 nxt_int_t n; 214 nxt_array_t *inherited_sockets; 215 nxt_socket_t s; 216 nxt_listen_socket_t *ls; 217 218 /* 219 * Number of listening sockets passed. The socket 220 * descriptors start from number 3 and are sequential. 221 */ 222 nfd = (u_char *) getenv("LISTEN_FDS"); 223 if (nfd == NULL) { 224 return NXT_OK; 225 } 226 227 /* The pid of the service process. */ 228 pid = (u_char *) getenv("LISTEN_PID"); 229 if (pid == NULL) { 230 return NXT_OK; 231 } 232 233 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 234 if (n < 0) { 235 return NXT_OK; 236 } 237 238 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 239 return NXT_OK; 240 } 241 242 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); 243 244 inherited_sockets = nxt_array_create(rt->mem_pool, 245 n, sizeof(nxt_listen_socket_t)); 246 if (inherited_sockets == NULL) { 247 return NXT_ERROR; 248 } 249 250 rt->inherited_sockets = inherited_sockets; 251 252 for (s = 3; s < n; s++) { 253 ls = nxt_array_zero_add(inherited_sockets); 254 if (nxt_slow_path(ls == NULL)) { 255 return NXT_ERROR; 256 } 257 258 ls->socket = s; 259 260 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 261 if (nxt_slow_path(ls->sockaddr == NULL)) { 262 return NXT_ERROR; 263 } 264 265 ls->sockaddr->type = SOCK_STREAM; 266 } 267 268 return NXT_OK; 269 } 270 271 272 static nxt_int_t 273 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 274 { 275 nxt_thread_t *thread; 276 nxt_event_engine_t *engine; 277 const nxt_event_interface_t *interface; 278 279 interface = nxt_service_get(rt->services, "engine", NULL); 280 281 if (nxt_slow_path(interface == NULL)) { 282 /* TODO: log */ 283 return NXT_ERROR; 284 } 285 286 engine = nxt_event_engine_create(task, interface, 287 nxt_main_process_signals, 0, 0); 288 289 if (nxt_slow_path(engine == NULL)) { 290 return NXT_ERROR; 291 } 292 293 thread = task->thread; 294 thread->engine = engine; 295 thread->fiber = &engine->fibers->fiber; 296 297 engine->id = rt->last_engine_id++; 298 299 nxt_queue_init(&rt->engines); 300 nxt_queue_insert_tail(&rt->engines, &engine->link); 301 302 return NXT_OK; 303 } 304 305 306 static nxt_int_t 307 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 308 { 309 nxt_int_t ret; 310 nxt_array_t *thread_pools; 311 312 thread_pools = nxt_array_create(rt->mem_pool, 1, 313 sizeof(nxt_thread_pool_t *)); 314 315 if (nxt_slow_path(thread_pools == NULL)) { 316 return NXT_ERROR; 317 } 318 319 rt->thread_pools = thread_pools; 320 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 321 322 if (nxt_slow_path(ret != NXT_OK)) { 323 return NXT_ERROR; 324 } 325 326 return NXT_OK; 327 } 328 329 330 static void 331 nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 332 { 333 nxt_runtime_t *rt; 334 335 rt = obj; 336 337 nxt_debug(task, "rt conf done"); 338 339 task->thread->log->ctx_handler = NULL; 340 task->thread->log->ctx = NULL; 341 342 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 343 goto fail; 344 } 345 346 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 347 goto fail; 348 } 349 350 /* 351 * Thread pools should be destroyed before starting worker 352 * processes, because thread pool semaphores will stick in 353 * locked state in new processes after fork(). 354 */ 355 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 356 357 return; 358 359 fail: 360 361 nxt_runtime_quit(task); 362 } 363 364 365 static void 366 nxt_runtime_initial_start(nxt_task_t *task) 367 { 368 nxt_int_t ret; 369 nxt_thread_t *thr; 370 nxt_runtime_t *rt; 371 const nxt_event_interface_t *interface; 372 373 thr = task->thread; 374 rt = thr->runtime; 375 376 if (rt->inherited_sockets == NULL && rt->daemon) { 377 378 if (nxt_process_daemon(task) != NXT_OK) { 379 goto fail; 380 } 381 382 /* 383 * An event engine should be updated after fork() 384 * even if an event facility was not changed because: 385 * 1) inherited kqueue descriptor is invalid, 386 * 2) the signal thread is not inherited. 387 */ 388 interface = nxt_service_get(rt->services, "engine", rt->engine); 389 if (interface == NULL) { 390 goto fail; 391 } 392 393 ret = nxt_event_engine_change(task->thread->engine, interface, 394 rt->batch); 395 if (ret != NXT_OK) { 396 goto fail; 397 } 398 } 399 400 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 401 if (ret != NXT_OK) { 402 goto fail; 403 } 404 405 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 406 goto fail; 407 } 408 409 thr->engine->max_connections = rt->engine_connections; 410 411 if (rt->main_process) { 412 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 413 return; 414 } 415 416 } else { 417 nxt_single_process_start(thr, task, rt); 418 return; 419 } 420 421 fail: 422 423 nxt_runtime_quit(task); 424 } 425 426 427 static void 428 nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) 429 { 430 nxt_int_t ret; 431 432 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, 433 60000 * 1000000LL); 434 435 if (nxt_slow_path(ret != NXT_OK)) { 436 nxt_runtime_quit(task); 437 return; 438 } 439 440 rt->types |= (1U << NXT_PROCESS_SINGLE); 441 442 nxt_runtime_listen_sockets_enable(task, rt); 443 444 return; 445 } 446 447 448 void 449 nxt_runtime_quit(nxt_task_t *task) 450 { 451 nxt_bool_t done; 452 nxt_runtime_t *rt; 453 nxt_event_engine_t *engine; 454 455 rt = task->thread->runtime; 456 engine = task->thread->engine; 457 458 nxt_debug(task, "exiting"); 459 460 done = 1; 461 462 if (!engine->shutdown) { 463 engine->shutdown = 1; 464 465 if (!nxt_array_is_empty(rt->thread_pools)) { 466 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 467 done = 0; 468 } 469 470 if (nxt_runtime_is_main(rt)) { 471 nxt_main_stop_worker_processes(task, rt); 472 done = 0; 473 } 474 } 475 476 nxt_runtime_close_idle_connections(engine); 477 478 if (done) { 479 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 480 task, rt, engine); 481 } 482 } 483 484 485 static void 486 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 487 { 488 nxt_conn_t *c; 489 nxt_queue_t *idle; 490 nxt_queue_link_t *link, *next; 491 492 nxt_debug(&engine->task, "close idle connections"); 493 494 idle = &engine->idle_connections; 495 496 for (link = nxt_queue_head(idle); 497 link != nxt_queue_tail(idle); 498 link = next) 499 { 500 next = nxt_queue_next(link); 501 c = nxt_queue_link_data(link, nxt_conn_t, link); 502 503 if (!c->socket.read_ready) { 504 nxt_queue_remove(link); 505 nxt_conn_close(engine, c); 506 } 507 } 508 } 509 510 511 static void 512 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 513 { 514 nxt_runtime_t *rt; 515 nxt_process_t *process; 516 nxt_event_engine_t *engine; 517 518 rt = obj; 519 engine = data; 520 521 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 522 523 if (!nxt_array_is_empty(rt->thread_pools)) { 524 return; 525 } 526 527 if (nxt_runtime_is_main(rt)) { 528 if (rt->pid_file != NULL) { 529 nxt_file_delete(rt->pid_file); 530 } 531 532 #if (NXT_HAVE_UNIX_DOMAIN) 533 { 534 nxt_sockaddr_t *sa; 535 nxt_file_name_t *name; 536 537 sa = rt->controller_listen; 538 539 if (sa->u.sockaddr.sa_family == AF_UNIX) { 540 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 541 (void) nxt_file_delete(name); 542 } 543 } 544 #endif 545 } 546 547 if (!engine->event.signal_support) { 548 nxt_event_engine_signals_stop(engine); 549 } 550 551 nxt_runtime_process_each(rt, process) { 552 553 nxt_runtime_process_remove(rt, process); 554 555 } nxt_runtime_process_loop; 556 557 nxt_thread_mutex_destroy(&rt->processes_mutex); 558 559 nxt_mp_destroy(rt->mem_pool); 560 561 nxt_debug(task, "exit"); 562 563 exit(0); 564 nxt_unreachable(); 565 } 566 567 568 static nxt_int_t 569 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 570 { 571 nxt_event_engine_t *engine; 572 const nxt_event_interface_t *interface; 573 574 engine = task->thread->engine; 575 576 if (engine->batch == rt->batch 577 && nxt_strcmp(engine->event.name, rt->engine) == 0) 578 { 579 return NXT_OK; 580 } 581 582 interface = nxt_service_get(rt->services, "engine", rt->engine); 583 584 if (interface != NULL) { 585 return nxt_event_engine_change(engine, interface, rt->batch); 586 } 587 588 return NXT_ERROR; 589 } 590 591 592 void 593 nxt_runtime_event_engine_free(nxt_runtime_t *rt) 594 { 595 nxt_queue_link_t *link; 596 nxt_event_engine_t *engine; 597 598 link = nxt_queue_first(&rt->engines); 599 nxt_queue_remove(link); 600 601 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 602 nxt_event_engine_free(engine); 603 } 604 605 606 nxt_int_t 607 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 608 nxt_uint_t max_threads, nxt_nsec_t timeout) 609 { 610 nxt_thread_pool_t *thread_pool, **tp; 611 612 tp = nxt_array_add(rt->thread_pools); 613 if (tp == NULL) { 614 return NXT_ERROR; 615 } 616 617 thread_pool = nxt_thread_pool_create(max_threads, timeout, 618 nxt_runtime_thread_pool_init, 619 thr->engine, 620 nxt_runtime_thread_pool_exit); 621 622 if (nxt_fast_path(thread_pool != NULL)) { 623 *tp = thread_pool; 624 } 625 626 return NXT_OK; 627 } 628 629 630 static void 631 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 632 nxt_runtime_cont_t cont) 633 { 634 nxt_uint_t n; 635 nxt_thread_pool_t **tp; 636 637 rt->continuation = cont; 638 639 n = rt->thread_pools->nelts; 640 641 if (n == 0) { 642 cont(task); 643 return; 644 } 645 646 tp = rt->thread_pools->elts; 647 648 do { 649 nxt_thread_pool_destroy(*tp); 650 651 tp++; 652 n--; 653 } while (n != 0); 654 } 655 656 657 static void 658 nxt_runtime_thread_pool_init(void) 659 { 660 #if (NXT_REGEX) 661 nxt_regex_init(0); 662 #endif 663 } 664 665 666 static void 667 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 668 { 669 nxt_uint_t i, n; 670 nxt_runtime_t *rt; 671 nxt_thread_pool_t *tp, **thread_pools; 672 nxt_thread_handle_t handle; 673 674 tp = obj; 675 676 if (data != NULL) { 677 handle = (nxt_thread_handle_t) (uintptr_t) data; 678 nxt_thread_wait(handle); 679 } 680 681 rt = task->thread->runtime; 682 683 thread_pools = rt->thread_pools->elts; 684 n = rt->thread_pools->nelts; 685 686 nxt_debug(task, "thread pools: %ui", n); 687 688 for (i = 0; i < n; i++) { 689 690 if (tp == thread_pools[i]) { 691 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 692 693 if (n == 1) { 694 /* The last thread pool. */ 695 rt->continuation(task); 696 } 697 698 return; 699 } 700 } 701 } 702 703 704 static nxt_int_t 705 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 706 { 707 nxt_int_t ret; 708 nxt_str_t control; 709 nxt_uint_t n; 710 nxt_file_t *file; 711 const char *slash; 712 nxt_sockaddr_t *sa; 713 nxt_file_name_str_t file_name; 714 const nxt_event_interface_t *interface; 715 716 rt->daemon = 1; 717 rt->main_process = 1; 718 rt->engine_connections = 256; 719 rt->auxiliary_threads = 2; 720 rt->user_cred.user = NXT_USER; 721 rt->group = NXT_GROUP; 722 rt->pid = NXT_PID; 723 rt->log = NXT_LOG; 724 rt->modules = NXT_MODULES; 725 rt->control = NXT_CONTROL_SOCK; 726 727 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 728 return NXT_ERROR; 729 } 730 731 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { 732 return NXT_ERROR; 733 } 734 735 /* An engine's parameters. */ 736 737 interface = nxt_service_get(rt->services, "engine", rt->engine); 738 if (interface == NULL) { 739 return NXT_ERROR; 740 } 741 742 rt->engine = interface->name; 743 744 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 745 if (nxt_slow_path(ret != NXT_OK)) { 746 return NXT_ERROR; 747 } 748 749 rt->pid_file = file_name.start; 750 751 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 752 if (nxt_slow_path(ret != NXT_OK)) { 753 return NXT_ERROR; 754 } 755 756 file = nxt_list_first(rt->log_files); 757 file->name = file_name.start; 758 759 slash = ""; 760 n = nxt_strlen(rt->modules); 761 762 if (n > 1 && rt->modules[n - 1] != '/') { 763 slash = "/"; 764 } 765 766 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 767 rt->modules, slash); 768 if (nxt_slow_path(ret != NXT_OK)) { 769 return NXT_ERROR; 770 } 771 772 rt->modules = (char *) file_name.start; 773 774 control.length = nxt_strlen(rt->control); 775 control.start = (u_char *) rt->control; 776 777 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &control); 778 if (nxt_slow_path(sa == NULL)) { 779 return NXT_ERROR; 780 } 781 782 rt->controller_listen = sa; 783 784 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 785 return NXT_ERROR; 786 } 787 788 return NXT_OK; 789 } 790 791 792 static nxt_int_t 793 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 794 { 795 char *p, **argv; 796 u_char *end; 797 u_char buf[1024]; 798 799 static const char version[] = 800 "unit version: " NXT_VERSION "\n" 801 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 802 803 static const char no_control[] = 804 "option \"--control\" requires socket address\n"; 805 static const char no_user[] = "option \"--user\" requires username\n"; 806 static const char no_group[] = "option \"--group\" requires group name\n"; 807 static const char no_pid[] = "option \"--pid\" requires filename\n"; 808 static const char no_log[] = "option \"--log\" requires filename\n"; 809 static const char no_modules[] = 810 "option \"--modules\" requires directory\n"; 811 812 static const char help[] = 813 "\n" 814 "unit options:\n" 815 "\n" 816 " --version print unit version and configure options\n" 817 "\n" 818 " --no-daemon run unit in non-daemon mode\n" 819 "\n" 820 " --control ADDRESS set address of control API socket\n" 821 " default: \"" NXT_CONTROL_SOCK "\"\n" 822 "\n" 823 " --pid FILE set pid filename\n" 824 " default: \"" NXT_PID "\"\n" 825 "\n" 826 " --log FILE set log filename\n" 827 " default: \"" NXT_LOG "\"\n" 828 "\n" 829 " --modules DIRECTORY set modules directory name\n" 830 " default: \"" NXT_MODULES "\"\n" 831 "\n" 832 " --user USER set non-privileged processes to run" 833 " as specified user\n" 834 " default: \"" NXT_USER "\"\n" 835 "\n" 836 " --group GROUP set non-privileged processes to run" 837 " as specified group\n" 838 " default: "; 839 840 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 841 static const char primary[] = "user's primary group\n\n"; 842 843 argv = &nxt_process_argv[1]; 844 845 while (*argv != NULL) { 846 p = *argv++; 847 848 if (nxt_strcmp(p, "--control") == 0) { 849 if (*argv == NULL) { 850 write(STDERR_FILENO, no_control, sizeof(no_control) - 1); 851 return NXT_ERROR; 852 } 853 854 p = *argv++; 855 856 rt->control = p; 857 858 continue; 859 } 860 861 if (nxt_strcmp(p, "--upstream") == 0) { 862 if (*argv == NULL) { 863 nxt_log(task, NXT_LOG_CRIT, 864 "no argument for option \"--upstream\""); 865 return NXT_ERROR; 866 } 867 868 p = *argv++; 869 870 rt->upstream.length = nxt_strlen(p); 871 rt->upstream.start = (u_char *) p; 872 873 continue; 874 } 875 876 if (nxt_strcmp(p, "--user") == 0) { 877 if (*argv == NULL) { 878 write(STDERR_FILENO, no_user, sizeof(no_user) - 1); 879 return NXT_ERROR; 880 } 881 882 p = *argv++; 883 884 rt->user_cred.user = p; 885 886 continue; 887 } 888 889 if (nxt_strcmp(p, "--group") == 0) { 890 if (*argv == NULL) { 891 write(STDERR_FILENO, no_group, sizeof(no_group) - 1); 892 return NXT_ERROR; 893 } 894 895 p = *argv++; 896 897 rt->group = p; 898 899 continue; 900 } 901 902 if (nxt_strcmp(p, "--pid") == 0) { 903 if (*argv == NULL) { 904 write(STDERR_FILENO, no_pid, sizeof(no_pid) - 1); 905 return NXT_ERROR; 906 } 907 908 p = *argv++; 909 910 rt->pid = p; 911 912 continue; 913 } 914 915 if (nxt_strcmp(p, "--log") == 0) { 916 if (*argv == NULL) { 917 write(STDERR_FILENO, no_log, sizeof(no_log) - 1); 918 return NXT_ERROR; 919 } 920 921 p = *argv++; 922 923 rt->log = p; 924 925 continue; 926 } 927 928 if (nxt_strcmp(p, "--modules") == 0) { 929 if (*argv == NULL) { 930 write(STDERR_FILENO, no_modules, sizeof(no_modules) - 1); 931 return NXT_ERROR; 932 } 933 934 p = *argv++; 935 936 rt->modules = p; 937 938 continue; 939 } 940 941 if (nxt_strcmp(p, "--no-daemon") == 0) { 942 rt->daemon = 0; 943 continue; 944 } 945 946 if (nxt_strcmp(p, "--version") == 0) { 947 write(STDERR_FILENO, version, sizeof(version) - 1); 948 exit(0); 949 } 950 951 if (nxt_strcmp(p, "--help") == 0) { 952 write(STDOUT_FILENO, help, sizeof(help) - 1); 953 954 if (sizeof(NXT_GROUP) == 1) { 955 write(STDOUT_FILENO, primary, sizeof(primary) - 1); 956 957 } else { 958 write(STDOUT_FILENO, group, sizeof(group) - 1); 959 } 960 961 exit(0); 962 } 963 964 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\"\n", p); 965 write(STDERR_FILENO, buf, end - buf); 966 967 return NXT_ERROR; 968 } 969 970 return NXT_OK; 971 } 972 973 974 static nxt_sockaddr_t * 975 nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 976 { 977 u_char *p; 978 size_t length; 979 980 length = addr->length; 981 p = addr->start; 982 983 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) { 984 return nxt_runtime_sockaddr_unix_parse(task, mp, addr); 985 } 986 987 if (length != 0 && *p == '[') { 988 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); 989 } 990 991 return nxt_runtime_sockaddr_inet_parse(task, mp, addr); 992 } 993 994 995 static nxt_sockaddr_t * 996 nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 997 { 998 #if (NXT_HAVE_UNIX_DOMAIN) 999 u_char *p; 1000 size_t length, socklen; 1001 nxt_sockaddr_t *sa; 1002 1003 /* 1004 * Actual sockaddr_un length can be lesser or even larger than defined 1005 * struct sockaddr_un length (see comment in nxt_socket.h). So 1006 * limit maximum Unix domain socket address length by defined sun_path[] 1007 * length because some OSes accept addresses twice larger than defined 1008 * struct sockaddr_un. Also reserve space for a trailing zero to avoid 1009 * ambiguity, since many OSes accept Unix domain socket addresses 1010 * without a trailing zero. 1011 */ 1012 const size_t max_len = sizeof(struct sockaddr_un) 1013 - offsetof(struct sockaddr_un, sun_path) - 1; 1014 1015 /* cutting "unix:" */ 1016 length = addr->length - 5; 1017 p = addr->start + 5; 1018 1019 if (length == 0) { 1020 nxt_log(task, NXT_LOG_CRIT, 1021 "unix domain socket \"%V\" name is invalid", addr); 1022 return NULL; 1023 } 1024 1025 if (length > max_len) { 1026 nxt_log(task, NXT_LOG_CRIT, 1027 "unix domain socket \"%V\" name is too long", addr); 1028 return NULL; 1029 } 1030 1031 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; 1032 1033 #if (NXT_LINUX) 1034 1035 /* 1036 * Linux unix(7): 1037 * 1038 * abstract: an abstract socket address is distinguished by the fact 1039 * that sun_path[0] is a null byte ('\0'). The socket's address in 1040 * this namespace is given by the additional bytes in sun_path that 1041 * are covered by the specified length of the address structure. 1042 * (Null bytes in the name have no special significance.) 1043 */ 1044 if (p[0] == '@') { 1045 p[0] = '\0'; 1046 socklen--; 1047 } 1048 1049 #endif 1050 1051 sa = nxt_sockaddr_alloc(mp, socklen, addr->length); 1052 1053 if (nxt_slow_path(sa == NULL)) { 1054 return NULL; 1055 } 1056 1057 sa->type = SOCK_STREAM; 1058 1059 sa->u.sockaddr_un.sun_family = AF_UNIX; 1060 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); 1061 1062 return sa; 1063 1064 #else /* !(NXT_HAVE_UNIX_DOMAIN) */ 1065 1066 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", 1067 addr); 1068 1069 return NULL; 1070 1071 #endif 1072 } 1073 1074 1075 static nxt_sockaddr_t * 1076 nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp, 1077 nxt_str_t *addr) 1078 { 1079 #if (NXT_INET6) 1080 u_char *p, *addr_end; 1081 size_t length; 1082 nxt_int_t port; 1083 nxt_sockaddr_t *sa; 1084 struct in6_addr *in6_addr; 1085 1086 length = addr->length - 1; 1087 p = addr->start + 1; 1088 1089 addr_end = nxt_memchr(p, ']', length); 1090 1091 if (addr_end == NULL) { 1092 goto invalid_address; 1093 } 1094 1095 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6), 1096 NXT_INET6_ADDR_STR_LEN); 1097 if (nxt_slow_path(sa == NULL)) { 1098 return NULL; 1099 } 1100 1101 in6_addr = &sa->u.sockaddr_in6.sin6_addr; 1102 1103 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { 1104 goto invalid_address; 1105 } 1106 1107 port = 0; 1108 p = addr_end + 1; 1109 length = (p + length) - p; 1110 1111 if (length == 0) { 1112 goto found; 1113 } 1114 1115 if (*p == ':') { 1116 port = nxt_int_parse(p + 1, length - 1); 1117 1118 if (port >= 1 && port <= 65535) { 1119 goto found; 1120 } 1121 } 1122 1123 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); 1124 1125 return NULL; 1126 1127 found: 1128 1129 sa->type = SOCK_STREAM; 1130 1131 sa->u.sockaddr_in6.sin6_family = AF_INET6; 1132 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); 1133 1134 return sa; 1135 1136 invalid_address: 1137 1138 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); 1139 1140 return NULL; 1141 1142 #else 1143 1144 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); 1145 1146 return NULL; 1147 1148 #endif 1149 } 1150 1151 1152 static nxt_sockaddr_t * 1153 nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp, 1154 nxt_str_t *string) 1155 { 1156 u_char *p, *ip; 1157 size_t length; 1158 in_addr_t addr; 1159 nxt_int_t port; 1160 nxt_sockaddr_t *sa; 1161 1162 addr = INADDR_ANY; 1163 1164 length = string->length; 1165 ip = string->start; 1166 1167 p = nxt_memchr(ip, ':', length); 1168 1169 if (p == NULL) { 1170 1171 /* single value port, or address */ 1172 1173 port = nxt_int_parse(ip, length); 1174 1175 if (port > 0) { 1176 /* "*:XX" */ 1177 1178 if (port < 1 || port > 65535) { 1179 goto invalid_port; 1180 } 1181 1182 } else { 1183 /* "x.x.x.x" */ 1184 1185 addr = nxt_inet_addr(ip, length); 1186 1187 if (addr == INADDR_NONE) { 1188 goto invalid_port; 1189 } 1190 1191 port = 8080; 1192 } 1193 1194 } else { 1195 1196 /* x.x.x.x:XX */ 1197 1198 p++; 1199 length = (ip + length) - p; 1200 port = nxt_int_parse(p, length); 1201 1202 if (port < 1 || port > 65535) { 1203 goto invalid_port; 1204 } 1205 1206 length = (p - 1) - ip; 1207 1208 if (length != 1 || ip[0] != '*') { 1209 addr = nxt_inet_addr(ip, length); 1210 1211 if (addr == INADDR_NONE) { 1212 goto invalid_addr; 1213 } 1214 1215 /* "x.x.x.x:XX" */ 1216 } 1217 } 1218 1219 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), 1220 NXT_INET_ADDR_STR_LEN); 1221 if (nxt_slow_path(sa == NULL)) { 1222 return NULL; 1223 } 1224 1225 sa->type = SOCK_STREAM; 1226 1227 sa->u.sockaddr_in.sin_family = AF_INET; 1228 sa->u.sockaddr_in.sin_port = htons((in_port_t) port); 1229 sa->u.sockaddr_in.sin_addr.s_addr = addr; 1230 1231 return sa; 1232 1233 invalid_port: 1234 1235 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string); 1236 1237 return NULL; 1238 1239 invalid_addr: 1240 1241 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string); 1242 1243 return NULL; 1244 } 1245 1246 1247 nxt_listen_socket_t * 1248 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1249 { 1250 nxt_mp_t *mp; 1251 nxt_listen_socket_t *ls; 1252 1253 ls = nxt_array_zero_add(rt->listen_sockets); 1254 if (ls == NULL) { 1255 return NULL; 1256 } 1257 1258 mp = rt->mem_pool; 1259 1260 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1261 sa->length); 1262 if (ls->sockaddr == NULL) { 1263 return NULL; 1264 } 1265 1266 ls->sockaddr->type = sa->type; 1267 1268 nxt_sockaddr_text(ls->sockaddr); 1269 1270 ls->socket = -1; 1271 ls->backlog = NXT_LISTEN_BACKLOG; 1272 1273 return ls; 1274 } 1275 1276 1277 static nxt_int_t 1278 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1279 { 1280 size_t length; 1281 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1282 1283 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1284 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); 1285 return NXT_ERROR; 1286 } 1287 1288 /* 1289 * Linux gethostname(2): 1290 * 1291 * If the null-terminated hostname is too large to fit, 1292 * then the name is truncated, and no error is returned. 1293 * 1294 * For this reason an additional byte is reserved in the buffer. 1295 */ 1296 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1297 1298 length = nxt_strlen(hostname); 1299 rt->hostname.length = length; 1300 1301 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1302 1303 if (rt->hostname.start != NULL) { 1304 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1305 return NXT_OK; 1306 } 1307 1308 return NXT_ERROR; 1309 } 1310 1311 1312 static nxt_int_t 1313 nxt_runtime_log_files_init(nxt_runtime_t *rt) 1314 { 1315 nxt_file_t *file; 1316 nxt_list_t *log_files; 1317 1318 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1319 1320 if (nxt_fast_path(log_files != NULL)) { 1321 rt->log_files = log_files; 1322 1323 /* Preallocate the main log. This allocation cannot fail. */ 1324 file = nxt_list_zero_add(log_files); 1325 1326 file->fd = NXT_FILE_INVALID; 1327 file->log_level = NXT_LOG_CRIT; 1328 1329 return NXT_OK; 1330 } 1331 1332 return NXT_ERROR; 1333 } 1334 1335 1336 nxt_file_t * 1337 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1338 { 1339 nxt_int_t ret; 1340 nxt_file_t *file; 1341 nxt_file_name_str_t file_name; 1342 1343 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1344 1345 if (nxt_slow_path(ret != NXT_OK)) { 1346 return NULL; 1347 } 1348 1349 nxt_list_each(file, rt->log_files) { 1350 1351 /* STUB: hardecoded case sensitive/insensitive. */ 1352 1353 if (file->name != NULL 1354 && nxt_file_name_eq(file->name, file_name.start)) 1355 { 1356 return file; 1357 } 1358 1359 } nxt_list_loop; 1360 1361 file = nxt_list_zero_add(rt->log_files); 1362 1363 if (nxt_slow_path(file == NULL)) { 1364 return NULL; 1365 } 1366 1367 file->fd = NXT_FILE_INVALID; 1368 file->log_level = NXT_LOG_CRIT; 1369 file->name = file_name.start; 1370 1371 return file; 1372 } 1373 1374 1375 static nxt_int_t 1376 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1377 { 1378 nxt_int_t ret; 1379 nxt_file_t *file; 1380 1381 nxt_list_each(file, rt->log_files) { 1382 1383 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1384 NXT_FILE_OWNER_ACCESS); 1385 1386 if (ret != NXT_OK) { 1387 return NXT_ERROR; 1388 } 1389 1390 } nxt_list_loop; 1391 1392 file = nxt_list_first(rt->log_files); 1393 1394 return nxt_file_stderr(file); 1395 } 1396 1397 1398 nxt_int_t 1399 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1400 { 1401 nxt_int_t ret; 1402 nxt_uint_t c, p, ncurr, nprev; 1403 nxt_listen_socket_t *curr, *prev; 1404 1405 curr = rt->listen_sockets->elts; 1406 ncurr = rt->listen_sockets->nelts; 1407 1408 if (rt->inherited_sockets != NULL) { 1409 prev = rt->inherited_sockets->elts; 1410 nprev = rt->inherited_sockets->nelts; 1411 1412 } else { 1413 prev = NULL; 1414 nprev = 0; 1415 } 1416 1417 for (c = 0; c < ncurr; c++) { 1418 1419 for (p = 0; p < nprev; p++) { 1420 1421 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1422 1423 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1424 if (ret != NXT_OK) { 1425 return NXT_ERROR; 1426 } 1427 1428 goto next; 1429 } 1430 } 1431 1432 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { 1433 return NXT_ERROR; 1434 } 1435 1436 next: 1437 1438 continue; 1439 } 1440 1441 return NXT_OK; 1442 } 1443 1444 1445 nxt_int_t 1446 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1447 { 1448 nxt_uint_t i, n; 1449 nxt_listen_socket_t *ls; 1450 1451 ls = rt->listen_sockets->elts; 1452 n = rt->listen_sockets->nelts; 1453 1454 for (i = 0; i < n; i++) { 1455 if (ls[i].flags == NXT_NONBLOCK) { 1456 if (nxt_listen_event(task, &ls[i]) == NULL) { 1457 return NXT_ERROR; 1458 } 1459 } 1460 } 1461 1462 return NXT_OK; 1463 } 1464 1465 1466 nxt_str_t * 1467 nxt_current_directory(nxt_mp_t *mp) 1468 { 1469 size_t length; 1470 u_char *p; 1471 nxt_str_t *name; 1472 char buf[NXT_MAX_PATH_LEN]; 1473 1474 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1475 1476 if (nxt_fast_path(length != 0)) { 1477 name = nxt_str_alloc(mp, length + 1); 1478 1479 if (nxt_fast_path(name != NULL)) { 1480 p = nxt_cpymem(name->start, buf, length); 1481 *p = '/'; 1482 1483 return name; 1484 } 1485 } 1486 1487 return NULL; 1488 } 1489 1490 1491 static nxt_int_t 1492 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1493 { 1494 ssize_t length; 1495 nxt_int_t n; 1496 nxt_file_t file; 1497 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; 1498 1499 nxt_memzero(&file, sizeof(nxt_file_t)); 1500 1501 file.name = pid_file; 1502 1503 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1504 NXT_FILE_DEFAULT_ACCESS); 1505 1506 if (n != NXT_OK) { 1507 return NXT_ERROR; 1508 } 1509 1510 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1511 1512 if (nxt_file_write(&file, pid, length, 0) != length) { 1513 return NXT_ERROR; 1514 } 1515 1516 nxt_file_close(task, &file); 1517 1518 return NXT_OK; 1519 } 1520 1521 1522 nxt_process_t * 1523 nxt_runtime_process_new(nxt_runtime_t *rt) 1524 { 1525 nxt_process_t *process; 1526 1527 /* TODO: memory failures. */ 1528 1529 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); 1530 if (nxt_slow_path(process == NULL)) { 1531 return NULL; 1532 } 1533 1534 nxt_queue_init(&process->ports); 1535 1536 nxt_thread_mutex_create(&process->incoming_mutex); 1537 nxt_thread_mutex_create(&process->outgoing_mutex); 1538 nxt_thread_mutex_create(&process->cp_mutex); 1539 1540 return process; 1541 } 1542 1543 1544 static void 1545 nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) 1546 { 1547 nxt_assert(process->port_cleanups == 0); 1548 nxt_assert(process->registered == 0); 1549 1550 nxt_port_mmaps_destroy(process->incoming, 1); 1551 nxt_port_mmaps_destroy(process->outgoing, 1); 1552 1553 if (process->cp_mem_pool != NULL) { 1554 nxt_mp_thread_adopt(process->cp_mem_pool); 1555 1556 nxt_mp_destroy(process->cp_mem_pool); 1557 } 1558 1559 nxt_thread_mutex_destroy(&process->incoming_mutex); 1560 nxt_thread_mutex_destroy(&process->outgoing_mutex); 1561 nxt_thread_mutex_destroy(&process->cp_mutex); 1562 1563 nxt_mp_free(rt->mem_pool, process); 1564 } 1565 1566 1567 static nxt_int_t 1568 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1569 { 1570 nxt_process_t *process; 1571 1572 process = data; 1573 1574 if (lhq->key.length == sizeof(nxt_pid_t) 1575 && *(nxt_pid_t *) lhq->key.start == process->pid) 1576 { 1577 return NXT_OK; 1578 } 1579 1580 return NXT_DECLINED; 1581 } 1582 1583 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1584 NXT_LVLHSH_DEFAULT, 1585 nxt_runtime_lvlhsh_pid_test, 1586 nxt_lvlhsh_alloc, 1587 nxt_lvlhsh_free, 1588 }; 1589 1590 1591 nxt_inline void 1592 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1593 { 1594 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1595 lhq->key.length = sizeof(*pid); 1596 lhq->key.start = (u_char *) pid; 1597 lhq->proto = &lvlhsh_processes_proto; 1598 } 1599 1600 1601 nxt_process_t * 1602 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1603 { 1604 nxt_process_t *process; 1605 nxt_lvlhsh_query_t lhq; 1606 1607 process = NULL; 1608 1609 nxt_runtime_process_lhq_pid(&lhq, &pid); 1610 1611 nxt_thread_mutex_lock(&rt->processes_mutex); 1612 1613 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1614 process = lhq.value; 1615 1616 } else { 1617 nxt_thread_log_debug("process %PI not found", pid); 1618 } 1619 1620 nxt_thread_mutex_unlock(&rt->processes_mutex); 1621 1622 return process; 1623 } 1624 1625 1626 nxt_process_t * 1627 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1628 { 1629 nxt_process_t *process; 1630 nxt_lvlhsh_query_t lhq; 1631 1632 nxt_runtime_process_lhq_pid(&lhq, &pid); 1633 1634 nxt_thread_mutex_lock(&rt->processes_mutex); 1635 1636 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1637 nxt_thread_log_debug("process %PI found", pid); 1638 1639 nxt_thread_mutex_unlock(&rt->processes_mutex); 1640 return lhq.value; 1641 } 1642 1643 process = nxt_runtime_process_new(rt); 1644 if (nxt_slow_path(process == NULL)) { 1645 return NULL; 1646 } 1647 1648 process->pid = pid; 1649 1650 lhq.replace = 0; 1651 lhq.value = process; 1652 lhq.pool = rt->mem_pool; 1653 1654 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1655 1656 case NXT_OK: 1657 if (rt->nprocesses == 0) { 1658 rt->mprocess = process; 1659 } 1660 1661 rt->nprocesses++; 1662 1663 process->registered = 1; 1664 1665 nxt_thread_log_debug("process %PI insert", pid); 1666 break; 1667 1668 default: 1669 nxt_thread_log_debug("process %PI insert failed", pid); 1670 break; 1671 } 1672 1673 nxt_thread_mutex_unlock(&rt->processes_mutex); 1674 1675 return process; 1676 } 1677 1678 1679 void 1680 nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) 1681 { 1682 nxt_port_t *port; 1683 nxt_lvlhsh_query_t lhq; 1684 1685 nxt_assert(process->registered == 0); 1686 1687 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1688 1689 lhq.replace = 0; 1690 lhq.value = process; 1691 lhq.pool = rt->mem_pool; 1692 1693 nxt_thread_mutex_lock(&rt->processes_mutex); 1694 1695 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1696 1697 case NXT_OK: 1698 if (rt->nprocesses == 0) { 1699 rt->mprocess = process; 1700 } 1701 1702 rt->nprocesses++; 1703 1704 nxt_process_port_each(process, port) { 1705 1706 port->pid = process->pid; 1707 1708 nxt_runtime_port_add(rt, port); 1709 1710 } nxt_process_port_loop; 1711 1712 process->registered = 1; 1713 1714 nxt_thread_log_debug("process %PI added", process->pid); 1715 break; 1716 1717 default: 1718 nxt_thread_log_debug("process %PI failed to add", process->pid); 1719 break; 1720 } 1721 1722 nxt_thread_mutex_unlock(&rt->processes_mutex); 1723 } 1724 1725 1726 static nxt_process_t * 1727 nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) 1728 { 1729 nxt_process_t *process; 1730 nxt_lvlhsh_query_t lhq; 1731 1732 process = NULL; 1733 1734 nxt_runtime_process_lhq_pid(&lhq, &pid); 1735 1736 lhq.pool = rt->mem_pool; 1737 1738 nxt_thread_mutex_lock(&rt->processes_mutex); 1739 1740 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1741 1742 case NXT_OK: 1743 rt->nprocesses--; 1744 1745 process = lhq.value; 1746 1747 process->registered = 0; 1748 1749 nxt_thread_log_debug("process %PI removed", pid); 1750 break; 1751 1752 default: 1753 nxt_thread_log_debug("process %PI remove failed", pid); 1754 break; 1755 } 1756 1757 nxt_thread_mutex_unlock(&rt->processes_mutex); 1758 1759 return process; 1760 } 1761 1762 1763 void 1764 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1765 { 1766 nxt_port_t *port; 1767 1768 if (process->port_cleanups == 0) { 1769 if (process->registered == 1) { 1770 nxt_runtime_process_remove_pid(rt, process->pid); 1771 } 1772 1773 nxt_runtime_process_destroy(rt, process); 1774 1775 } else { 1776 nxt_process_port_each(process, port) { 1777 1778 nxt_runtime_port_remove(rt, port); 1779 1780 nxt_port_release(port); 1781 1782 } nxt_process_port_loop; 1783 } 1784 } 1785 1786 1787 nxt_process_t * 1788 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1789 { 1790 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); 1791 1792 lhe->proto = &lvlhsh_processes_proto; 1793 1794 return nxt_runtime_process_next(rt, lhe); 1795 } 1796 1797 1798 nxt_port_t * 1799 nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1800 { 1801 return nxt_port_hash_first(&rt->ports, lhe); 1802 } 1803 1804 1805 void 1806 nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) 1807 { 1808 nxt_port_hash_add(&rt->ports, rt->mem_pool, port); 1809 1810 rt->port_by_type[port->type] = port; 1811 } 1812 1813 1814 void 1815 nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) 1816 { 1817 nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); 1818 1819 if (rt->port_by_type[port->type] == port) { 1820 rt->port_by_type[port->type] = NULL; 1821 } 1822 } 1823 1824 1825 nxt_port_t * 1826 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1827 nxt_port_id_t port_id) 1828 { 1829 return nxt_port_hash_find(&rt->ports, pid, port_id); 1830 } 1831