1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_port.h> 11 #include <nxt_main_process.h> 12 #include <nxt_router.h> 13 #include <nxt_regex.h> 14 15 16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 17 nxt_runtime_t *rt); 18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 19 nxt_runtime_t *rt); 20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); 24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt); 26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 34 nxt_runtime_t *rt); 35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 36 nxt_file_name_t *pid_file); 37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 38 nxt_runtime_cont_t cont); 39 static void nxt_runtime_thread_pool_init(void); 40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 41 void *data); 42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); 43 static void nxt_runtime_process_remove(nxt_runtime_t *rt, 44 nxt_process_t *process); 45 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); 46 47 48 nxt_int_t 49 nxt_runtime_create(nxt_task_t *task) 50 { 51 nxt_mp_t *mp; 52 nxt_int_t ret; 53 nxt_array_t *listen_sockets; 54 nxt_runtime_t *rt; 55 nxt_app_lang_module_t *lang; 56 57 mp = nxt_mp_create(1024, 128, 256, 32); 58 if (nxt_slow_path(mp == NULL)) { 59 return NXT_ERROR; 60 } 61 62 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 63 if (nxt_slow_path(rt == NULL)) { 64 goto fail; 65 } 66 67 task->thread->runtime = rt; 68 rt->mem_pool = mp; 69 70 nxt_thread_mutex_create(&rt->processes_mutex); 71 72 rt->services = nxt_services_init(mp); 73 if (nxt_slow_path(rt->services == NULL)) { 74 goto fail; 75 } 76 77 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 78 if (nxt_slow_path(rt->languages == NULL)) { 79 goto fail; 80 } 81 82 /* Should not fail. */ 83 lang = nxt_array_add(rt->languages); 84 lang->type = NXT_APP_EXTERNAL; 85 lang->version = (u_char *) ""; 86 lang->file = NULL; 87 lang->module = &nxt_external_module; 88 89 lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t)); 90 if (nxt_slow_path(lang->mounts == NULL)) { 91 goto fail; 92 } 93 94 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 95 if (nxt_slow_path(listen_sockets == NULL)) { 96 goto fail; 97 } 98 99 rt->listen_sockets = listen_sockets; 100 101 ret = nxt_runtime_inherited_listen_sockets(task, rt); 102 if (nxt_slow_path(ret != NXT_OK)) { 103 goto fail; 104 } 105 106 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 107 goto fail; 108 } 109 110 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 111 goto fail; 112 } 113 114 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 115 goto fail; 116 } 117 118 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 119 goto fail; 120 } 121 122 rt->start = nxt_runtime_initial_start; 123 124 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 125 goto fail; 126 } 127 128 if (nxt_port_rpc_init() != NXT_OK) { 129 goto fail; 130 } 131 132 if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) { 133 goto fail; 134 } 135 136 if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) { 137 goto fail; 138 } 139 140 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 141 nxt_runtime_start, task, rt, NULL); 142 143 return NXT_OK; 144 145 fail: 146 147 nxt_mp_destroy(mp); 148 149 return NXT_ERROR; 150 } 151 152 153 static nxt_int_t 154 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 155 { 156 u_char *v, *p; 157 nxt_int_t type; 158 nxt_array_t *inherited_sockets; 159 nxt_socket_t s; 160 nxt_listen_socket_t *ls; 161 162 v = (u_char *) getenv("NGINX"); 163 164 if (v == NULL) { 165 return nxt_runtime_systemd_listen_sockets(task, rt); 166 } 167 168 nxt_alert(task, "using inherited listen sockets: %s", v); 169 170 inherited_sockets = nxt_array_create(rt->mem_pool, 171 1, sizeof(nxt_listen_socket_t)); 172 if (inherited_sockets == NULL) { 173 return NXT_ERROR; 174 } 175 176 rt->inherited_sockets = inherited_sockets; 177 178 for (p = v; *p != '\0'; p++) { 179 180 if (*p == ';') { 181 s = nxt_int_parse(v, p - v); 182 183 if (nxt_slow_path(s < 0)) { 184 nxt_alert(task, "invalid socket number \"%s\" " 185 "in NGINX environment variable, " 186 "ignoring the rest of the variable", v); 187 return NXT_ERROR; 188 } 189 190 v = p + 1; 191 192 ls = nxt_array_zero_add(inherited_sockets); 193 if (nxt_slow_path(ls == NULL)) { 194 return NXT_ERROR; 195 } 196 197 ls->socket = s; 198 199 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 200 if (nxt_slow_path(ls->sockaddr == NULL)) { 201 return NXT_ERROR; 202 } 203 204 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 205 if (nxt_slow_path(type == -1)) { 206 return NXT_ERROR; 207 } 208 209 ls->sockaddr->type = (uint16_t) type; 210 } 211 } 212 213 return NXT_OK; 214 } 215 216 217 static nxt_int_t 218 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 219 { 220 u_char *nfd, *pid; 221 nxt_int_t n; 222 nxt_array_t *inherited_sockets; 223 nxt_socket_t s; 224 nxt_listen_socket_t *ls; 225 226 /* 227 * Number of listening sockets passed. The socket 228 * descriptors start from number 3 and are sequential. 229 */ 230 nfd = (u_char *) getenv("LISTEN_FDS"); 231 if (nfd == NULL) { 232 return NXT_OK; 233 } 234 235 /* The pid of the service process. */ 236 pid = (u_char *) getenv("LISTEN_PID"); 237 if (pid == NULL) { 238 return NXT_OK; 239 } 240 241 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 242 if (n < 0) { 243 return NXT_OK; 244 } 245 246 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 247 return NXT_OK; 248 } 249 250 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n); 251 252 inherited_sockets = nxt_array_create(rt->mem_pool, 253 n, sizeof(nxt_listen_socket_t)); 254 if (inherited_sockets == NULL) { 255 return NXT_ERROR; 256 } 257 258 rt->inherited_sockets = inherited_sockets; 259 260 for (s = 3; s < n; s++) { 261 ls = nxt_array_zero_add(inherited_sockets); 262 if (nxt_slow_path(ls == NULL)) { 263 return NXT_ERROR; 264 } 265 266 ls->socket = s; 267 268 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 269 if (nxt_slow_path(ls->sockaddr == NULL)) { 270 return NXT_ERROR; 271 } 272 273 ls->sockaddr->type = SOCK_STREAM; 274 } 275 276 return NXT_OK; 277 } 278 279 280 static nxt_int_t 281 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 282 { 283 nxt_thread_t *thread; 284 nxt_event_engine_t *engine; 285 const nxt_event_interface_t *interface; 286 287 interface = nxt_service_get(rt->services, "engine", NULL); 288 289 if (nxt_slow_path(interface == NULL)) { 290 /* TODO: log */ 291 return NXT_ERROR; 292 } 293 294 engine = nxt_event_engine_create(task, interface, 295 nxt_main_process_signals, 0, 0); 296 297 if (nxt_slow_path(engine == NULL)) { 298 return NXT_ERROR; 299 } 300 301 thread = task->thread; 302 thread->engine = engine; 303 #if 0 304 thread->fiber = &engine->fibers->fiber; 305 #endif 306 307 engine->id = rt->last_engine_id++; 308 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 309 310 nxt_queue_init(&rt->engines); 311 nxt_queue_insert_tail(&rt->engines, &engine->link); 312 313 return NXT_OK; 314 } 315 316 317 static nxt_int_t 318 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 319 { 320 nxt_int_t ret; 321 nxt_array_t *thread_pools; 322 323 thread_pools = nxt_array_create(rt->mem_pool, 1, 324 sizeof(nxt_thread_pool_t *)); 325 326 if (nxt_slow_path(thread_pools == NULL)) { 327 return NXT_ERROR; 328 } 329 330 rt->thread_pools = thread_pools; 331 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 332 333 if (nxt_slow_path(ret != NXT_OK)) { 334 return NXT_ERROR; 335 } 336 337 return NXT_OK; 338 } 339 340 341 static void 342 nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 343 { 344 nxt_runtime_t *rt; 345 346 rt = obj; 347 348 nxt_debug(task, "rt conf done"); 349 350 task->thread->log->ctx_handler = NULL; 351 task->thread->log->ctx = NULL; 352 353 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 354 goto fail; 355 } 356 357 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 358 goto fail; 359 } 360 361 /* 362 * Thread pools should be destroyed before starting worker 363 * processes, because thread pool semaphores will stick in 364 * locked state in new processes after fork(). 365 */ 366 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 367 368 return; 369 370 fail: 371 372 nxt_runtime_quit(task, 1); 373 } 374 375 376 static void 377 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status) 378 { 379 nxt_int_t ret; 380 nxt_thread_t *thr; 381 nxt_runtime_t *rt; 382 const nxt_event_interface_t *interface; 383 384 thr = task->thread; 385 rt = thr->runtime; 386 387 if (rt->inherited_sockets == NULL && rt->daemon) { 388 389 if (nxt_process_daemon(task) != NXT_OK) { 390 goto fail; 391 } 392 393 /* 394 * An event engine should be updated after fork() 395 * even if an event facility was not changed because: 396 * 1) inherited kqueue descriptor is invalid, 397 * 2) the signal thread is not inherited. 398 */ 399 interface = nxt_service_get(rt->services, "engine", rt->engine); 400 if (interface == NULL) { 401 goto fail; 402 } 403 404 ret = nxt_event_engine_change(task->thread->engine, interface, 405 rt->batch); 406 if (ret != NXT_OK) { 407 goto fail; 408 } 409 } 410 411 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 412 if (ret != NXT_OK) { 413 goto fail; 414 } 415 416 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 417 goto fail; 418 } 419 420 thr->engine->max_connections = rt->engine_connections; 421 422 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 423 return; 424 } 425 426 fail: 427 428 nxt_runtime_quit(task, 1); 429 } 430 431 432 void 433 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status) 434 { 435 nxt_bool_t done; 436 nxt_runtime_t *rt; 437 nxt_event_engine_t *engine; 438 439 rt = task->thread->runtime; 440 rt->status |= status; 441 engine = task->thread->engine; 442 443 nxt_debug(task, "exiting"); 444 445 done = 1; 446 447 if (!engine->shutdown) { 448 engine->shutdown = 1; 449 450 if (!nxt_array_is_empty(rt->thread_pools)) { 451 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 452 done = 0; 453 } 454 455 if (rt->type == NXT_PROCESS_MAIN) { 456 nxt_runtime_stop_all_processes(task, rt); 457 done = 0; 458 } 459 } 460 461 nxt_runtime_close_idle_connections(engine); 462 463 if (done) { 464 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 465 task, rt, engine); 466 } 467 } 468 469 470 static void 471 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 472 { 473 nxt_conn_t *c; 474 nxt_queue_t *idle; 475 nxt_queue_link_t *link, *next; 476 477 nxt_debug(&engine->task, "close idle connections"); 478 479 idle = &engine->idle_connections; 480 481 for (link = nxt_queue_first(idle); 482 link != nxt_queue_tail(idle); 483 link = next) 484 { 485 next = nxt_queue_next(link); 486 c = nxt_queue_link_data(link, nxt_conn_t, link); 487 488 if (!c->socket.read_ready) { 489 nxt_queue_remove(link); 490 nxt_conn_close(engine, c); 491 } 492 } 493 } 494 495 496 void 497 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) 498 { 499 nxt_port_t *port; 500 nxt_process_t *process; 501 nxt_process_init_t *init; 502 503 nxt_runtime_process_each(rt, process) { 504 505 init = nxt_process_init(process); 506 507 if (init->type == NXT_PROCESS_APP) { 508 509 nxt_process_port_each(process, port) { 510 511 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 512 0, 0, NULL); 513 514 } nxt_process_port_loop; 515 } 516 517 } nxt_runtime_process_loop; 518 } 519 520 521 static void 522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 523 { 524 nxt_port_t *port; 525 nxt_process_t *process; 526 527 nxt_runtime_process_each(rt, process) { 528 529 nxt_process_port_each(process, port) { 530 531 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 532 0, NULL); 533 534 } nxt_process_port_loop; 535 536 } nxt_runtime_process_loop; 537 } 538 539 540 static void 541 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 542 { 543 int status, engine_count; 544 nxt_runtime_t *rt; 545 nxt_process_t *process; 546 nxt_event_engine_t *engine; 547 548 rt = obj; 549 engine = data; 550 551 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 552 553 if (!nxt_array_is_empty(rt->thread_pools)) { 554 return; 555 } 556 557 if (rt->type == NXT_PROCESS_MAIN) { 558 if (rt->pid_file != NULL) { 559 nxt_file_delete(rt->pid_file); 560 } 561 562 #if (NXT_HAVE_UNIX_DOMAIN) 563 { 564 nxt_sockaddr_t *sa; 565 nxt_file_name_t *name; 566 567 sa = rt->controller_listen; 568 569 if (sa->u.sockaddr.sa_family == AF_UNIX) { 570 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 571 (void) nxt_file_delete(name); 572 } 573 } 574 #endif 575 } 576 577 if (!engine->event.signal_support) { 578 nxt_event_engine_signals_stop(engine); 579 } 580 581 nxt_runtime_process_each(rt, process) { 582 583 nxt_process_close_ports(task, process); 584 585 } nxt_runtime_process_loop; 586 587 status = rt->status; 588 589 engine_count = 0; 590 591 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) { 592 593 engine_count++; 594 595 } nxt_queue_loop; 596 597 if (engine_count <= 1) { 598 if (rt->port_by_type[rt->type] != NULL) { 599 nxt_port_use(task, rt->port_by_type[rt->type], -1); 600 } 601 602 nxt_thread_mutex_destroy(&rt->processes_mutex); 603 604 nxt_mp_destroy(rt->mem_pool); 605 } 606 607 nxt_debug(task, "exit: %d", status); 608 609 exit(status); 610 nxt_unreachable(); 611 } 612 613 614 static nxt_int_t 615 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 616 { 617 nxt_event_engine_t *engine; 618 const nxt_event_interface_t *interface; 619 620 engine = task->thread->engine; 621 622 if (engine->batch == rt->batch 623 && nxt_strcmp(engine->event.name, rt->engine) == 0) 624 { 625 return NXT_OK; 626 } 627 628 interface = nxt_service_get(rt->services, "engine", rt->engine); 629 630 if (interface != NULL) { 631 return nxt_event_engine_change(engine, interface, rt->batch); 632 } 633 634 return NXT_ERROR; 635 } 636 637 638 void 639 nxt_runtime_event_engine_free(nxt_runtime_t *rt) 640 { 641 nxt_queue_link_t *link; 642 nxt_event_engine_t *engine; 643 644 link = nxt_queue_first(&rt->engines); 645 nxt_queue_remove(link); 646 647 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 648 nxt_event_engine_free(engine); 649 } 650 651 652 nxt_int_t 653 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 654 nxt_uint_t max_threads, nxt_nsec_t timeout) 655 { 656 nxt_thread_pool_t *thread_pool, **tp; 657 658 tp = nxt_array_add(rt->thread_pools); 659 if (tp == NULL) { 660 return NXT_ERROR; 661 } 662 663 thread_pool = nxt_thread_pool_create(max_threads, timeout, 664 nxt_runtime_thread_pool_init, 665 thr->engine, 666 nxt_runtime_thread_pool_exit); 667 668 if (nxt_fast_path(thread_pool != NULL)) { 669 *tp = thread_pool; 670 } 671 672 return NXT_OK; 673 } 674 675 676 static void 677 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 678 nxt_runtime_cont_t cont) 679 { 680 nxt_uint_t n; 681 nxt_thread_pool_t **tp; 682 683 rt->continuation = cont; 684 685 n = rt->thread_pools->nelts; 686 687 if (n == 0) { 688 cont(task, 0); 689 return; 690 } 691 692 tp = rt->thread_pools->elts; 693 694 do { 695 nxt_thread_pool_destroy(*tp); 696 697 tp++; 698 n--; 699 } while (n != 0); 700 } 701 702 703 static void 704 nxt_runtime_thread_pool_init(void) 705 { 706 } 707 708 709 static void 710 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 711 { 712 nxt_uint_t i, n; 713 nxt_runtime_t *rt; 714 nxt_thread_pool_t *tp, **thread_pools; 715 nxt_thread_handle_t handle; 716 717 tp = obj; 718 719 if (data != NULL) { 720 handle = (nxt_thread_handle_t) (uintptr_t) data; 721 nxt_thread_wait(handle); 722 } 723 724 rt = task->thread->runtime; 725 726 thread_pools = rt->thread_pools->elts; 727 n = rt->thread_pools->nelts; 728 729 nxt_debug(task, "thread pools: %ui", n); 730 731 for (i = 0; i < n; i++) { 732 733 if (tp == thread_pools[i]) { 734 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 735 736 nxt_free(tp); 737 738 if (n == 1) { 739 /* The last thread pool. */ 740 rt->continuation(task, 0); 741 } 742 743 return; 744 } 745 } 746 } 747 748 749 static nxt_int_t 750 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 751 { 752 nxt_int_t ret; 753 nxt_str_t control; 754 nxt_uint_t n; 755 nxt_file_t *file; 756 const char *slash; 757 nxt_sockaddr_t *sa; 758 nxt_file_name_str_t file_name; 759 const nxt_event_interface_t *interface; 760 761 rt->daemon = 1; 762 rt->engine_connections = 256; 763 rt->auxiliary_threads = 2; 764 rt->user_cred.user = NXT_USER; 765 rt->group = NXT_GROUP; 766 rt->pid = NXT_PID; 767 rt->log = NXT_LOG; 768 rt->modules = NXT_MODULES; 769 rt->state = NXT_STATE; 770 rt->control = NXT_CONTROL_SOCK; 771 rt->tmp = NXT_TMP; 772 773 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t)); 774 775 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 776 return NXT_ERROR; 777 } 778 779 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) { 780 return NXT_ERROR; 781 } 782 783 if (rt->capabilities.setid) { 784 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, 785 rt->group); 786 787 if (nxt_slow_path(ret != NXT_OK)) { 788 return NXT_ERROR; 789 } 790 791 } else { 792 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it " 793 "cannot use arbitrary user and group."); 794 } 795 796 /* An engine's parameters. */ 797 798 interface = nxt_service_get(rt->services, "engine", rt->engine); 799 if (interface == NULL) { 800 return NXT_ERROR; 801 } 802 803 rt->engine = interface->name; 804 805 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 806 if (nxt_slow_path(ret != NXT_OK)) { 807 return NXT_ERROR; 808 } 809 810 rt->pid_file = file_name.start; 811 812 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 813 if (nxt_slow_path(ret != NXT_OK)) { 814 return NXT_ERROR; 815 } 816 817 file = nxt_list_first(rt->log_files); 818 file->name = file_name.start; 819 820 slash = ""; 821 n = nxt_strlen(rt->modules); 822 823 if (n > 1 && rt->modules[n - 1] != '/') { 824 slash = "/"; 825 } 826 827 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 828 rt->modules, slash); 829 if (nxt_slow_path(ret != NXT_OK)) { 830 return NXT_ERROR; 831 } 832 833 rt->modules = (char *) file_name.start; 834 835 slash = ""; 836 n = nxt_strlen(rt->state); 837 838 if (n > 1 && rt->state[n - 1] != '/') { 839 slash = "/"; 840 } 841 842 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 843 rt->state, slash); 844 if (nxt_slow_path(ret != NXT_OK)) { 845 return NXT_ERROR; 846 } 847 848 rt->conf = (char *) file_name.start; 849 850 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 851 if (nxt_slow_path(ret != NXT_OK)) { 852 return NXT_ERROR; 853 } 854 855 rt->conf_tmp = (char *) file_name.start; 856 857 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z", 858 rt->state, slash); 859 if (nxt_slow_path(ret != NXT_OK)) { 860 return NXT_ERROR; 861 } 862 863 ret = mkdir((char *) file_name.start, S_IRWXU); 864 865 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) { 866 rt->certs.length = file_name.len; 867 rt->certs.start = file_name.start; 868 869 } else { 870 nxt_alert(task, "Unable to create certificates storage directory: " 871 "mkdir(%s) failed %E", file_name.start, nxt_errno); 872 } 873 874 control.length = nxt_strlen(rt->control); 875 control.start = (u_char *) rt->control; 876 877 sa = nxt_sockaddr_parse(rt->mem_pool, &control); 878 if (nxt_slow_path(sa == NULL)) { 879 return NXT_ERROR; 880 } 881 882 sa->type = SOCK_STREAM; 883 884 rt->controller_listen = sa; 885 886 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 887 return NXT_ERROR; 888 } 889 890 return NXT_OK; 891 } 892 893 894 static nxt_int_t 895 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 896 { 897 char *p, **argv; 898 u_char *end; 899 u_char buf[1024]; 900 901 static const char version[] = 902 "unit version: " NXT_VERSION "\n" 903 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 904 905 static const char no_control[] = 906 "option \"--control\" requires socket address\n"; 907 static const char no_user[] = "option \"--user\" requires username\n"; 908 static const char no_group[] = "option \"--group\" requires group name\n"; 909 static const char no_pid[] = "option \"--pid\" requires filename\n"; 910 static const char no_log[] = "option \"--log\" requires filename\n"; 911 static const char no_modules[] = 912 "option \"--modules\" requires directory\n"; 913 static const char no_state[] = "option \"--state\" requires directory\n"; 914 static const char no_tmp[] = "option \"--tmp\" requires directory\n"; 915 916 static const char help[] = 917 "\n" 918 "unit options:\n" 919 "\n" 920 " --version print unit version and configure options\n" 921 "\n" 922 " --no-daemon run unit in non-daemon mode\n" 923 "\n" 924 " --control ADDRESS set address of control API socket\n" 925 " default: \"" NXT_CONTROL_SOCK "\"\n" 926 "\n" 927 " --pid FILE set pid filename\n" 928 " default: \"" NXT_PID "\"\n" 929 "\n" 930 " --log FILE set log filename\n" 931 " default: \"" NXT_LOG "\"\n" 932 "\n" 933 " --modules DIRECTORY set modules directory name\n" 934 " default: \"" NXT_MODULES "\"\n" 935 "\n" 936 " --state DIRECTORY set state directory name\n" 937 " default: \"" NXT_STATE "\"\n" 938 "\n" 939 " --tmp DIRECTORY set tmp directory name\n" 940 " default: \"" NXT_TMP "\"\n" 941 "\n" 942 " --user USER set non-privileged processes to run" 943 " as specified user\n" 944 " default: \"" NXT_USER "\"\n" 945 "\n" 946 " --group GROUP set non-privileged processes to run" 947 " as specified group\n" 948 " default: "; 949 950 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 951 static const char primary[] = "user's primary group\n\n"; 952 953 argv = &nxt_process_argv[1]; 954 955 while (*argv != NULL) { 956 p = *argv++; 957 958 if (nxt_strcmp(p, "--control") == 0) { 959 if (*argv == NULL) { 960 write(STDERR_FILENO, no_control, nxt_length(no_control)); 961 return NXT_ERROR; 962 } 963 964 p = *argv++; 965 966 rt->control = p; 967 968 continue; 969 } 970 971 if (nxt_strcmp(p, "--user") == 0) { 972 if (*argv == NULL) { 973 write(STDERR_FILENO, no_user, nxt_length(no_user)); 974 return NXT_ERROR; 975 } 976 977 p = *argv++; 978 979 rt->user_cred.user = p; 980 981 continue; 982 } 983 984 if (nxt_strcmp(p, "--group") == 0) { 985 if (*argv == NULL) { 986 write(STDERR_FILENO, no_group, nxt_length(no_group)); 987 return NXT_ERROR; 988 } 989 990 p = *argv++; 991 992 rt->group = p; 993 994 continue; 995 } 996 997 if (nxt_strcmp(p, "--pid") == 0) { 998 if (*argv == NULL) { 999 write(STDERR_FILENO, no_pid, nxt_length(no_pid)); 1000 return NXT_ERROR; 1001 } 1002 1003 p = *argv++; 1004 1005 rt->pid = p; 1006 1007 continue; 1008 } 1009 1010 if (nxt_strcmp(p, "--log") == 0) { 1011 if (*argv == NULL) { 1012 write(STDERR_FILENO, no_log, nxt_length(no_log)); 1013 return NXT_ERROR; 1014 } 1015 1016 p = *argv++; 1017 1018 rt->log = p; 1019 1020 continue; 1021 } 1022 1023 if (nxt_strcmp(p, "--modules") == 0) { 1024 if (*argv == NULL) { 1025 write(STDERR_FILENO, no_modules, nxt_length(no_modules)); 1026 return NXT_ERROR; 1027 } 1028 1029 p = *argv++; 1030 1031 rt->modules = p; 1032 1033 continue; 1034 } 1035 1036 if (nxt_strcmp(p, "--state") == 0) { 1037 if (*argv == NULL) { 1038 write(STDERR_FILENO, no_state, nxt_length(no_state)); 1039 return NXT_ERROR; 1040 } 1041 1042 p = *argv++; 1043 1044 rt->state = p; 1045 1046 continue; 1047 } 1048 1049 if (nxt_strcmp(p, "--tmp") == 0) { 1050 if (*argv == NULL) { 1051 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp)); 1052 return NXT_ERROR; 1053 } 1054 1055 p = *argv++; 1056 1057 rt->tmp = p; 1058 1059 continue; 1060 } 1061 1062 if (nxt_strcmp(p, "--no-daemon") == 0) { 1063 rt->daemon = 0; 1064 continue; 1065 } 1066 1067 if (nxt_strcmp(p, "--version") == 0) { 1068 write(STDERR_FILENO, version, nxt_length(version)); 1069 exit(0); 1070 } 1071 1072 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) { 1073 write(STDOUT_FILENO, help, nxt_length(help)); 1074 1075 if (sizeof(NXT_GROUP) == 1) { 1076 write(STDOUT_FILENO, primary, nxt_length(primary)); 1077 1078 } else { 1079 write(STDOUT_FILENO, group, nxt_length(group)); 1080 } 1081 1082 exit(0); 1083 } 1084 1085 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", " 1086 "try \"%s -h\" for available options\n", 1087 p, nxt_process_argv[0]); 1088 1089 write(STDERR_FILENO, buf, end - buf); 1090 1091 return NXT_ERROR; 1092 } 1093 1094 return NXT_OK; 1095 } 1096 1097 1098 nxt_listen_socket_t * 1099 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1100 { 1101 nxt_mp_t *mp; 1102 nxt_listen_socket_t *ls; 1103 1104 ls = nxt_array_zero_add(rt->listen_sockets); 1105 if (ls == NULL) { 1106 return NULL; 1107 } 1108 1109 mp = rt->mem_pool; 1110 1111 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1112 sa->length); 1113 if (ls->sockaddr == NULL) { 1114 return NULL; 1115 } 1116 1117 ls->sockaddr->type = sa->type; 1118 1119 nxt_sockaddr_text(ls->sockaddr); 1120 1121 ls->socket = -1; 1122 ls->backlog = NXT_LISTEN_BACKLOG; 1123 1124 return ls; 1125 } 1126 1127 1128 static nxt_int_t 1129 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1130 { 1131 size_t length; 1132 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1133 1134 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1135 nxt_alert(task, "gethostname() failed %E", nxt_errno); 1136 return NXT_ERROR; 1137 } 1138 1139 /* 1140 * Linux gethostname(2): 1141 * 1142 * If the null-terminated hostname is too large to fit, 1143 * then the name is truncated, and no error is returned. 1144 * 1145 * For this reason an additional byte is reserved in the buffer. 1146 */ 1147 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1148 1149 length = nxt_strlen(hostname); 1150 rt->hostname.length = length; 1151 1152 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1153 1154 if (rt->hostname.start != NULL) { 1155 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1156 return NXT_OK; 1157 } 1158 1159 return NXT_ERROR; 1160 } 1161 1162 1163 static nxt_int_t 1164 nxt_runtime_log_files_init(nxt_runtime_t *rt) 1165 { 1166 nxt_file_t *file; 1167 nxt_list_t *log_files; 1168 1169 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1170 1171 if (nxt_fast_path(log_files != NULL)) { 1172 rt->log_files = log_files; 1173 1174 /* Preallocate the main log. This allocation cannot fail. */ 1175 file = nxt_list_zero_add(log_files); 1176 1177 file->fd = NXT_FILE_INVALID; 1178 file->log_level = NXT_LOG_ALERT; 1179 1180 return NXT_OK; 1181 } 1182 1183 return NXT_ERROR; 1184 } 1185 1186 1187 nxt_file_t * 1188 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1189 { 1190 nxt_int_t ret; 1191 nxt_file_t *file; 1192 nxt_file_name_str_t file_name; 1193 1194 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1195 1196 if (nxt_slow_path(ret != NXT_OK)) { 1197 return NULL; 1198 } 1199 1200 nxt_list_each(file, rt->log_files) { 1201 1202 /* STUB: hardecoded case sensitive/insensitive. */ 1203 1204 if (file->name != NULL 1205 && nxt_file_name_eq(file->name, file_name.start)) 1206 { 1207 return file; 1208 } 1209 1210 } nxt_list_loop; 1211 1212 file = nxt_list_zero_add(rt->log_files); 1213 1214 if (nxt_slow_path(file == NULL)) { 1215 return NULL; 1216 } 1217 1218 file->fd = NXT_FILE_INVALID; 1219 file->log_level = NXT_LOG_ALERT; 1220 file->name = file_name.start; 1221 1222 return file; 1223 } 1224 1225 1226 static nxt_int_t 1227 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1228 { 1229 nxt_int_t ret; 1230 nxt_file_t *file; 1231 1232 nxt_list_each(file, rt->log_files) { 1233 1234 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1235 NXT_FILE_OWNER_ACCESS); 1236 1237 if (ret != NXT_OK) { 1238 return NXT_ERROR; 1239 } 1240 1241 } nxt_list_loop; 1242 1243 file = nxt_list_first(rt->log_files); 1244 1245 return nxt_file_stderr(file); 1246 } 1247 1248 1249 nxt_int_t 1250 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1251 { 1252 nxt_int_t ret; 1253 nxt_uint_t c, p, ncurr, nprev; 1254 nxt_listen_socket_t *curr, *prev; 1255 1256 curr = rt->listen_sockets->elts; 1257 ncurr = rt->listen_sockets->nelts; 1258 1259 if (rt->inherited_sockets != NULL) { 1260 prev = rt->inherited_sockets->elts; 1261 nprev = rt->inherited_sockets->nelts; 1262 1263 } else { 1264 prev = NULL; 1265 nprev = 0; 1266 } 1267 1268 for (c = 0; c < ncurr; c++) { 1269 1270 for (p = 0; p < nprev; p++) { 1271 1272 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1273 1274 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1275 if (ret != NXT_OK) { 1276 return NXT_ERROR; 1277 } 1278 1279 goto next; 1280 } 1281 } 1282 1283 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) { 1284 return NXT_ERROR; 1285 } 1286 1287 next: 1288 1289 continue; 1290 } 1291 1292 return NXT_OK; 1293 } 1294 1295 1296 nxt_int_t 1297 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1298 { 1299 nxt_uint_t i, n; 1300 nxt_listen_socket_t *ls; 1301 1302 ls = rt->listen_sockets->elts; 1303 n = rt->listen_sockets->nelts; 1304 1305 for (i = 0; i < n; i++) { 1306 if (ls[i].flags == NXT_NONBLOCK) { 1307 if (nxt_listen_event(task, &ls[i]) == NULL) { 1308 return NXT_ERROR; 1309 } 1310 } 1311 } 1312 1313 return NXT_OK; 1314 } 1315 1316 1317 nxt_str_t * 1318 nxt_current_directory(nxt_mp_t *mp) 1319 { 1320 size_t length; 1321 u_char *p; 1322 nxt_str_t *name; 1323 char buf[NXT_MAX_PATH_LEN]; 1324 1325 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1326 1327 if (nxt_fast_path(length != 0)) { 1328 name = nxt_str_alloc(mp, length + 1); 1329 1330 if (nxt_fast_path(name != NULL)) { 1331 p = nxt_cpymem(name->start, buf, length); 1332 *p = '/'; 1333 1334 return name; 1335 } 1336 } 1337 1338 return NULL; 1339 } 1340 1341 1342 static nxt_int_t 1343 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1344 { 1345 ssize_t length; 1346 nxt_int_t n; 1347 nxt_file_t file; 1348 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")]; 1349 1350 nxt_memzero(&file, sizeof(nxt_file_t)); 1351 1352 file.name = pid_file; 1353 1354 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1355 NXT_FILE_DEFAULT_ACCESS); 1356 1357 if (n != NXT_OK) { 1358 return NXT_ERROR; 1359 } 1360 1361 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1362 1363 if (nxt_file_write(&file, pid, length, 0) != length) { 1364 return NXT_ERROR; 1365 } 1366 1367 nxt_file_close(task, &file); 1368 1369 return NXT_OK; 1370 } 1371 1372 1373 nxt_process_t * 1374 nxt_runtime_process_new(nxt_runtime_t *rt) 1375 { 1376 nxt_process_t *process; 1377 1378 /* TODO: memory failures. */ 1379 1380 process = nxt_mp_zalloc(rt->mem_pool, 1381 sizeof(nxt_process_t) + sizeof(nxt_process_init_t)); 1382 1383 if (nxt_slow_path(process == NULL)) { 1384 return NULL; 1385 } 1386 1387 nxt_queue_init(&process->ports); 1388 1389 nxt_thread_mutex_create(&process->incoming.mutex); 1390 1391 process->use_count = 1; 1392 1393 return process; 1394 } 1395 1396 1397 void 1398 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) 1399 { 1400 if (process->registered == 1) { 1401 nxt_runtime_process_remove(rt, process); 1402 } 1403 1404 nxt_assert(process->use_count == 0); 1405 nxt_assert(process->registered == 0); 1406 1407 nxt_port_mmaps_destroy(&process->incoming, 1); 1408 1409 nxt_thread_mutex_destroy(&process->incoming.mutex); 1410 1411 /* processes from nxt_runtime_process_get() have no memory pool */ 1412 if (process->mem_pool != NULL) { 1413 nxt_mp_destroy(process->mem_pool); 1414 } 1415 1416 nxt_mp_free(rt->mem_pool, process); 1417 } 1418 1419 1420 static nxt_int_t 1421 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1422 { 1423 nxt_process_t *process; 1424 1425 process = data; 1426 1427 if (lhq->key.length == sizeof(nxt_pid_t) 1428 && *(nxt_pid_t *) lhq->key.start == process->pid) 1429 { 1430 return NXT_OK; 1431 } 1432 1433 return NXT_DECLINED; 1434 } 1435 1436 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1437 NXT_LVLHSH_DEFAULT, 1438 nxt_runtime_lvlhsh_pid_test, 1439 nxt_lvlhsh_alloc, 1440 nxt_lvlhsh_free, 1441 }; 1442 1443 1444 nxt_inline void 1445 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1446 { 1447 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1448 lhq->key.length = sizeof(*pid); 1449 lhq->key.start = (u_char *) pid; 1450 lhq->proto = &lvlhsh_processes_proto; 1451 } 1452 1453 1454 nxt_process_t * 1455 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1456 { 1457 nxt_process_t *process; 1458 nxt_lvlhsh_query_t lhq; 1459 1460 process = NULL; 1461 1462 nxt_runtime_process_lhq_pid(&lhq, &pid); 1463 1464 nxt_thread_mutex_lock(&rt->processes_mutex); 1465 1466 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1467 process = lhq.value; 1468 1469 } else { 1470 nxt_thread_log_debug("process %PI not found", pid); 1471 } 1472 1473 nxt_thread_mutex_unlock(&rt->processes_mutex); 1474 1475 return process; 1476 } 1477 1478 1479 static nxt_process_t * 1480 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1481 { 1482 nxt_process_t *process; 1483 nxt_lvlhsh_query_t lhq; 1484 1485 nxt_runtime_process_lhq_pid(&lhq, &pid); 1486 1487 nxt_thread_mutex_lock(&rt->processes_mutex); 1488 1489 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1490 nxt_thread_log_debug("process %PI found", pid); 1491 1492 nxt_thread_mutex_unlock(&rt->processes_mutex); 1493 1494 process = lhq.value; 1495 process->use_count++; 1496 1497 return process; 1498 } 1499 1500 process = nxt_runtime_process_new(rt); 1501 if (nxt_slow_path(process == NULL)) { 1502 1503 nxt_thread_mutex_unlock(&rt->processes_mutex); 1504 1505 return NULL; 1506 } 1507 1508 process->pid = pid; 1509 1510 lhq.replace = 0; 1511 lhq.value = process; 1512 lhq.pool = rt->mem_pool; 1513 1514 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1515 1516 case NXT_OK: 1517 if (rt->nprocesses == 0) { 1518 rt->mprocess = process; 1519 } 1520 1521 rt->nprocesses++; 1522 1523 process->registered = 1; 1524 1525 nxt_thread_log_debug("process %PI insert", pid); 1526 break; 1527 1528 default: 1529 nxt_thread_log_debug("process %PI insert failed", pid); 1530 break; 1531 } 1532 1533 nxt_thread_mutex_unlock(&rt->processes_mutex); 1534 1535 return process; 1536 } 1537 1538 1539 void 1540 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1541 { 1542 nxt_port_t *port; 1543 nxt_runtime_t *rt; 1544 nxt_lvlhsh_query_t lhq; 1545 1546 nxt_assert(process->registered == 0); 1547 1548 rt = task->thread->runtime; 1549 1550 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1551 1552 lhq.replace = 0; 1553 lhq.value = process; 1554 lhq.pool = rt->mem_pool; 1555 1556 nxt_thread_mutex_lock(&rt->processes_mutex); 1557 1558 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1559 1560 case NXT_OK: 1561 if (rt->nprocesses == 0) { 1562 rt->mprocess = process; 1563 } 1564 1565 rt->nprocesses++; 1566 1567 nxt_process_port_each(process, port) { 1568 1569 port->pid = process->pid; 1570 1571 nxt_runtime_port_add(task, port); 1572 1573 } nxt_process_port_loop; 1574 1575 process->registered = 1; 1576 1577 nxt_thread_log_debug("process %PI added", process->pid); 1578 break; 1579 1580 default: 1581 nxt_thread_log_debug("process %PI failed to add", process->pid); 1582 break; 1583 } 1584 1585 nxt_thread_mutex_unlock(&rt->processes_mutex); 1586 } 1587 1588 1589 static void 1590 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1591 { 1592 nxt_pid_t pid; 1593 nxt_lvlhsh_query_t lhq; 1594 1595 pid = process->pid; 1596 1597 nxt_runtime_process_lhq_pid(&lhq, &pid); 1598 1599 lhq.pool = rt->mem_pool; 1600 1601 nxt_thread_mutex_lock(&rt->processes_mutex); 1602 1603 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1604 1605 case NXT_OK: 1606 rt->nprocesses--; 1607 1608 process = lhq.value; 1609 1610 process->registered = 0; 1611 1612 nxt_thread_log_debug("process %PI removed", pid); 1613 break; 1614 1615 default: 1616 nxt_thread_log_debug("process %PI remove failed", pid); 1617 break; 1618 } 1619 1620 nxt_thread_mutex_unlock(&rt->processes_mutex); 1621 } 1622 1623 1624 nxt_process_t * 1625 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1626 { 1627 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); 1628 1629 return nxt_runtime_process_next(rt, lhe); 1630 } 1631 1632 1633 nxt_port_t * 1634 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, 1635 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) 1636 { 1637 nxt_port_t *port; 1638 nxt_process_t *process; 1639 1640 process = nxt_runtime_process_get(rt, pid); 1641 if (nxt_slow_path(process == NULL)) { 1642 return NULL; 1643 } 1644 1645 port = nxt_port_new(task, id, pid, type); 1646 if (nxt_slow_path(port == NULL)) { 1647 nxt_process_use(task, process, -1); 1648 return NULL; 1649 } 1650 1651 nxt_process_port_add(task, process, port); 1652 1653 nxt_process_use(task, process, -1); 1654 1655 nxt_runtime_port_add(task, port); 1656 1657 nxt_port_use(task, port, -1); 1658 1659 return port; 1660 } 1661 1662 1663 static void 1664 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1665 { 1666 nxt_int_t res; 1667 nxt_runtime_t *rt; 1668 1669 rt = task->thread->runtime; 1670 1671 res = nxt_port_hash_add(&rt->ports, port); 1672 1673 if (res != NXT_OK) { 1674 return; 1675 } 1676 1677 rt->port_by_type[port->type] = port; 1678 1679 nxt_port_use(task, port, 1); 1680 } 1681 1682 1683 void 1684 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1685 { 1686 nxt_int_t res; 1687 nxt_runtime_t *rt; 1688 1689 rt = task->thread->runtime; 1690 1691 res = nxt_port_hash_remove(&rt->ports, port); 1692 1693 if (res != NXT_OK) { 1694 return; 1695 } 1696 1697 if (rt->port_by_type[port->type] == port) { 1698 rt->port_by_type[port->type] = NULL; 1699 } 1700 1701 nxt_port_use(task, port, -1); 1702 } 1703 1704 1705 nxt_port_t * 1706 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1707 nxt_port_id_t port_id) 1708 { 1709 return nxt_port_hash_find(&rt->ports, pid, port_id); 1710 } 1711