1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_port.h> 11 #include <nxt_main_process.h> 12 #include <nxt_router.h> 13 #include <nxt_regex.h> 14 15 16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 17 nxt_runtime_t *rt); 18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 19 nxt_runtime_t *rt); 20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); 24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt); 26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 34 nxt_runtime_t *rt); 35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 36 nxt_file_name_t *pid_file); 37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 38 nxt_runtime_cont_t cont); 39 static void nxt_runtime_thread_pool_init(void); 40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 41 void *data); 42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); 43 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); 44 45 46 nxt_int_t 47 nxt_runtime_create(nxt_task_t *task) 48 { 49 nxt_mp_t *mp; 50 nxt_int_t ret; 51 nxt_array_t *listen_sockets; 52 nxt_runtime_t *rt; 53 nxt_app_lang_module_t *lang; 54 55 mp = nxt_mp_create(1024, 128, 256, 32); 56 if (nxt_slow_path(mp == NULL)) { 57 return NXT_ERROR; 58 } 59 60 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 61 if (nxt_slow_path(rt == NULL)) { 62 goto fail; 63 } 64 65 task->thread->runtime = rt; 66 rt->mem_pool = mp; 67 68 nxt_thread_mutex_create(&rt->processes_mutex); 69 70 rt->services = nxt_services_init(mp); 71 if (nxt_slow_path(rt->services == NULL)) { 72 goto fail; 73 } 74 75 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 76 if (nxt_slow_path(rt->languages == NULL)) { 77 goto fail; 78 } 79 80 /* Should not fail. */ 81 lang = nxt_array_add(rt->languages); 82 lang->type = NXT_APP_EXTERNAL; 83 lang->version = (u_char *) ""; 84 lang->file = NULL; 85 lang->module = &nxt_external_module; 86 87 lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t)); 88 if (nxt_slow_path(lang->mounts == NULL)) { 89 goto fail; 90 } 91 92 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 93 if (nxt_slow_path(listen_sockets == NULL)) { 94 goto fail; 95 } 96 97 rt->listen_sockets = listen_sockets; 98 99 ret = nxt_runtime_inherited_listen_sockets(task, rt); 100 if (nxt_slow_path(ret != NXT_OK)) { 101 goto fail; 102 } 103 104 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 105 goto fail; 106 } 107 108 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 109 goto fail; 110 } 111 112 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 113 goto fail; 114 } 115 116 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 117 goto fail; 118 } 119 120 rt->start = nxt_runtime_initial_start; 121 122 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 123 goto fail; 124 } 125 126 if (nxt_port_rpc_init() != NXT_OK) { 127 goto fail; 128 } 129 130 if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) { 131 goto fail; 132 } 133 134 if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) { 135 goto fail; 136 } 137 138 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 139 nxt_runtime_start, task, rt, NULL); 140 141 return NXT_OK; 142 143 fail: 144 145 nxt_mp_destroy(mp); 146 147 return NXT_ERROR; 148 } 149 150 151 static nxt_int_t 152 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 153 { 154 u_char *v, *p; 155 nxt_int_t type; 156 nxt_array_t *inherited_sockets; 157 nxt_socket_t s; 158 nxt_listen_socket_t *ls; 159 160 v = (u_char *) getenv("NGINX"); 161 162 if (v == NULL) { 163 return nxt_runtime_systemd_listen_sockets(task, rt); 164 } 165 166 nxt_alert(task, "using inherited listen sockets: %s", v); 167 168 inherited_sockets = nxt_array_create(rt->mem_pool, 169 1, sizeof(nxt_listen_socket_t)); 170 if (inherited_sockets == NULL) { 171 return NXT_ERROR; 172 } 173 174 rt->inherited_sockets = inherited_sockets; 175 176 for (p = v; *p != '\0'; p++) { 177 178 if (*p == ';') { 179 s = nxt_int_parse(v, p - v); 180 181 if (nxt_slow_path(s < 0)) { 182 nxt_alert(task, "invalid socket number \"%s\" " 183 "in NGINX environment variable, " 184 "ignoring the rest of the variable", v); 185 return NXT_ERROR; 186 } 187 188 v = p + 1; 189 190 ls = nxt_array_zero_add(inherited_sockets); 191 if (nxt_slow_path(ls == NULL)) { 192 return NXT_ERROR; 193 } 194 195 ls->socket = s; 196 197 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 198 if (nxt_slow_path(ls->sockaddr == NULL)) { 199 return NXT_ERROR; 200 } 201 202 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 203 if (nxt_slow_path(type == -1)) { 204 return NXT_ERROR; 205 } 206 207 ls->sockaddr->type = (uint16_t) type; 208 } 209 } 210 211 return NXT_OK; 212 } 213 214 215 static nxt_int_t 216 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 217 { 218 u_char *nfd, *pid; 219 nxt_int_t n; 220 nxt_array_t *inherited_sockets; 221 nxt_socket_t s; 222 nxt_listen_socket_t *ls; 223 224 /* 225 * Number of listening sockets passed. The socket 226 * descriptors start from number 3 and are sequential. 227 */ 228 nfd = (u_char *) getenv("LISTEN_FDS"); 229 if (nfd == NULL) { 230 return NXT_OK; 231 } 232 233 /* The pid of the service process. */ 234 pid = (u_char *) getenv("LISTEN_PID"); 235 if (pid == NULL) { 236 return NXT_OK; 237 } 238 239 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 240 if (n < 0) { 241 return NXT_OK; 242 } 243 244 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 245 return NXT_OK; 246 } 247 248 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n); 249 250 inherited_sockets = nxt_array_create(rt->mem_pool, 251 n, sizeof(nxt_listen_socket_t)); 252 if (inherited_sockets == NULL) { 253 return NXT_ERROR; 254 } 255 256 rt->inherited_sockets = inherited_sockets; 257 258 for (s = 3; s < n; s++) { 259 ls = nxt_array_zero_add(inherited_sockets); 260 if (nxt_slow_path(ls == NULL)) { 261 return NXT_ERROR; 262 } 263 264 ls->socket = s; 265 266 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 267 if (nxt_slow_path(ls->sockaddr == NULL)) { 268 return NXT_ERROR; 269 } 270 271 ls->sockaddr->type = SOCK_STREAM; 272 } 273 274 return NXT_OK; 275 } 276 277 278 static nxt_int_t 279 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 280 { 281 nxt_thread_t *thread; 282 nxt_event_engine_t *engine; 283 const nxt_event_interface_t *interface; 284 285 interface = nxt_service_get(rt->services, "engine", NULL); 286 287 if (nxt_slow_path(interface == NULL)) { 288 /* TODO: log */ 289 return NXT_ERROR; 290 } 291 292 engine = nxt_event_engine_create(task, interface, 293 nxt_main_process_signals, 0, 0); 294 295 if (nxt_slow_path(engine == NULL)) { 296 return NXT_ERROR; 297 } 298 299 thread = task->thread; 300 thread->engine = engine; 301 #if 0 302 thread->fiber = &engine->fibers->fiber; 303 #endif 304 305 engine->id = rt->last_engine_id++; 306 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 307 308 nxt_queue_init(&rt->engines); 309 nxt_queue_insert_tail(&rt->engines, &engine->link); 310 311 return NXT_OK; 312 } 313 314 315 static nxt_int_t 316 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 317 { 318 nxt_int_t ret; 319 nxt_array_t *thread_pools; 320 321 thread_pools = nxt_array_create(rt->mem_pool, 1, 322 sizeof(nxt_thread_pool_t *)); 323 324 if (nxt_slow_path(thread_pools == NULL)) { 325 return NXT_ERROR; 326 } 327 328 rt->thread_pools = thread_pools; 329 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 330 331 if (nxt_slow_path(ret != NXT_OK)) { 332 return NXT_ERROR; 333 } 334 335 return NXT_OK; 336 } 337 338 339 static void 340 nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 341 { 342 nxt_runtime_t *rt; 343 344 rt = obj; 345 346 nxt_debug(task, "rt conf done"); 347 348 task->thread->log->ctx_handler = NULL; 349 task->thread->log->ctx = NULL; 350 351 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 352 goto fail; 353 } 354 355 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 356 goto fail; 357 } 358 359 /* 360 * Thread pools should be destroyed before starting worker 361 * processes, because thread pool semaphores will stick in 362 * locked state in new processes after fork(). 363 */ 364 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 365 366 return; 367 368 fail: 369 370 nxt_runtime_quit(task, 1); 371 } 372 373 374 static void 375 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status) 376 { 377 nxt_int_t ret; 378 nxt_thread_t *thr; 379 nxt_runtime_t *rt; 380 const nxt_event_interface_t *interface; 381 382 thr = task->thread; 383 rt = thr->runtime; 384 385 if (rt->inherited_sockets == NULL && rt->daemon) { 386 387 if (nxt_process_daemon(task) != NXT_OK) { 388 goto fail; 389 } 390 391 /* 392 * An event engine should be updated after fork() 393 * even if an event facility was not changed because: 394 * 1) inherited kqueue descriptor is invalid, 395 * 2) the signal thread is not inherited. 396 */ 397 interface = nxt_service_get(rt->services, "engine", rt->engine); 398 if (interface == NULL) { 399 goto fail; 400 } 401 402 ret = nxt_event_engine_change(task->thread->engine, interface, 403 rt->batch); 404 if (ret != NXT_OK) { 405 goto fail; 406 } 407 } 408 409 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 410 if (ret != NXT_OK) { 411 goto fail; 412 } 413 414 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 415 goto fail; 416 } 417 418 thr->engine->max_connections = rt->engine_connections; 419 420 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 421 return; 422 } 423 424 fail: 425 426 nxt_runtime_quit(task, 1); 427 } 428 429 430 void 431 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status) 432 { 433 nxt_bool_t done; 434 nxt_runtime_t *rt; 435 nxt_event_engine_t *engine; 436 437 rt = task->thread->runtime; 438 rt->status |= status; 439 engine = task->thread->engine; 440 441 nxt_debug(task, "exiting"); 442 443 done = 1; 444 445 if (!engine->shutdown) { 446 engine->shutdown = 1; 447 448 if (!nxt_array_is_empty(rt->thread_pools)) { 449 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 450 done = 0; 451 } 452 453 if (rt->type == NXT_PROCESS_MAIN) { 454 nxt_runtime_stop_all_processes(task, rt); 455 done = 0; 456 } 457 } 458 459 nxt_runtime_close_idle_connections(engine); 460 461 if (done) { 462 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 463 task, rt, engine); 464 } 465 } 466 467 468 static void 469 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 470 { 471 nxt_conn_t *c; 472 nxt_queue_t *idle; 473 nxt_queue_link_t *link, *next; 474 475 nxt_debug(&engine->task, "close idle connections"); 476 477 idle = &engine->idle_connections; 478 479 for (link = nxt_queue_first(idle); 480 link != nxt_queue_tail(idle); 481 link = next) 482 { 483 next = nxt_queue_next(link); 484 c = nxt_queue_link_data(link, nxt_conn_t, link); 485 486 if (!c->socket.read_ready) { 487 nxt_queue_remove(link); 488 nxt_conn_close(engine, c); 489 } 490 } 491 } 492 493 494 void 495 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) 496 { 497 nxt_port_t *port; 498 nxt_process_t *process; 499 nxt_process_init_t *init; 500 501 nxt_runtime_process_each(rt, process) { 502 503 init = nxt_process_init(process); 504 505 if (init->type == NXT_PROCESS_APP 506 || init->type == NXT_PROCESS_PROTOTYPE) 507 { 508 509 nxt_process_port_each(process, port) { 510 511 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 512 0, 0, NULL); 513 514 } nxt_process_port_loop; 515 } 516 517 } nxt_runtime_process_loop; 518 } 519 520 521 static void 522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 523 { 524 nxt_port_t *port; 525 nxt_process_t *process; 526 527 nxt_runtime_process_each(rt, process) { 528 529 nxt_process_port_each(process, port) { 530 531 nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid); 532 533 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 534 0, NULL); 535 536 } nxt_process_port_loop; 537 538 } nxt_runtime_process_loop; 539 } 540 541 542 static void 543 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 544 { 545 int status, engine_count; 546 nxt_runtime_t *rt; 547 nxt_process_t *process; 548 nxt_event_engine_t *engine; 549 550 rt = obj; 551 engine = data; 552 553 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 554 555 if (!nxt_array_is_empty(rt->thread_pools)) { 556 return; 557 } 558 559 if (rt->type == NXT_PROCESS_MAIN) { 560 if (rt->pid_file != NULL) { 561 nxt_file_delete(rt->pid_file); 562 } 563 564 #if (NXT_HAVE_UNIX_DOMAIN) 565 { 566 nxt_sockaddr_t *sa; 567 nxt_file_name_t *name; 568 569 sa = rt->controller_listen; 570 571 if (sa->u.sockaddr.sa_family == AF_UNIX) { 572 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 573 (void) nxt_file_delete(name); 574 } 575 } 576 #endif 577 } 578 579 if (!engine->event.signal_support) { 580 nxt_event_engine_signals_stop(engine); 581 } 582 583 nxt_runtime_process_each(rt, process) { 584 585 nxt_runtime_process_remove(rt, process); 586 nxt_process_close_ports(task, process); 587 588 } nxt_runtime_process_loop; 589 590 status = rt->status; 591 592 engine_count = 0; 593 594 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) { 595 596 engine_count++; 597 598 } nxt_queue_loop; 599 600 if (engine_count <= 1) { 601 if (rt->port_by_type[rt->type] != NULL) { 602 nxt_port_use(task, rt->port_by_type[rt->type], -1); 603 } 604 605 nxt_thread_mutex_destroy(&rt->processes_mutex); 606 607 nxt_mp_destroy(rt->mem_pool); 608 } 609 610 nxt_debug(task, "exit: %d", status); 611 612 exit(status); 613 nxt_unreachable(); 614 } 615 616 617 static nxt_int_t 618 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 619 { 620 nxt_event_engine_t *engine; 621 const nxt_event_interface_t *interface; 622 623 engine = task->thread->engine; 624 625 if (engine->batch == rt->batch 626 && nxt_strcmp(engine->event.name, rt->engine) == 0) 627 { 628 return NXT_OK; 629 } 630 631 interface = nxt_service_get(rt->services, "engine", rt->engine); 632 633 if (interface != NULL) { 634 return nxt_event_engine_change(engine, interface, rt->batch); 635 } 636 637 return NXT_ERROR; 638 } 639 640 641 void 642 nxt_runtime_event_engine_free(nxt_runtime_t *rt) 643 { 644 nxt_queue_link_t *link; 645 nxt_event_engine_t *engine; 646 647 link = nxt_queue_first(&rt->engines); 648 nxt_queue_remove(link); 649 650 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 651 nxt_event_engine_free(engine); 652 } 653 654 655 nxt_int_t 656 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 657 nxt_uint_t max_threads, nxt_nsec_t timeout) 658 { 659 nxt_thread_pool_t *thread_pool, **tp; 660 661 tp = nxt_array_add(rt->thread_pools); 662 if (tp == NULL) { 663 return NXT_ERROR; 664 } 665 666 thread_pool = nxt_thread_pool_create(max_threads, timeout, 667 nxt_runtime_thread_pool_init, 668 thr->engine, 669 nxt_runtime_thread_pool_exit); 670 671 if (nxt_fast_path(thread_pool != NULL)) { 672 *tp = thread_pool; 673 } 674 675 return NXT_OK; 676 } 677 678 679 static void 680 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 681 nxt_runtime_cont_t cont) 682 { 683 nxt_uint_t n; 684 nxt_thread_pool_t **tp; 685 686 rt->continuation = cont; 687 688 n = rt->thread_pools->nelts; 689 690 if (n == 0) { 691 cont(task, 0); 692 return; 693 } 694 695 tp = rt->thread_pools->elts; 696 697 do { 698 nxt_thread_pool_destroy(*tp); 699 700 tp++; 701 n--; 702 } while (n != 0); 703 } 704 705 706 static void 707 nxt_runtime_thread_pool_init(void) 708 { 709 } 710 711 712 static void 713 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 714 { 715 nxt_uint_t i, n; 716 nxt_runtime_t *rt; 717 nxt_thread_pool_t *tp, **thread_pools; 718 nxt_thread_handle_t handle; 719 720 tp = obj; 721 722 if (data != NULL) { 723 handle = (nxt_thread_handle_t) (uintptr_t) data; 724 nxt_thread_wait(handle); 725 } 726 727 rt = task->thread->runtime; 728 729 thread_pools = rt->thread_pools->elts; 730 n = rt->thread_pools->nelts; 731 732 nxt_debug(task, "thread pools: %ui", n); 733 734 for (i = 0; i < n; i++) { 735 736 if (tp == thread_pools[i]) { 737 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 738 739 nxt_free(tp); 740 741 if (n == 1) { 742 /* The last thread pool. */ 743 rt->continuation(task, 0); 744 } 745 746 return; 747 } 748 } 749 } 750 751 752 static nxt_int_t 753 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 754 { 755 nxt_int_t ret; 756 nxt_str_t control; 757 nxt_uint_t n; 758 nxt_file_t *file; 759 const char *slash; 760 nxt_sockaddr_t *sa; 761 nxt_file_name_str_t file_name; 762 const nxt_event_interface_t *interface; 763 764 rt->daemon = 1; 765 rt->engine_connections = 256; 766 rt->auxiliary_threads = 2; 767 rt->user_cred.user = NXT_USER; 768 rt->group = NXT_GROUP; 769 rt->pid = NXT_PID; 770 rt->log = NXT_LOG; 771 rt->modules = NXT_MODULES; 772 rt->state = NXT_STATE; 773 rt->control = NXT_CONTROL_SOCK; 774 rt->tmp = NXT_TMP; 775 776 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t)); 777 778 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 779 return NXT_ERROR; 780 } 781 782 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) { 783 return NXT_ERROR; 784 } 785 786 if (rt->capabilities.setid) { 787 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, 788 rt->group); 789 790 if (nxt_slow_path(ret != NXT_OK)) { 791 return NXT_ERROR; 792 } 793 794 } else { 795 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it " 796 "cannot use arbitrary user and group."); 797 } 798 799 /* An engine's parameters. */ 800 801 interface = nxt_service_get(rt->services, "engine", rt->engine); 802 if (interface == NULL) { 803 return NXT_ERROR; 804 } 805 806 rt->engine = interface->name; 807 808 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 809 if (nxt_slow_path(ret != NXT_OK)) { 810 return NXT_ERROR; 811 } 812 813 rt->pid_file = file_name.start; 814 815 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 816 if (nxt_slow_path(ret != NXT_OK)) { 817 return NXT_ERROR; 818 } 819 820 file = nxt_list_first(rt->log_files); 821 file->name = file_name.start; 822 823 slash = ""; 824 n = nxt_strlen(rt->modules); 825 826 if (n > 1 && rt->modules[n - 1] != '/') { 827 slash = "/"; 828 } 829 830 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 831 rt->modules, slash); 832 if (nxt_slow_path(ret != NXT_OK)) { 833 return NXT_ERROR; 834 } 835 836 rt->modules = (char *) file_name.start; 837 838 slash = ""; 839 n = nxt_strlen(rt->state); 840 841 if (n > 1 && rt->state[n - 1] != '/') { 842 slash = "/"; 843 } 844 845 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z", 846 rt->state, slash); 847 if (nxt_slow_path(ret != NXT_OK)) { 848 return NXT_ERROR; 849 } 850 851 rt->ver = (char *) file_name.start; 852 853 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver); 854 if (nxt_slow_path(ret != NXT_OK)) { 855 return NXT_ERROR; 856 } 857 858 rt->ver_tmp = (char *) file_name.start; 859 860 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 861 rt->state, slash); 862 if (nxt_slow_path(ret != NXT_OK)) { 863 return NXT_ERROR; 864 } 865 866 rt->conf = (char *) file_name.start; 867 868 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 869 if (nxt_slow_path(ret != NXT_OK)) { 870 return NXT_ERROR; 871 } 872 873 rt->conf_tmp = (char *) file_name.start; 874 875 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z", 876 rt->state, slash); 877 if (nxt_slow_path(ret != NXT_OK)) { 878 return NXT_ERROR; 879 } 880 881 ret = mkdir((char *) file_name.start, S_IRWXU); 882 883 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) { 884 rt->certs.length = file_name.len; 885 rt->certs.start = file_name.start; 886 887 } else { 888 nxt_alert(task, "Unable to create certificates storage directory: " 889 "mkdir(%s) failed %E", file_name.start, nxt_errno); 890 } 891 892 control.length = nxt_strlen(rt->control); 893 control.start = (u_char *) rt->control; 894 895 sa = nxt_sockaddr_parse(rt->mem_pool, &control); 896 if (nxt_slow_path(sa == NULL)) { 897 return NXT_ERROR; 898 } 899 900 sa->type = SOCK_STREAM; 901 902 rt->controller_listen = sa; 903 904 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 905 return NXT_ERROR; 906 } 907 908 return NXT_OK; 909 } 910 911 912 static nxt_int_t 913 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 914 { 915 char *p, **argv; 916 u_char *end; 917 u_char buf[1024]; 918 919 static const char version[] = 920 "unit version: " NXT_VERSION "\n" 921 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 922 923 static const char no_control[] = 924 "option \"--control\" requires socket address\n"; 925 static const char no_user[] = "option \"--user\" requires username\n"; 926 static const char no_group[] = "option \"--group\" requires group name\n"; 927 static const char no_pid[] = "option \"--pid\" requires filename\n"; 928 static const char no_log[] = "option \"--log\" requires filename\n"; 929 static const char no_modules[] = 930 "option \"--modules\" requires directory\n"; 931 static const char no_state[] = "option \"--state\" requires directory\n"; 932 static const char no_tmp[] = "option \"--tmp\" requires directory\n"; 933 934 static const char help[] = 935 "\n" 936 "unit options:\n" 937 "\n" 938 " --version print unit version and configure options\n" 939 "\n" 940 " --no-daemon run unit in non-daemon mode\n" 941 "\n" 942 " --control ADDRESS set address of control API socket\n" 943 " default: \"" NXT_CONTROL_SOCK "\"\n" 944 "\n" 945 " --pid FILE set pid filename\n" 946 " default: \"" NXT_PID "\"\n" 947 "\n" 948 " --log FILE set log filename\n" 949 " default: \"" NXT_LOG "\"\n" 950 "\n" 951 " --modules DIRECTORY set modules directory name\n" 952 " default: \"" NXT_MODULES "\"\n" 953 "\n" 954 " --state DIRECTORY set state directory name\n" 955 " default: \"" NXT_STATE "\"\n" 956 "\n" 957 " --tmp DIRECTORY set tmp directory name\n" 958 " default: \"" NXT_TMP "\"\n" 959 "\n" 960 " --user USER set non-privileged processes to run" 961 " as specified user\n" 962 " default: \"" NXT_USER "\"\n" 963 "\n" 964 " --group GROUP set non-privileged processes to run" 965 " as specified group\n" 966 " default: "; 967 968 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 969 static const char primary[] = "user's primary group\n\n"; 970 971 argv = &nxt_process_argv[1]; 972 973 while (*argv != NULL) { 974 p = *argv++; 975 976 if (nxt_strcmp(p, "--control") == 0) { 977 if (*argv == NULL) { 978 write(STDERR_FILENO, no_control, nxt_length(no_control)); 979 return NXT_ERROR; 980 } 981 982 p = *argv++; 983 984 rt->control = p; 985 986 continue; 987 } 988 989 if (nxt_strcmp(p, "--user") == 0) { 990 if (*argv == NULL) { 991 write(STDERR_FILENO, no_user, nxt_length(no_user)); 992 return NXT_ERROR; 993 } 994 995 p = *argv++; 996 997 rt->user_cred.user = p; 998 999 continue; 1000 } 1001 1002 if (nxt_strcmp(p, "--group") == 0) { 1003 if (*argv == NULL) { 1004 write(STDERR_FILENO, no_group, nxt_length(no_group)); 1005 return NXT_ERROR; 1006 } 1007 1008 p = *argv++; 1009 1010 rt->group = p; 1011 1012 continue; 1013 } 1014 1015 if (nxt_strcmp(p, "--pid") == 0) { 1016 if (*argv == NULL) { 1017 write(STDERR_FILENO, no_pid, nxt_length(no_pid)); 1018 return NXT_ERROR; 1019 } 1020 1021 p = *argv++; 1022 1023 rt->pid = p; 1024 1025 continue; 1026 } 1027 1028 if (nxt_strcmp(p, "--log") == 0) { 1029 if (*argv == NULL) { 1030 write(STDERR_FILENO, no_log, nxt_length(no_log)); 1031 return NXT_ERROR; 1032 } 1033 1034 p = *argv++; 1035 1036 rt->log = p; 1037 1038 continue; 1039 } 1040 1041 if (nxt_strcmp(p, "--modules") == 0) { 1042 if (*argv == NULL) { 1043 write(STDERR_FILENO, no_modules, nxt_length(no_modules)); 1044 return NXT_ERROR; 1045 } 1046 1047 p = *argv++; 1048 1049 rt->modules = p; 1050 1051 continue; 1052 } 1053 1054 if (nxt_strcmp(p, "--state") == 0) { 1055 if (*argv == NULL) { 1056 write(STDERR_FILENO, no_state, nxt_length(no_state)); 1057 return NXT_ERROR; 1058 } 1059 1060 p = *argv++; 1061 1062 rt->state = p; 1063 1064 continue; 1065 } 1066 1067 if (nxt_strcmp(p, "--tmp") == 0) { 1068 if (*argv == NULL) { 1069 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp)); 1070 return NXT_ERROR; 1071 } 1072 1073 p = *argv++; 1074 1075 rt->tmp = p; 1076 1077 continue; 1078 } 1079 1080 if (nxt_strcmp(p, "--no-daemon") == 0) { 1081 rt->daemon = 0; 1082 continue; 1083 } 1084 1085 if (nxt_strcmp(p, "--version") == 0) { 1086 write(STDERR_FILENO, version, nxt_length(version)); 1087 exit(0); 1088 } 1089 1090 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) { 1091 write(STDOUT_FILENO, help, nxt_length(help)); 1092 1093 if (sizeof(NXT_GROUP) == 1) { 1094 write(STDOUT_FILENO, primary, nxt_length(primary)); 1095 1096 } else { 1097 write(STDOUT_FILENO, group, nxt_length(group)); 1098 } 1099 1100 exit(0); 1101 } 1102 1103 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", " 1104 "try \"%s -h\" for available options\n", 1105 p, nxt_process_argv[0]); 1106 1107 write(STDERR_FILENO, buf, end - buf); 1108 1109 return NXT_ERROR; 1110 } 1111 1112 return NXT_OK; 1113 } 1114 1115 1116 nxt_listen_socket_t * 1117 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1118 { 1119 nxt_mp_t *mp; 1120 nxt_listen_socket_t *ls; 1121 1122 ls = nxt_array_zero_add(rt->listen_sockets); 1123 if (ls == NULL) { 1124 return NULL; 1125 } 1126 1127 mp = rt->mem_pool; 1128 1129 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1130 sa->length); 1131 if (ls->sockaddr == NULL) { 1132 return NULL; 1133 } 1134 1135 ls->sockaddr->type = sa->type; 1136 1137 nxt_sockaddr_text(ls->sockaddr); 1138 1139 ls->socket = -1; 1140 ls->backlog = NXT_LISTEN_BACKLOG; 1141 1142 return ls; 1143 } 1144 1145 1146 static nxt_int_t 1147 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1148 { 1149 size_t length; 1150 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1151 1152 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1153 nxt_alert(task, "gethostname() failed %E", nxt_errno); 1154 return NXT_ERROR; 1155 } 1156 1157 /* 1158 * Linux gethostname(2): 1159 * 1160 * If the null-terminated hostname is too large to fit, 1161 * then the name is truncated, and no error is returned. 1162 * 1163 * For this reason an additional byte is reserved in the buffer. 1164 */ 1165 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1166 1167 length = nxt_strlen(hostname); 1168 rt->hostname.length = length; 1169 1170 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1171 1172 if (rt->hostname.start != NULL) { 1173 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1174 return NXT_OK; 1175 } 1176 1177 return NXT_ERROR; 1178 } 1179 1180 1181 static nxt_int_t 1182 nxt_runtime_log_files_init(nxt_runtime_t *rt) 1183 { 1184 nxt_file_t *file; 1185 nxt_list_t *log_files; 1186 1187 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1188 1189 if (nxt_fast_path(log_files != NULL)) { 1190 rt->log_files = log_files; 1191 1192 /* Preallocate the main log. This allocation cannot fail. */ 1193 file = nxt_list_zero_add(log_files); 1194 1195 file->fd = NXT_FILE_INVALID; 1196 file->log_level = NXT_LOG_ALERT; 1197 1198 return NXT_OK; 1199 } 1200 1201 return NXT_ERROR; 1202 } 1203 1204 1205 nxt_file_t * 1206 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1207 { 1208 nxt_int_t ret; 1209 nxt_file_t *file; 1210 nxt_file_name_str_t file_name; 1211 1212 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1213 1214 if (nxt_slow_path(ret != NXT_OK)) { 1215 return NULL; 1216 } 1217 1218 nxt_list_each(file, rt->log_files) { 1219 1220 /* STUB: hardecoded case sensitive/insensitive. */ 1221 1222 if (file->name != NULL 1223 && nxt_file_name_eq(file->name, file_name.start)) 1224 { 1225 return file; 1226 } 1227 1228 } nxt_list_loop; 1229 1230 file = nxt_list_zero_add(rt->log_files); 1231 1232 if (nxt_slow_path(file == NULL)) { 1233 return NULL; 1234 } 1235 1236 file->fd = NXT_FILE_INVALID; 1237 file->log_level = NXT_LOG_ALERT; 1238 file->name = file_name.start; 1239 1240 return file; 1241 } 1242 1243 1244 static nxt_int_t 1245 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1246 { 1247 nxt_int_t ret; 1248 nxt_file_t *file; 1249 1250 nxt_list_each(file, rt->log_files) { 1251 1252 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1253 NXT_FILE_OWNER_ACCESS); 1254 1255 if (ret != NXT_OK) { 1256 return NXT_ERROR; 1257 } 1258 1259 } nxt_list_loop; 1260 1261 file = nxt_list_first(rt->log_files); 1262 1263 return nxt_file_stderr(file); 1264 } 1265 1266 1267 nxt_int_t 1268 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1269 { 1270 nxt_int_t ret; 1271 nxt_uint_t c, p, ncurr, nprev; 1272 nxt_listen_socket_t *curr, *prev; 1273 1274 curr = rt->listen_sockets->elts; 1275 ncurr = rt->listen_sockets->nelts; 1276 1277 if (rt->inherited_sockets != NULL) { 1278 prev = rt->inherited_sockets->elts; 1279 nprev = rt->inherited_sockets->nelts; 1280 1281 } else { 1282 prev = NULL; 1283 nprev = 0; 1284 } 1285 1286 for (c = 0; c < ncurr; c++) { 1287 1288 for (p = 0; p < nprev; p++) { 1289 1290 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1291 1292 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1293 if (ret != NXT_OK) { 1294 return NXT_ERROR; 1295 } 1296 1297 goto next; 1298 } 1299 } 1300 1301 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) { 1302 return NXT_ERROR; 1303 } 1304 1305 next: 1306 1307 continue; 1308 } 1309 1310 return NXT_OK; 1311 } 1312 1313 1314 nxt_int_t 1315 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1316 { 1317 nxt_uint_t i, n; 1318 nxt_listen_socket_t *ls; 1319 1320 ls = rt->listen_sockets->elts; 1321 n = rt->listen_sockets->nelts; 1322 1323 for (i = 0; i < n; i++) { 1324 if (ls[i].flags == NXT_NONBLOCK) { 1325 if (nxt_listen_event(task, &ls[i]) == NULL) { 1326 return NXT_ERROR; 1327 } 1328 } 1329 } 1330 1331 return NXT_OK; 1332 } 1333 1334 1335 nxt_str_t * 1336 nxt_current_directory(nxt_mp_t *mp) 1337 { 1338 size_t length; 1339 u_char *p; 1340 nxt_str_t *name; 1341 char buf[NXT_MAX_PATH_LEN]; 1342 1343 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1344 1345 if (nxt_fast_path(length != 0)) { 1346 name = nxt_str_alloc(mp, length + 1); 1347 1348 if (nxt_fast_path(name != NULL)) { 1349 p = nxt_cpymem(name->start, buf, length); 1350 *p = '/'; 1351 1352 return name; 1353 } 1354 } 1355 1356 return NULL; 1357 } 1358 1359 1360 static nxt_int_t 1361 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1362 { 1363 ssize_t length; 1364 nxt_int_t n; 1365 nxt_file_t file; 1366 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")]; 1367 1368 nxt_memzero(&file, sizeof(nxt_file_t)); 1369 1370 file.name = pid_file; 1371 1372 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1373 NXT_FILE_DEFAULT_ACCESS); 1374 1375 if (n != NXT_OK) { 1376 return NXT_ERROR; 1377 } 1378 1379 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1380 1381 if (nxt_file_write(&file, pid, length, 0) != length) { 1382 return NXT_ERROR; 1383 } 1384 1385 nxt_file_close(task, &file); 1386 1387 return NXT_OK; 1388 } 1389 1390 1391 void 1392 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) 1393 { 1394 nxt_process_t *child; 1395 1396 if (process->registered == 1) { 1397 nxt_runtime_process_remove(rt, process); 1398 } 1399 1400 if (process->link.next != NULL) { 1401 nxt_queue_remove(&process->link); 1402 } 1403 1404 nxt_queue_each(child, &process->children, nxt_process_t, link) { 1405 nxt_queue_remove(&child->link); 1406 child->link.next = NULL; 1407 } nxt_queue_loop; 1408 1409 nxt_assert(process->use_count == 0); 1410 nxt_assert(process->registered == 0); 1411 nxt_assert(nxt_queue_is_empty(&process->ports)); 1412 1413 nxt_port_mmaps_destroy(&process->incoming, 1); 1414 1415 nxt_thread_mutex_destroy(&process->incoming.mutex); 1416 1417 /* processes from nxt_runtime_process_get() have no memory pool */ 1418 if (process->mem_pool != NULL) { 1419 nxt_mp_destroy(process->mem_pool); 1420 } 1421 1422 nxt_mp_free(rt->mem_pool, process); 1423 } 1424 1425 1426 static nxt_int_t 1427 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1428 { 1429 nxt_process_t *process; 1430 1431 process = data; 1432 1433 if (lhq->key.length == sizeof(nxt_pid_t) 1434 && *(nxt_pid_t *) lhq->key.start == process->pid) 1435 { 1436 return NXT_OK; 1437 } 1438 1439 return NXT_DECLINED; 1440 } 1441 1442 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1443 NXT_LVLHSH_DEFAULT, 1444 nxt_runtime_lvlhsh_pid_test, 1445 nxt_lvlhsh_alloc, 1446 nxt_lvlhsh_free, 1447 }; 1448 1449 1450 nxt_inline void 1451 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1452 { 1453 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1454 lhq->key.length = sizeof(*pid); 1455 lhq->key.start = (u_char *) pid; 1456 lhq->proto = &lvlhsh_processes_proto; 1457 } 1458 1459 1460 nxt_process_t * 1461 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1462 { 1463 nxt_process_t *process; 1464 nxt_lvlhsh_query_t lhq; 1465 1466 process = NULL; 1467 1468 nxt_runtime_process_lhq_pid(&lhq, &pid); 1469 1470 nxt_thread_mutex_lock(&rt->processes_mutex); 1471 1472 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1473 process = lhq.value; 1474 1475 } else { 1476 nxt_thread_log_debug("process %PI not found", pid); 1477 } 1478 1479 nxt_thread_mutex_unlock(&rt->processes_mutex); 1480 1481 return process; 1482 } 1483 1484 1485 static nxt_process_t * 1486 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1487 { 1488 nxt_process_t *process; 1489 nxt_lvlhsh_query_t lhq; 1490 1491 nxt_runtime_process_lhq_pid(&lhq, &pid); 1492 1493 nxt_thread_mutex_lock(&rt->processes_mutex); 1494 1495 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1496 nxt_thread_log_debug("process %PI found", pid); 1497 1498 nxt_thread_mutex_unlock(&rt->processes_mutex); 1499 1500 process = lhq.value; 1501 process->use_count++; 1502 1503 return process; 1504 } 1505 1506 process = nxt_process_new(rt); 1507 if (nxt_slow_path(process == NULL)) { 1508 1509 nxt_thread_mutex_unlock(&rt->processes_mutex); 1510 1511 return NULL; 1512 } 1513 1514 process->pid = pid; 1515 1516 lhq.replace = 0; 1517 lhq.value = process; 1518 lhq.pool = rt->mem_pool; 1519 1520 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1521 1522 case NXT_OK: 1523 if (rt->nprocesses == 0) { 1524 rt->mprocess = process; 1525 } 1526 1527 rt->nprocesses++; 1528 1529 process->registered = 1; 1530 1531 nxt_thread_log_debug("process %PI insert", pid); 1532 break; 1533 1534 default: 1535 nxt_thread_log_debug("process %PI insert failed", pid); 1536 break; 1537 } 1538 1539 nxt_thread_mutex_unlock(&rt->processes_mutex); 1540 1541 return process; 1542 } 1543 1544 1545 void 1546 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1547 { 1548 nxt_port_t *port; 1549 nxt_runtime_t *rt; 1550 nxt_lvlhsh_query_t lhq; 1551 1552 nxt_assert(process->registered == 0); 1553 1554 rt = task->thread->runtime; 1555 1556 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1557 1558 lhq.replace = 0; 1559 lhq.value = process; 1560 lhq.pool = rt->mem_pool; 1561 1562 nxt_thread_mutex_lock(&rt->processes_mutex); 1563 1564 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1565 1566 case NXT_OK: 1567 if (rt->nprocesses == 0) { 1568 rt->mprocess = process; 1569 } 1570 1571 rt->nprocesses++; 1572 1573 nxt_process_port_each(process, port) { 1574 1575 port->pid = process->pid; 1576 1577 nxt_runtime_port_add(task, port); 1578 1579 } nxt_process_port_loop; 1580 1581 process->registered = 1; 1582 1583 nxt_debug(task, "process %PI added", process->pid); 1584 break; 1585 1586 default: 1587 nxt_alert(task, "process %PI failed to add", process->pid); 1588 break; 1589 } 1590 1591 nxt_thread_mutex_unlock(&rt->processes_mutex); 1592 } 1593 1594 1595 void 1596 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1597 { 1598 nxt_pid_t pid; 1599 nxt_lvlhsh_query_t lhq; 1600 1601 nxt_assert(process->registered != 0); 1602 1603 pid = process->pid; 1604 1605 nxt_runtime_process_lhq_pid(&lhq, &pid); 1606 1607 lhq.pool = rt->mem_pool; 1608 1609 nxt_thread_mutex_lock(&rt->processes_mutex); 1610 1611 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1612 1613 case NXT_OK: 1614 nxt_assert(lhq.value == process); 1615 1616 rt->nprocesses--; 1617 1618 process->registered = 0; 1619 1620 nxt_thread_log_debug("process %PI removed", pid); 1621 break; 1622 1623 default: 1624 nxt_thread_log_alert("process %PI remove failed", pid); 1625 break; 1626 } 1627 1628 nxt_thread_mutex_unlock(&rt->processes_mutex); 1629 } 1630 1631 1632 nxt_process_t * 1633 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1634 { 1635 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); 1636 1637 return nxt_runtime_process_next(rt, lhe); 1638 } 1639 1640 1641 nxt_port_t * 1642 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, 1643 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) 1644 { 1645 nxt_port_t *port; 1646 nxt_process_t *process; 1647 1648 process = nxt_runtime_process_get(rt, pid); 1649 if (nxt_slow_path(process == NULL)) { 1650 return NULL; 1651 } 1652 1653 port = nxt_port_new(task, id, pid, type); 1654 if (nxt_slow_path(port == NULL)) { 1655 nxt_process_use(task, process, -1); 1656 return NULL; 1657 } 1658 1659 nxt_process_port_add(task, process, port); 1660 1661 nxt_process_use(task, process, -1); 1662 1663 nxt_runtime_port_add(task, port); 1664 1665 nxt_port_use(task, port, -1); 1666 1667 return port; 1668 } 1669 1670 1671 static void 1672 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1673 { 1674 nxt_int_t res; 1675 nxt_runtime_t *rt; 1676 1677 rt = task->thread->runtime; 1678 1679 res = nxt_port_hash_add(&rt->ports, port); 1680 1681 if (res != NXT_OK) { 1682 return; 1683 } 1684 1685 rt->port_by_type[port->type] = port; 1686 1687 nxt_port_use(task, port, 1); 1688 } 1689 1690 1691 void 1692 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1693 { 1694 nxt_int_t res; 1695 nxt_runtime_t *rt; 1696 1697 rt = task->thread->runtime; 1698 1699 res = nxt_port_hash_remove(&rt->ports, port); 1700 1701 if (res != NXT_OK) { 1702 return; 1703 } 1704 1705 if (rt->port_by_type[port->type] == port) { 1706 rt->port_by_type[port->type] = NULL; 1707 } 1708 1709 nxt_port_use(task, port, -1); 1710 } 1711 1712 1713 nxt_port_t * 1714 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1715 nxt_port_id_t port_id) 1716 { 1717 return nxt_port_hash_find(&rt->ports, pid, port_id); 1718 } 1719