1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_port.h> 11 #include <nxt_main_process.h> 12 #include <nxt_router.h> 13 #include <nxt_regex.h> 14 15 16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 17 nxt_runtime_t *rt); 18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 19 nxt_runtime_t *rt); 20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); 24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt); 26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 34 nxt_runtime_t *rt); 35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 36 nxt_file_name_t *pid_file); 37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 38 nxt_runtime_cont_t cont); 39 static void nxt_runtime_thread_pool_init(void); 40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 41 void *data); 42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); 43 static void nxt_runtime_process_remove(nxt_runtime_t *rt, 44 nxt_process_t *process); 45 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); 46 47 48 nxt_int_t 49 nxt_runtime_create(nxt_task_t *task) 50 { 51 nxt_mp_t *mp; 52 nxt_int_t ret; 53 nxt_array_t *listen_sockets; 54 nxt_runtime_t *rt; 55 nxt_app_lang_module_t *lang; 56 57 mp = nxt_mp_create(1024, 128, 256, 32); 58 if (nxt_slow_path(mp == NULL)) { 59 return NXT_ERROR; 60 } 61 62 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 63 if (nxt_slow_path(rt == NULL)) { 64 goto fail; 65 } 66 67 task->thread->runtime = rt; 68 rt->mem_pool = mp; 69 70 nxt_thread_mutex_create(&rt->processes_mutex); 71 72 rt->services = nxt_services_init(mp); 73 if (nxt_slow_path(rt->services == NULL)) { 74 goto fail; 75 } 76 77 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 78 if (nxt_slow_path(rt->languages == NULL)) { 79 goto fail; 80 } 81 82 /* Should not fail. */ 83 lang = nxt_array_add(rt->languages); 84 lang->type = NXT_APP_EXTERNAL; 85 lang->version = (u_char *) ""; 86 lang->file = NULL; 87 lang->module = &nxt_external_module; 88 89 lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t)); 90 if (nxt_slow_path(lang->mounts == NULL)) { 91 goto fail; 92 } 93 94 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 95 if (nxt_slow_path(listen_sockets == NULL)) { 96 goto fail; 97 } 98 99 rt->listen_sockets = listen_sockets; 100 101 ret = nxt_runtime_inherited_listen_sockets(task, rt); 102 if (nxt_slow_path(ret != NXT_OK)) { 103 goto fail; 104 } 105 106 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 107 goto fail; 108 } 109 110 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 111 goto fail; 112 } 113 114 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 115 goto fail; 116 } 117 118 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 119 goto fail; 120 } 121 122 rt->start = nxt_runtime_initial_start; 123 124 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 125 goto fail; 126 } 127 128 if (nxt_port_rpc_init() != NXT_OK) { 129 goto fail; 130 } 131 132 if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) { 133 goto fail; 134 } 135 136 if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) { 137 goto fail; 138 } 139 140 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 141 nxt_runtime_start, task, rt, NULL); 142 143 return NXT_OK; 144 145 fail: 146 147 nxt_mp_destroy(mp); 148 149 return NXT_ERROR; 150 } 151 152 153 static nxt_int_t 154 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 155 { 156 u_char *v, *p; 157 nxt_int_t type; 158 nxt_array_t *inherited_sockets; 159 nxt_socket_t s; 160 nxt_listen_socket_t *ls; 161 162 v = (u_char *) getenv("NGINX"); 163 164 if (v == NULL) { 165 return nxt_runtime_systemd_listen_sockets(task, rt); 166 } 167 168 nxt_alert(task, "using inherited listen sockets: %s", v); 169 170 inherited_sockets = nxt_array_create(rt->mem_pool, 171 1, sizeof(nxt_listen_socket_t)); 172 if (inherited_sockets == NULL) { 173 return NXT_ERROR; 174 } 175 176 rt->inherited_sockets = inherited_sockets; 177 178 for (p = v; *p != '\0'; p++) { 179 180 if (*p == ';') { 181 s = nxt_int_parse(v, p - v); 182 183 if (nxt_slow_path(s < 0)) { 184 nxt_alert(task, "invalid socket number \"%s\" " 185 "in NGINX environment variable, " 186 "ignoring the rest of the variable", v); 187 return NXT_ERROR; 188 } 189 190 v = p + 1; 191 192 ls = nxt_array_zero_add(inherited_sockets); 193 if (nxt_slow_path(ls == NULL)) { 194 return NXT_ERROR; 195 } 196 197 ls->socket = s; 198 199 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 200 if (nxt_slow_path(ls->sockaddr == NULL)) { 201 return NXT_ERROR; 202 } 203 204 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 205 if (nxt_slow_path(type == -1)) { 206 return NXT_ERROR; 207 } 208 209 ls->sockaddr->type = (uint16_t) type; 210 } 211 } 212 213 return NXT_OK; 214 } 215 216 217 static nxt_int_t 218 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 219 { 220 u_char *nfd, *pid; 221 nxt_int_t n; 222 nxt_array_t *inherited_sockets; 223 nxt_socket_t s; 224 nxt_listen_socket_t *ls; 225 226 /* 227 * Number of listening sockets passed. The socket 228 * descriptors start from number 3 and are sequential. 229 */ 230 nfd = (u_char *) getenv("LISTEN_FDS"); 231 if (nfd == NULL) { 232 return NXT_OK; 233 } 234 235 /* The pid of the service process. */ 236 pid = (u_char *) getenv("LISTEN_PID"); 237 if (pid == NULL) { 238 return NXT_OK; 239 } 240 241 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 242 if (n < 0) { 243 return NXT_OK; 244 } 245 246 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 247 return NXT_OK; 248 } 249 250 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n); 251 252 inherited_sockets = nxt_array_create(rt->mem_pool, 253 n, sizeof(nxt_listen_socket_t)); 254 if (inherited_sockets == NULL) { 255 return NXT_ERROR; 256 } 257 258 rt->inherited_sockets = inherited_sockets; 259 260 for (s = 3; s < n; s++) { 261 ls = nxt_array_zero_add(inherited_sockets); 262 if (nxt_slow_path(ls == NULL)) { 263 return NXT_ERROR; 264 } 265 266 ls->socket = s; 267 268 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 269 if (nxt_slow_path(ls->sockaddr == NULL)) { 270 return NXT_ERROR; 271 } 272 273 ls->sockaddr->type = SOCK_STREAM; 274 } 275 276 return NXT_OK; 277 } 278 279 280 static nxt_int_t 281 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 282 { 283 nxt_thread_t *thread; 284 nxt_event_engine_t *engine; 285 const nxt_event_interface_t *interface; 286 287 interface = nxt_service_get(rt->services, "engine", NULL); 288 289 if (nxt_slow_path(interface == NULL)) { 290 /* TODO: log */ 291 return NXT_ERROR; 292 } 293 294 engine = nxt_event_engine_create(task, interface, 295 nxt_main_process_signals, 0, 0); 296 297 if (nxt_slow_path(engine == NULL)) { 298 return NXT_ERROR; 299 } 300 301 thread = task->thread; 302 thread->engine = engine; 303 #if 0 304 thread->fiber = &engine->fibers->fiber; 305 #endif 306 307 engine->id = rt->last_engine_id++; 308 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 309 310 nxt_queue_init(&rt->engines); 311 nxt_queue_insert_tail(&rt->engines, &engine->link); 312 313 return NXT_OK; 314 } 315 316 317 static nxt_int_t 318 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 319 { 320 nxt_int_t ret; 321 nxt_array_t *thread_pools; 322 323 thread_pools = nxt_array_create(rt->mem_pool, 1, 324 sizeof(nxt_thread_pool_t *)); 325 326 if (nxt_slow_path(thread_pools == NULL)) { 327 return NXT_ERROR; 328 } 329 330 rt->thread_pools = thread_pools; 331 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 332 333 if (nxt_slow_path(ret != NXT_OK)) { 334 return NXT_ERROR; 335 } 336 337 return NXT_OK; 338 } 339 340 341 static void 342 nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 343 { 344 nxt_runtime_t *rt; 345 346 rt = obj; 347 348 nxt_debug(task, "rt conf done"); 349 350 task->thread->log->ctx_handler = NULL; 351 task->thread->log->ctx = NULL; 352 353 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 354 goto fail; 355 } 356 357 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 358 goto fail; 359 } 360 361 /* 362 * Thread pools should be destroyed before starting worker 363 * processes, because thread pool semaphores will stick in 364 * locked state in new processes after fork(). 365 */ 366 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 367 368 return; 369 370 fail: 371 372 nxt_runtime_quit(task, 1); 373 } 374 375 376 static void 377 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status) 378 { 379 nxt_int_t ret; 380 nxt_thread_t *thr; 381 nxt_runtime_t *rt; 382 const nxt_event_interface_t *interface; 383 384 thr = task->thread; 385 rt = thr->runtime; 386 387 if (rt->inherited_sockets == NULL && rt->daemon) { 388 389 if (nxt_process_daemon(task) != NXT_OK) { 390 goto fail; 391 } 392 393 /* 394 * An event engine should be updated after fork() 395 * even if an event facility was not changed because: 396 * 1) inherited kqueue descriptor is invalid, 397 * 2) the signal thread is not inherited. 398 */ 399 interface = nxt_service_get(rt->services, "engine", rt->engine); 400 if (interface == NULL) { 401 goto fail; 402 } 403 404 ret = nxt_event_engine_change(task->thread->engine, interface, 405 rt->batch); 406 if (ret != NXT_OK) { 407 goto fail; 408 } 409 } 410 411 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 412 if (ret != NXT_OK) { 413 goto fail; 414 } 415 416 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 417 goto fail; 418 } 419 420 thr->engine->max_connections = rt->engine_connections; 421 422 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 423 return; 424 } 425 426 fail: 427 428 nxt_runtime_quit(task, 1); 429 } 430 431 432 void 433 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status) 434 { 435 nxt_bool_t done; 436 nxt_runtime_t *rt; 437 nxt_event_engine_t *engine; 438 439 rt = task->thread->runtime; 440 rt->status |= status; 441 engine = task->thread->engine; 442 443 nxt_debug(task, "exiting"); 444 445 done = 1; 446 447 if (!engine->shutdown) { 448 engine->shutdown = 1; 449 450 if (!nxt_array_is_empty(rt->thread_pools)) { 451 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 452 done = 0; 453 } 454 455 if (rt->type == NXT_PROCESS_MAIN) { 456 nxt_runtime_stop_all_processes(task, rt); 457 done = 0; 458 } 459 } 460 461 nxt_runtime_close_idle_connections(engine); 462 463 if (done) { 464 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 465 task, rt, engine); 466 } 467 } 468 469 470 static void 471 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 472 { 473 nxt_conn_t *c; 474 nxt_queue_t *idle; 475 nxt_queue_link_t *link, *next; 476 477 nxt_debug(&engine->task, "close idle connections"); 478 479 idle = &engine->idle_connections; 480 481 for (link = nxt_queue_first(idle); 482 link != nxt_queue_tail(idle); 483 link = next) 484 { 485 next = nxt_queue_next(link); 486 c = nxt_queue_link_data(link, nxt_conn_t, link); 487 488 if (!c->socket.read_ready) { 489 nxt_queue_remove(link); 490 nxt_conn_close(engine, c); 491 } 492 } 493 } 494 495 496 void 497 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) 498 { 499 nxt_port_t *port; 500 nxt_process_t *process; 501 nxt_process_init_t *init; 502 503 nxt_runtime_process_each(rt, process) { 504 505 init = nxt_process_init(process); 506 507 if (init->type == NXT_PROCESS_APP) { 508 509 nxt_process_port_each(process, port) { 510 511 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 512 0, 0, NULL); 513 514 } nxt_process_port_loop; 515 } 516 517 } nxt_runtime_process_loop; 518 } 519 520 521 static void 522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 523 { 524 nxt_port_t *port; 525 nxt_process_t *process; 526 527 nxt_runtime_process_each(rt, process) { 528 529 nxt_process_port_each(process, port) { 530 531 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 532 0, NULL); 533 534 } nxt_process_port_loop; 535 536 } nxt_runtime_process_loop; 537 } 538 539 540 static void 541 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 542 { 543 int status, engine_count; 544 nxt_runtime_t *rt; 545 nxt_process_t *process; 546 nxt_event_engine_t *engine; 547 548 rt = obj; 549 engine = data; 550 551 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 552 553 if (!nxt_array_is_empty(rt->thread_pools)) { 554 return; 555 } 556 557 if (rt->type == NXT_PROCESS_MAIN) { 558 if (rt->pid_file != NULL) { 559 nxt_file_delete(rt->pid_file); 560 } 561 562 #if (NXT_HAVE_UNIX_DOMAIN) 563 { 564 nxt_sockaddr_t *sa; 565 nxt_file_name_t *name; 566 567 sa = rt->controller_listen; 568 569 if (sa->u.sockaddr.sa_family == AF_UNIX) { 570 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 571 (void) nxt_file_delete(name); 572 } 573 } 574 #endif 575 } 576 577 if (!engine->event.signal_support) { 578 nxt_event_engine_signals_stop(engine); 579 } 580 581 nxt_runtime_process_each(rt, process) { 582 583 nxt_runtime_process_remove(rt, process); 584 nxt_process_close_ports(task, process); 585 586 } nxt_runtime_process_loop; 587 588 status = rt->status; 589 590 engine_count = 0; 591 592 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) { 593 594 engine_count++; 595 596 } nxt_queue_loop; 597 598 if (engine_count <= 1) { 599 if (rt->port_by_type[rt->type] != NULL) { 600 nxt_port_use(task, rt->port_by_type[rt->type], -1); 601 } 602 603 nxt_thread_mutex_destroy(&rt->processes_mutex); 604 605 nxt_mp_destroy(rt->mem_pool); 606 } 607 608 nxt_debug(task, "exit: %d", status); 609 610 exit(status); 611 nxt_unreachable(); 612 } 613 614 615 static nxt_int_t 616 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 617 { 618 nxt_event_engine_t *engine; 619 const nxt_event_interface_t *interface; 620 621 engine = task->thread->engine; 622 623 if (engine->batch == rt->batch 624 && nxt_strcmp(engine->event.name, rt->engine) == 0) 625 { 626 return NXT_OK; 627 } 628 629 interface = nxt_service_get(rt->services, "engine", rt->engine); 630 631 if (interface != NULL) { 632 return nxt_event_engine_change(engine, interface, rt->batch); 633 } 634 635 return NXT_ERROR; 636 } 637 638 639 void 640 nxt_runtime_event_engine_free(nxt_runtime_t *rt) 641 { 642 nxt_queue_link_t *link; 643 nxt_event_engine_t *engine; 644 645 link = nxt_queue_first(&rt->engines); 646 nxt_queue_remove(link); 647 648 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 649 nxt_event_engine_free(engine); 650 } 651 652 653 nxt_int_t 654 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 655 nxt_uint_t max_threads, nxt_nsec_t timeout) 656 { 657 nxt_thread_pool_t *thread_pool, **tp; 658 659 tp = nxt_array_add(rt->thread_pools); 660 if (tp == NULL) { 661 return NXT_ERROR; 662 } 663 664 thread_pool = nxt_thread_pool_create(max_threads, timeout, 665 nxt_runtime_thread_pool_init, 666 thr->engine, 667 nxt_runtime_thread_pool_exit); 668 669 if (nxt_fast_path(thread_pool != NULL)) { 670 *tp = thread_pool; 671 } 672 673 return NXT_OK; 674 } 675 676 677 static void 678 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 679 nxt_runtime_cont_t cont) 680 { 681 nxt_uint_t n; 682 nxt_thread_pool_t **tp; 683 684 rt->continuation = cont; 685 686 n = rt->thread_pools->nelts; 687 688 if (n == 0) { 689 cont(task, 0); 690 return; 691 } 692 693 tp = rt->thread_pools->elts; 694 695 do { 696 nxt_thread_pool_destroy(*tp); 697 698 tp++; 699 n--; 700 } while (n != 0); 701 } 702 703 704 static void 705 nxt_runtime_thread_pool_init(void) 706 { 707 } 708 709 710 static void 711 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 712 { 713 nxt_uint_t i, n; 714 nxt_runtime_t *rt; 715 nxt_thread_pool_t *tp, **thread_pools; 716 nxt_thread_handle_t handle; 717 718 tp = obj; 719 720 if (data != NULL) { 721 handle = (nxt_thread_handle_t) (uintptr_t) data; 722 nxt_thread_wait(handle); 723 } 724 725 rt = task->thread->runtime; 726 727 thread_pools = rt->thread_pools->elts; 728 n = rt->thread_pools->nelts; 729 730 nxt_debug(task, "thread pools: %ui", n); 731 732 for (i = 0; i < n; i++) { 733 734 if (tp == thread_pools[i]) { 735 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 736 737 nxt_free(tp); 738 739 if (n == 1) { 740 /* The last thread pool. */ 741 rt->continuation(task, 0); 742 } 743 744 return; 745 } 746 } 747 } 748 749 750 static nxt_int_t 751 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 752 { 753 nxt_int_t ret; 754 nxt_str_t control; 755 nxt_uint_t n; 756 nxt_file_t *file; 757 const char *slash; 758 nxt_sockaddr_t *sa; 759 nxt_file_name_str_t file_name; 760 const nxt_event_interface_t *interface; 761 762 rt->daemon = 1; 763 rt->engine_connections = 256; 764 rt->auxiliary_threads = 2; 765 rt->user_cred.user = NXT_USER; 766 rt->group = NXT_GROUP; 767 rt->pid = NXT_PID; 768 rt->log = NXT_LOG; 769 rt->modules = NXT_MODULES; 770 rt->state = NXT_STATE; 771 rt->control = NXT_CONTROL_SOCK; 772 rt->tmp = NXT_TMP; 773 774 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t)); 775 776 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 777 return NXT_ERROR; 778 } 779 780 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) { 781 return NXT_ERROR; 782 } 783 784 if (rt->capabilities.setid) { 785 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, 786 rt->group); 787 788 if (nxt_slow_path(ret != NXT_OK)) { 789 return NXT_ERROR; 790 } 791 792 } else { 793 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it " 794 "cannot use arbitrary user and group."); 795 } 796 797 /* An engine's parameters. */ 798 799 interface = nxt_service_get(rt->services, "engine", rt->engine); 800 if (interface == NULL) { 801 return NXT_ERROR; 802 } 803 804 rt->engine = interface->name; 805 806 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 807 if (nxt_slow_path(ret != NXT_OK)) { 808 return NXT_ERROR; 809 } 810 811 rt->pid_file = file_name.start; 812 813 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 814 if (nxt_slow_path(ret != NXT_OK)) { 815 return NXT_ERROR; 816 } 817 818 file = nxt_list_first(rt->log_files); 819 file->name = file_name.start; 820 821 slash = ""; 822 n = nxt_strlen(rt->modules); 823 824 if (n > 1 && rt->modules[n - 1] != '/') { 825 slash = "/"; 826 } 827 828 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 829 rt->modules, slash); 830 if (nxt_slow_path(ret != NXT_OK)) { 831 return NXT_ERROR; 832 } 833 834 rt->modules = (char *) file_name.start; 835 836 slash = ""; 837 n = nxt_strlen(rt->state); 838 839 if (n > 1 && rt->state[n - 1] != '/') { 840 slash = "/"; 841 } 842 843 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z", 844 rt->state, slash); 845 if (nxt_slow_path(ret != NXT_OK)) { 846 return NXT_ERROR; 847 } 848 849 rt->ver = (char *) file_name.start; 850 851 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver); 852 if (nxt_slow_path(ret != NXT_OK)) { 853 return NXT_ERROR; 854 } 855 856 rt->ver_tmp = (char *) file_name.start; 857 858 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 859 rt->state, slash); 860 if (nxt_slow_path(ret != NXT_OK)) { 861 return NXT_ERROR; 862 } 863 864 rt->conf = (char *) file_name.start; 865 866 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 867 if (nxt_slow_path(ret != NXT_OK)) { 868 return NXT_ERROR; 869 } 870 871 rt->conf_tmp = (char *) file_name.start; 872 873 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z", 874 rt->state, slash); 875 if (nxt_slow_path(ret != NXT_OK)) { 876 return NXT_ERROR; 877 } 878 879 ret = mkdir((char *) file_name.start, S_IRWXU); 880 881 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) { 882 rt->certs.length = file_name.len; 883 rt->certs.start = file_name.start; 884 885 } else { 886 nxt_alert(task, "Unable to create certificates storage directory: " 887 "mkdir(%s) failed %E", file_name.start, nxt_errno); 888 } 889 890 control.length = nxt_strlen(rt->control); 891 control.start = (u_char *) rt->control; 892 893 sa = nxt_sockaddr_parse(rt->mem_pool, &control); 894 if (nxt_slow_path(sa == NULL)) { 895 return NXT_ERROR; 896 } 897 898 sa->type = SOCK_STREAM; 899 900 rt->controller_listen = sa; 901 902 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 903 return NXT_ERROR; 904 } 905 906 return NXT_OK; 907 } 908 909 910 static nxt_int_t 911 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 912 { 913 char *p, **argv; 914 u_char *end; 915 u_char buf[1024]; 916 917 static const char version[] = 918 "unit version: " NXT_VERSION "\n" 919 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 920 921 static const char no_control[] = 922 "option \"--control\" requires socket address\n"; 923 static const char no_user[] = "option \"--user\" requires username\n"; 924 static const char no_group[] = "option \"--group\" requires group name\n"; 925 static const char no_pid[] = "option \"--pid\" requires filename\n"; 926 static const char no_log[] = "option \"--log\" requires filename\n"; 927 static const char no_modules[] = 928 "option \"--modules\" requires directory\n"; 929 static const char no_state[] = "option \"--state\" requires directory\n"; 930 static const char no_tmp[] = "option \"--tmp\" requires directory\n"; 931 932 static const char help[] = 933 "\n" 934 "unit options:\n" 935 "\n" 936 " --version print unit version and configure options\n" 937 "\n" 938 " --no-daemon run unit in non-daemon mode\n" 939 "\n" 940 " --control ADDRESS set address of control API socket\n" 941 " default: \"" NXT_CONTROL_SOCK "\"\n" 942 "\n" 943 " --pid FILE set pid filename\n" 944 " default: \"" NXT_PID "\"\n" 945 "\n" 946 " --log FILE set log filename\n" 947 " default: \"" NXT_LOG "\"\n" 948 "\n" 949 " --modules DIRECTORY set modules directory name\n" 950 " default: \"" NXT_MODULES "\"\n" 951 "\n" 952 " --state DIRECTORY set state directory name\n" 953 " default: \"" NXT_STATE "\"\n" 954 "\n" 955 " --tmp DIRECTORY set tmp directory name\n" 956 " default: \"" NXT_TMP "\"\n" 957 "\n" 958 " --user USER set non-privileged processes to run" 959 " as specified user\n" 960 " default: \"" NXT_USER "\"\n" 961 "\n" 962 " --group GROUP set non-privileged processes to run" 963 " as specified group\n" 964 " default: "; 965 966 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 967 static const char primary[] = "user's primary group\n\n"; 968 969 argv = &nxt_process_argv[1]; 970 971 while (*argv != NULL) { 972 p = *argv++; 973 974 if (nxt_strcmp(p, "--control") == 0) { 975 if (*argv == NULL) { 976 write(STDERR_FILENO, no_control, nxt_length(no_control)); 977 return NXT_ERROR; 978 } 979 980 p = *argv++; 981 982 rt->control = p; 983 984 continue; 985 } 986 987 if (nxt_strcmp(p, "--user") == 0) { 988 if (*argv == NULL) { 989 write(STDERR_FILENO, no_user, nxt_length(no_user)); 990 return NXT_ERROR; 991 } 992 993 p = *argv++; 994 995 rt->user_cred.user = p; 996 997 continue; 998 } 999 1000 if (nxt_strcmp(p, "--group") == 0) { 1001 if (*argv == NULL) { 1002 write(STDERR_FILENO, no_group, nxt_length(no_group)); 1003 return NXT_ERROR; 1004 } 1005 1006 p = *argv++; 1007 1008 rt->group = p; 1009 1010 continue; 1011 } 1012 1013 if (nxt_strcmp(p, "--pid") == 0) { 1014 if (*argv == NULL) { 1015 write(STDERR_FILENO, no_pid, nxt_length(no_pid)); 1016 return NXT_ERROR; 1017 } 1018 1019 p = *argv++; 1020 1021 rt->pid = p; 1022 1023 continue; 1024 } 1025 1026 if (nxt_strcmp(p, "--log") == 0) { 1027 if (*argv == NULL) { 1028 write(STDERR_FILENO, no_log, nxt_length(no_log)); 1029 return NXT_ERROR; 1030 } 1031 1032 p = *argv++; 1033 1034 rt->log = p; 1035 1036 continue; 1037 } 1038 1039 if (nxt_strcmp(p, "--modules") == 0) { 1040 if (*argv == NULL) { 1041 write(STDERR_FILENO, no_modules, nxt_length(no_modules)); 1042 return NXT_ERROR; 1043 } 1044 1045 p = *argv++; 1046 1047 rt->modules = p; 1048 1049 continue; 1050 } 1051 1052 if (nxt_strcmp(p, "--state") == 0) { 1053 if (*argv == NULL) { 1054 write(STDERR_FILENO, no_state, nxt_length(no_state)); 1055 return NXT_ERROR; 1056 } 1057 1058 p = *argv++; 1059 1060 rt->state = p; 1061 1062 continue; 1063 } 1064 1065 if (nxt_strcmp(p, "--tmp") == 0) { 1066 if (*argv == NULL) { 1067 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp)); 1068 return NXT_ERROR; 1069 } 1070 1071 p = *argv++; 1072 1073 rt->tmp = p; 1074 1075 continue; 1076 } 1077 1078 if (nxt_strcmp(p, "--no-daemon") == 0) { 1079 rt->daemon = 0; 1080 continue; 1081 } 1082 1083 if (nxt_strcmp(p, "--version") == 0) { 1084 write(STDERR_FILENO, version, nxt_length(version)); 1085 exit(0); 1086 } 1087 1088 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) { 1089 write(STDOUT_FILENO, help, nxt_length(help)); 1090 1091 if (sizeof(NXT_GROUP) == 1) { 1092 write(STDOUT_FILENO, primary, nxt_length(primary)); 1093 1094 } else { 1095 write(STDOUT_FILENO, group, nxt_length(group)); 1096 } 1097 1098 exit(0); 1099 } 1100 1101 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", " 1102 "try \"%s -h\" for available options\n", 1103 p, nxt_process_argv[0]); 1104 1105 write(STDERR_FILENO, buf, end - buf); 1106 1107 return NXT_ERROR; 1108 } 1109 1110 return NXT_OK; 1111 } 1112 1113 1114 nxt_listen_socket_t * 1115 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1116 { 1117 nxt_mp_t *mp; 1118 nxt_listen_socket_t *ls; 1119 1120 ls = nxt_array_zero_add(rt->listen_sockets); 1121 if (ls == NULL) { 1122 return NULL; 1123 } 1124 1125 mp = rt->mem_pool; 1126 1127 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1128 sa->length); 1129 if (ls->sockaddr == NULL) { 1130 return NULL; 1131 } 1132 1133 ls->sockaddr->type = sa->type; 1134 1135 nxt_sockaddr_text(ls->sockaddr); 1136 1137 ls->socket = -1; 1138 ls->backlog = NXT_LISTEN_BACKLOG; 1139 1140 return ls; 1141 } 1142 1143 1144 static nxt_int_t 1145 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1146 { 1147 size_t length; 1148 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1149 1150 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1151 nxt_alert(task, "gethostname() failed %E", nxt_errno); 1152 return NXT_ERROR; 1153 } 1154 1155 /* 1156 * Linux gethostname(2): 1157 * 1158 * If the null-terminated hostname is too large to fit, 1159 * then the name is truncated, and no error is returned. 1160 * 1161 * For this reason an additional byte is reserved in the buffer. 1162 */ 1163 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1164 1165 length = nxt_strlen(hostname); 1166 rt->hostname.length = length; 1167 1168 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1169 1170 if (rt->hostname.start != NULL) { 1171 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1172 return NXT_OK; 1173 } 1174 1175 return NXT_ERROR; 1176 } 1177 1178 1179 static nxt_int_t 1180 nxt_runtime_log_files_init(nxt_runtime_t *rt) 1181 { 1182 nxt_file_t *file; 1183 nxt_list_t *log_files; 1184 1185 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1186 1187 if (nxt_fast_path(log_files != NULL)) { 1188 rt->log_files = log_files; 1189 1190 /* Preallocate the main log. This allocation cannot fail. */ 1191 file = nxt_list_zero_add(log_files); 1192 1193 file->fd = NXT_FILE_INVALID; 1194 file->log_level = NXT_LOG_ALERT; 1195 1196 return NXT_OK; 1197 } 1198 1199 return NXT_ERROR; 1200 } 1201 1202 1203 nxt_file_t * 1204 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1205 { 1206 nxt_int_t ret; 1207 nxt_file_t *file; 1208 nxt_file_name_str_t file_name; 1209 1210 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1211 1212 if (nxt_slow_path(ret != NXT_OK)) { 1213 return NULL; 1214 } 1215 1216 nxt_list_each(file, rt->log_files) { 1217 1218 /* STUB: hardecoded case sensitive/insensitive. */ 1219 1220 if (file->name != NULL 1221 && nxt_file_name_eq(file->name, file_name.start)) 1222 { 1223 return file; 1224 } 1225 1226 } nxt_list_loop; 1227 1228 file = nxt_list_zero_add(rt->log_files); 1229 1230 if (nxt_slow_path(file == NULL)) { 1231 return NULL; 1232 } 1233 1234 file->fd = NXT_FILE_INVALID; 1235 file->log_level = NXT_LOG_ALERT; 1236 file->name = file_name.start; 1237 1238 return file; 1239 } 1240 1241 1242 static nxt_int_t 1243 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1244 { 1245 nxt_int_t ret; 1246 nxt_file_t *file; 1247 1248 nxt_list_each(file, rt->log_files) { 1249 1250 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1251 NXT_FILE_OWNER_ACCESS); 1252 1253 if (ret != NXT_OK) { 1254 return NXT_ERROR; 1255 } 1256 1257 } nxt_list_loop; 1258 1259 file = nxt_list_first(rt->log_files); 1260 1261 return nxt_file_stderr(file); 1262 } 1263 1264 1265 nxt_int_t 1266 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1267 { 1268 nxt_int_t ret; 1269 nxt_uint_t c, p, ncurr, nprev; 1270 nxt_listen_socket_t *curr, *prev; 1271 1272 curr = rt->listen_sockets->elts; 1273 ncurr = rt->listen_sockets->nelts; 1274 1275 if (rt->inherited_sockets != NULL) { 1276 prev = rt->inherited_sockets->elts; 1277 nprev = rt->inherited_sockets->nelts; 1278 1279 } else { 1280 prev = NULL; 1281 nprev = 0; 1282 } 1283 1284 for (c = 0; c < ncurr; c++) { 1285 1286 for (p = 0; p < nprev; p++) { 1287 1288 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1289 1290 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1291 if (ret != NXT_OK) { 1292 return NXT_ERROR; 1293 } 1294 1295 goto next; 1296 } 1297 } 1298 1299 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) { 1300 return NXT_ERROR; 1301 } 1302 1303 next: 1304 1305 continue; 1306 } 1307 1308 return NXT_OK; 1309 } 1310 1311 1312 nxt_int_t 1313 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1314 { 1315 nxt_uint_t i, n; 1316 nxt_listen_socket_t *ls; 1317 1318 ls = rt->listen_sockets->elts; 1319 n = rt->listen_sockets->nelts; 1320 1321 for (i = 0; i < n; i++) { 1322 if (ls[i].flags == NXT_NONBLOCK) { 1323 if (nxt_listen_event(task, &ls[i]) == NULL) { 1324 return NXT_ERROR; 1325 } 1326 } 1327 } 1328 1329 return NXT_OK; 1330 } 1331 1332 1333 nxt_str_t * 1334 nxt_current_directory(nxt_mp_t *mp) 1335 { 1336 size_t length; 1337 u_char *p; 1338 nxt_str_t *name; 1339 char buf[NXT_MAX_PATH_LEN]; 1340 1341 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1342 1343 if (nxt_fast_path(length != 0)) { 1344 name = nxt_str_alloc(mp, length + 1); 1345 1346 if (nxt_fast_path(name != NULL)) { 1347 p = nxt_cpymem(name->start, buf, length); 1348 *p = '/'; 1349 1350 return name; 1351 } 1352 } 1353 1354 return NULL; 1355 } 1356 1357 1358 static nxt_int_t 1359 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1360 { 1361 ssize_t length; 1362 nxt_int_t n; 1363 nxt_file_t file; 1364 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")]; 1365 1366 nxt_memzero(&file, sizeof(nxt_file_t)); 1367 1368 file.name = pid_file; 1369 1370 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1371 NXT_FILE_DEFAULT_ACCESS); 1372 1373 if (n != NXT_OK) { 1374 return NXT_ERROR; 1375 } 1376 1377 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1378 1379 if (nxt_file_write(&file, pid, length, 0) != length) { 1380 return NXT_ERROR; 1381 } 1382 1383 nxt_file_close(task, &file); 1384 1385 return NXT_OK; 1386 } 1387 1388 1389 void 1390 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) 1391 { 1392 if (process->registered == 1) { 1393 nxt_runtime_process_remove(rt, process); 1394 } 1395 1396 nxt_assert(process->use_count == 0); 1397 nxt_assert(process->registered == 0); 1398 1399 nxt_port_mmaps_destroy(&process->incoming, 1); 1400 1401 nxt_thread_mutex_destroy(&process->incoming.mutex); 1402 1403 /* processes from nxt_runtime_process_get() have no memory pool */ 1404 if (process->mem_pool != NULL) { 1405 nxt_mp_destroy(process->mem_pool); 1406 } 1407 1408 nxt_mp_free(rt->mem_pool, process); 1409 } 1410 1411 1412 static nxt_int_t 1413 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1414 { 1415 nxt_process_t *process; 1416 1417 process = data; 1418 1419 if (lhq->key.length == sizeof(nxt_pid_t) 1420 && *(nxt_pid_t *) lhq->key.start == process->pid) 1421 { 1422 return NXT_OK; 1423 } 1424 1425 return NXT_DECLINED; 1426 } 1427 1428 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1429 NXT_LVLHSH_DEFAULT, 1430 nxt_runtime_lvlhsh_pid_test, 1431 nxt_lvlhsh_alloc, 1432 nxt_lvlhsh_free, 1433 }; 1434 1435 1436 nxt_inline void 1437 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1438 { 1439 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1440 lhq->key.length = sizeof(*pid); 1441 lhq->key.start = (u_char *) pid; 1442 lhq->proto = &lvlhsh_processes_proto; 1443 } 1444 1445 1446 nxt_process_t * 1447 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1448 { 1449 nxt_process_t *process; 1450 nxt_lvlhsh_query_t lhq; 1451 1452 process = NULL; 1453 1454 nxt_runtime_process_lhq_pid(&lhq, &pid); 1455 1456 nxt_thread_mutex_lock(&rt->processes_mutex); 1457 1458 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1459 process = lhq.value; 1460 1461 } else { 1462 nxt_thread_log_debug("process %PI not found", pid); 1463 } 1464 1465 nxt_thread_mutex_unlock(&rt->processes_mutex); 1466 1467 return process; 1468 } 1469 1470 1471 static nxt_process_t * 1472 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1473 { 1474 nxt_process_t *process; 1475 nxt_lvlhsh_query_t lhq; 1476 1477 nxt_runtime_process_lhq_pid(&lhq, &pid); 1478 1479 nxt_thread_mutex_lock(&rt->processes_mutex); 1480 1481 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1482 nxt_thread_log_debug("process %PI found", pid); 1483 1484 nxt_thread_mutex_unlock(&rt->processes_mutex); 1485 1486 process = lhq.value; 1487 process->use_count++; 1488 1489 return process; 1490 } 1491 1492 process = nxt_process_new(rt); 1493 if (nxt_slow_path(process == NULL)) { 1494 1495 nxt_thread_mutex_unlock(&rt->processes_mutex); 1496 1497 return NULL; 1498 } 1499 1500 process->pid = pid; 1501 1502 lhq.replace = 0; 1503 lhq.value = process; 1504 lhq.pool = rt->mem_pool; 1505 1506 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1507 1508 case NXT_OK: 1509 if (rt->nprocesses == 0) { 1510 rt->mprocess = process; 1511 } 1512 1513 rt->nprocesses++; 1514 1515 process->registered = 1; 1516 1517 nxt_thread_log_debug("process %PI insert", pid); 1518 break; 1519 1520 default: 1521 nxt_thread_log_debug("process %PI insert failed", pid); 1522 break; 1523 } 1524 1525 nxt_thread_mutex_unlock(&rt->processes_mutex); 1526 1527 return process; 1528 } 1529 1530 1531 void 1532 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1533 { 1534 nxt_port_t *port; 1535 nxt_runtime_t *rt; 1536 nxt_lvlhsh_query_t lhq; 1537 1538 nxt_assert(process->registered == 0); 1539 1540 rt = task->thread->runtime; 1541 1542 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1543 1544 lhq.replace = 0; 1545 lhq.value = process; 1546 lhq.pool = rt->mem_pool; 1547 1548 nxt_thread_mutex_lock(&rt->processes_mutex); 1549 1550 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1551 1552 case NXT_OK: 1553 if (rt->nprocesses == 0) { 1554 rt->mprocess = process; 1555 } 1556 1557 rt->nprocesses++; 1558 1559 nxt_process_port_each(process, port) { 1560 1561 port->pid = process->pid; 1562 1563 nxt_runtime_port_add(task, port); 1564 1565 } nxt_process_port_loop; 1566 1567 process->registered = 1; 1568 1569 nxt_thread_log_debug("process %PI added", process->pid); 1570 break; 1571 1572 default: 1573 nxt_thread_log_debug("process %PI failed to add", process->pid); 1574 break; 1575 } 1576 1577 nxt_thread_mutex_unlock(&rt->processes_mutex); 1578 } 1579 1580 1581 void 1582 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1583 { 1584 nxt_pid_t pid; 1585 nxt_lvlhsh_query_t lhq; 1586 1587 pid = process->pid; 1588 1589 nxt_runtime_process_lhq_pid(&lhq, &pid); 1590 1591 lhq.pool = rt->mem_pool; 1592 1593 nxt_thread_mutex_lock(&rt->processes_mutex); 1594 1595 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1596 1597 case NXT_OK: 1598 rt->nprocesses--; 1599 1600 process = lhq.value; 1601 1602 process->registered = 0; 1603 1604 nxt_thread_log_debug("process %PI removed", pid); 1605 break; 1606 1607 default: 1608 nxt_thread_log_debug("process %PI remove failed", pid); 1609 break; 1610 } 1611 1612 nxt_thread_mutex_unlock(&rt->processes_mutex); 1613 } 1614 1615 1616 nxt_process_t * 1617 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1618 { 1619 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); 1620 1621 return nxt_runtime_process_next(rt, lhe); 1622 } 1623 1624 1625 nxt_port_t * 1626 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, 1627 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) 1628 { 1629 nxt_port_t *port; 1630 nxt_process_t *process; 1631 1632 process = nxt_runtime_process_get(rt, pid); 1633 if (nxt_slow_path(process == NULL)) { 1634 return NULL; 1635 } 1636 1637 port = nxt_port_new(task, id, pid, type); 1638 if (nxt_slow_path(port == NULL)) { 1639 nxt_process_use(task, process, -1); 1640 return NULL; 1641 } 1642 1643 nxt_process_port_add(task, process, port); 1644 1645 nxt_process_use(task, process, -1); 1646 1647 nxt_runtime_port_add(task, port); 1648 1649 nxt_port_use(task, port, -1); 1650 1651 return port; 1652 } 1653 1654 1655 static void 1656 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1657 { 1658 nxt_int_t res; 1659 nxt_runtime_t *rt; 1660 1661 rt = task->thread->runtime; 1662 1663 res = nxt_port_hash_add(&rt->ports, port); 1664 1665 if (res != NXT_OK) { 1666 return; 1667 } 1668 1669 rt->port_by_type[port->type] = port; 1670 1671 nxt_port_use(task, port, 1); 1672 } 1673 1674 1675 void 1676 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1677 { 1678 nxt_int_t res; 1679 nxt_runtime_t *rt; 1680 1681 rt = task->thread->runtime; 1682 1683 res = nxt_port_hash_remove(&rt->ports, port); 1684 1685 if (res != NXT_OK) { 1686 return; 1687 } 1688 1689 if (rt->port_by_type[port->type] == port) { 1690 rt->port_by_type[port->type] = NULL; 1691 } 1692 1693 nxt_port_use(task, port, -1); 1694 } 1695 1696 1697 nxt_port_t * 1698 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1699 nxt_port_id_t port_id) 1700 { 1701 return nxt_port_hash_find(&rt->ports, pid, port_id); 1702 } 1703