1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_port.h> 11 #include <nxt_master_process.h> 12 #include <nxt_router.h> 13 14 15 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 16 nxt_runtime_t *rt); 17 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 18 nxt_runtime_t *rt); 19 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 20 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 21 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 22 static void nxt_runtime_initial_start(nxt_task_t *task); 23 static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, 24 nxt_runtime_t *rt); 25 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 28 nxt_runtime_t *rt); 29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 31 static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task, 32 nxt_mp_t *mp, nxt_str_t *addr); 33 static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, 34 nxt_mp_t *mp, nxt_str_t *addr); 35 static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, 36 nxt_mp_t *mp, nxt_str_t *addr); 37 static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, 38 nxt_mp_t *mp, nxt_str_t *addr); 39 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 40 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 41 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 42 nxt_runtime_t *rt); 43 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 44 nxt_file_name_t *pid_file); 45 46 #if (NXT_THREADS) 47 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 48 nxt_runtime_cont_t cont); 49 #endif 50 51 static void nxt_runtime_process_destroy(nxt_runtime_t *rt, 52 nxt_process_t *process); 53 static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, 54 nxt_pid_t pid); 55 56 57 nxt_int_t 58 nxt_runtime_create(nxt_task_t *task) 59 { 60 nxt_mp_t *mp; 61 nxt_int_t ret; 62 nxt_array_t *listen_sockets; 63 nxt_runtime_t *rt; 64 nxt_app_lang_module_t *lang; 65 66 mp = nxt_mp_create(1024, 128, 256, 32); 67 68 if (nxt_slow_path(mp == NULL)) { 69 return NXT_ERROR; 70 } 71 72 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 73 if (nxt_slow_path(rt == NULL)) { 74 return NXT_ERROR; 75 } 76 77 task->thread->runtime = rt; 78 rt->mem_pool = mp; 79 80 nxt_thread_mutex_create(&rt->processes_mutex); 81 82 rt->prefix = nxt_current_directory(mp); 83 if (nxt_slow_path(rt->prefix == NULL)) { 84 goto fail; 85 } 86 87 rt->conf_prefix = rt->prefix; 88 89 rt->services = nxt_services_init(mp); 90 if (nxt_slow_path(rt->services == NULL)) { 91 goto fail; 92 } 93 94 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 95 if (nxt_slow_path(rt->languages == NULL)) { 96 goto fail; 97 } 98 99 /* Should not fail. */ 100 lang = nxt_array_add(rt->languages); 101 lang->type = (nxt_str_t) nxt_string("go"); 102 lang->version = (nxt_str_t) nxt_null_string; 103 lang->file = NULL; 104 lang->module = &nxt_go_module; 105 106 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 107 if (nxt_slow_path(listen_sockets == NULL)) { 108 goto fail; 109 } 110 111 rt->listen_sockets = listen_sockets; 112 113 ret = nxt_runtime_inherited_listen_sockets(task, rt); 114 if (nxt_slow_path(ret != NXT_OK)) { 115 goto fail; 116 } 117 118 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 119 goto fail; 120 } 121 122 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 123 goto fail; 124 } 125 126 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 127 goto fail; 128 } 129 130 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 131 goto fail; 132 } 133 134 rt->start = nxt_runtime_initial_start; 135 136 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 137 nxt_runtime_start, task, rt, NULL); 138 139 return NXT_OK; 140 141 fail: 142 143 nxt_mp_destroy(mp); 144 145 return NXT_ERROR; 146 } 147 148 149 static nxt_int_t 150 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 151 { 152 u_char *v, *p; 153 nxt_int_t type; 154 nxt_array_t *inherited_sockets; 155 nxt_socket_t s; 156 nxt_listen_socket_t *ls; 157 158 v = (u_char *) getenv("NGINX"); 159 160 if (v == NULL) { 161 return nxt_runtime_systemd_listen_sockets(task, rt); 162 } 163 164 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); 165 166 inherited_sockets = nxt_array_create(rt->mem_pool, 167 1, sizeof(nxt_listen_socket_t)); 168 if (inherited_sockets == NULL) { 169 return NXT_ERROR; 170 } 171 172 rt->inherited_sockets = inherited_sockets; 173 174 for (p = v; *p != '\0'; p++) { 175 176 if (*p == ';') { 177 s = nxt_int_parse(v, p - v); 178 179 if (nxt_slow_path(s < 0)) { 180 nxt_log(task, NXT_LOG_CRIT, "invalid socket number " 181 "\"%s\" in NGINX environment variable, " 182 "ignoring the rest of the variable", v); 183 return NXT_ERROR; 184 } 185 186 v = p + 1; 187 188 ls = nxt_array_zero_add(inherited_sockets); 189 if (nxt_slow_path(ls == NULL)) { 190 return NXT_ERROR; 191 } 192 193 ls->socket = s; 194 195 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 196 if (nxt_slow_path(ls->sockaddr == NULL)) { 197 return NXT_ERROR; 198 } 199 200 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 201 if (nxt_slow_path(type == -1)) { 202 return NXT_ERROR; 203 } 204 205 ls->sockaddr->type = (uint16_t) type; 206 } 207 } 208 209 return NXT_OK; 210 } 211 212 213 static nxt_int_t 214 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 215 { 216 u_char *nfd, *pid; 217 nxt_int_t n; 218 nxt_array_t *inherited_sockets; 219 nxt_socket_t s; 220 nxt_listen_socket_t *ls; 221 222 /* 223 * Number of listening sockets passed. The socket 224 * descriptors start from number 3 and are sequential. 225 */ 226 nfd = (u_char *) getenv("LISTEN_FDS"); 227 if (nfd == NULL) { 228 return NXT_OK; 229 } 230 231 /* The pid of the service process. */ 232 pid = (u_char *) getenv("LISTEN_PID"); 233 if (pid == NULL) { 234 return NXT_OK; 235 } 236 237 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 238 if (n < 0) { 239 return NXT_OK; 240 } 241 242 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 243 return NXT_OK; 244 } 245 246 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); 247 248 inherited_sockets = nxt_array_create(rt->mem_pool, 249 n, sizeof(nxt_listen_socket_t)); 250 if (inherited_sockets == NULL) { 251 return NXT_ERROR; 252 } 253 254 rt->inherited_sockets = inherited_sockets; 255 256 for (s = 3; s < n; s++) { 257 ls = nxt_array_zero_add(inherited_sockets); 258 if (nxt_slow_path(ls == NULL)) { 259 return NXT_ERROR; 260 } 261 262 ls->socket = s; 263 264 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 265 if (nxt_slow_path(ls->sockaddr == NULL)) { 266 return NXT_ERROR; 267 } 268 269 ls->sockaddr->type = SOCK_STREAM; 270 } 271 272 return NXT_OK; 273 } 274 275 276 static nxt_int_t 277 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 278 { 279 nxt_thread_t *thread; 280 nxt_event_engine_t *engine; 281 const nxt_event_interface_t *interface; 282 283 interface = nxt_service_get(rt->services, "engine", NULL); 284 285 if (nxt_slow_path(interface == NULL)) { 286 /* TODO: log */ 287 return NXT_ERROR; 288 } 289 290 engine = nxt_event_engine_create(task, interface, 291 nxt_master_process_signals, 0, 0); 292 293 if (nxt_slow_path(engine == NULL)) { 294 return NXT_ERROR; 295 } 296 297 thread = task->thread; 298 thread->engine = engine; 299 thread->fiber = &engine->fibers->fiber; 300 301 engine->id = rt->last_engine_id++; 302 303 nxt_queue_init(&rt->engines); 304 nxt_queue_insert_tail(&rt->engines, &engine->link); 305 306 return NXT_OK; 307 } 308 309 310 static nxt_int_t 311 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 312 { 313 #if (NXT_THREADS) 314 nxt_int_t ret; 315 nxt_array_t *thread_pools; 316 317 thread_pools = nxt_array_create(rt->mem_pool, 1, 318 sizeof(nxt_thread_pool_t *)); 319 320 if (nxt_slow_path(thread_pools == NULL)) { 321 return NXT_ERROR; 322 } 323 324 rt->thread_pools = thread_pools; 325 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 326 327 if (nxt_slow_path(ret != NXT_OK)) { 328 return NXT_ERROR; 329 } 330 331 #endif 332 333 return NXT_OK; 334 } 335 336 337 static void 338 nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 339 { 340 nxt_runtime_t *rt; 341 342 rt = obj; 343 344 nxt_debug(task, "rt conf done"); 345 346 task->thread->log->ctx_handler = NULL; 347 task->thread->log->ctx = NULL; 348 349 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 350 goto fail; 351 } 352 353 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 354 goto fail; 355 } 356 357 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 358 goto fail; 359 } 360 361 #if (NXT_THREADS) 362 363 /* 364 * Thread pools should be destroyed before starting worker 365 * processes, because thread pool semaphores will stick in 366 * locked state in new processes after fork(). 367 */ 368 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 369 370 #else 371 372 rt->start(task->thread, rt); 373 374 #endif 375 376 return; 377 378 fail: 379 380 nxt_runtime_quit(task); 381 } 382 383 384 static void 385 nxt_runtime_initial_start(nxt_task_t *task) 386 { 387 nxt_int_t ret; 388 nxt_thread_t *thr; 389 nxt_runtime_t *rt; 390 const nxt_event_interface_t *interface; 391 392 thr = task->thread; 393 rt = thr->runtime; 394 395 if (rt->inherited_sockets == NULL && rt->daemon) { 396 397 if (nxt_process_daemon(task) != NXT_OK) { 398 goto fail; 399 } 400 401 /* 402 * An event engine should be updated after fork() 403 * even if an event facility was not changed because: 404 * 1) inherited kqueue descriptor is invalid, 405 * 2) the signal thread is not inherited. 406 */ 407 interface = nxt_service_get(rt->services, "engine", rt->engine); 408 if (interface == NULL) { 409 goto fail; 410 } 411 412 ret = nxt_event_engine_change(task->thread->engine, interface, 413 rt->batch); 414 if (ret != NXT_OK) { 415 goto fail; 416 } 417 } 418 419 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 420 if (ret != NXT_OK) { 421 goto fail; 422 } 423 424 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 425 goto fail; 426 } 427 428 thr->engine->max_connections = rt->engine_connections; 429 430 if (rt->master_process) { 431 if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) { 432 return; 433 } 434 435 } else { 436 nxt_single_process_start(thr, task, rt); 437 return; 438 } 439 440 fail: 441 442 nxt_runtime_quit(task); 443 } 444 445 446 static void 447 nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) 448 { 449 #if (NXT_THREADS) 450 nxt_int_t ret; 451 452 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, 453 60000 * 1000000LL); 454 455 if (nxt_slow_path(ret != NXT_OK)) { 456 nxt_runtime_quit(task); 457 return; 458 } 459 460 #endif 461 462 rt->types |= (1U << NXT_PROCESS_SINGLE); 463 464 nxt_runtime_listen_sockets_enable(task, rt); 465 466 return; 467 } 468 469 470 void 471 nxt_runtime_quit(nxt_task_t *task) 472 { 473 nxt_bool_t done; 474 nxt_runtime_t *rt; 475 nxt_event_engine_t *engine; 476 477 rt = task->thread->runtime; 478 engine = task->thread->engine; 479 480 nxt_debug(task, "exiting"); 481 482 done = 1; 483 484 if (!engine->shutdown) { 485 engine->shutdown = 1; 486 487 #if (NXT_THREADS) 488 489 if (!nxt_array_is_empty(rt->thread_pools)) { 490 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 491 done = 0; 492 } 493 494 #endif 495 496 if (nxt_runtime_is_master(rt)) { 497 nxt_master_stop_worker_processes(task, rt); 498 done = 0; 499 } 500 } 501 502 nxt_runtime_close_idle_connections(engine); 503 504 if (done) { 505 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 506 task, rt, engine); 507 } 508 } 509 510 511 static void 512 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 513 { 514 nxt_conn_t *c; 515 nxt_queue_t *idle; 516 nxt_queue_link_t *link, *next; 517 518 nxt_debug(&engine->task, "close idle connections"); 519 520 idle = &engine->idle_connections; 521 522 for (link = nxt_queue_head(idle); 523 link != nxt_queue_tail(idle); 524 link = next) 525 { 526 next = nxt_queue_next(link); 527 c = nxt_queue_link_data(link, nxt_conn_t, link); 528 529 if (!c->socket.read_ready) { 530 nxt_queue_remove(link); 531 nxt_conn_close(engine, c); 532 } 533 } 534 } 535 536 537 static void 538 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 539 { 540 nxt_runtime_t *rt; 541 nxt_process_t *process; 542 nxt_event_engine_t *engine; 543 544 rt = obj; 545 engine = data; 546 547 #if (NXT_THREADS) 548 549 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 550 551 if (!nxt_array_is_empty(rt->thread_pools)) { 552 return; 553 } 554 555 #endif 556 557 if (nxt_runtime_is_master(rt)) { 558 if (rt->pid_file != NULL) { 559 nxt_file_delete(rt->pid_file); 560 } 561 } 562 563 if (!engine->event.signal_support) { 564 nxt_event_engine_signals_stop(engine); 565 } 566 567 nxt_runtime_process_each(rt, process) { 568 569 nxt_runtime_process_remove(rt, process); 570 571 } nxt_runtime_process_loop; 572 573 nxt_thread_mutex_destroy(&rt->processes_mutex); 574 575 nxt_mp_destroy(rt->mem_pool); 576 577 nxt_debug(task, "exit"); 578 579 exit(0); 580 nxt_unreachable(); 581 } 582 583 584 static nxt_int_t 585 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 586 { 587 nxt_event_engine_t *engine; 588 const nxt_event_interface_t *interface; 589 590 engine = task->thread->engine; 591 592 if (engine->batch == rt->batch 593 && nxt_strcmp(engine->event.name, rt->engine) == 0) 594 { 595 return NXT_OK; 596 } 597 598 interface = nxt_service_get(rt->services, "engine", rt->engine); 599 600 if (interface != NULL) { 601 return nxt_event_engine_change(engine, interface, rt->batch); 602 } 603 604 return NXT_ERROR; 605 } 606 607 608 void 609 nxt_runtime_event_engine_free(nxt_runtime_t *rt) 610 { 611 nxt_queue_link_t *link; 612 nxt_event_engine_t *engine; 613 614 link = nxt_queue_first(&rt->engines); 615 nxt_queue_remove(link); 616 617 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 618 nxt_event_engine_free(engine); 619 } 620 621 622 #if (NXT_THREADS) 623 624 static void nxt_runtime_thread_pool_init(void); 625 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 626 void *data); 627 628 629 nxt_int_t 630 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 631 nxt_uint_t max_threads, nxt_nsec_t timeout) 632 { 633 nxt_thread_pool_t *thread_pool, **tp; 634 635 tp = nxt_array_add(rt->thread_pools); 636 if (tp == NULL) { 637 return NXT_ERROR; 638 } 639 640 thread_pool = nxt_thread_pool_create(max_threads, timeout, 641 nxt_runtime_thread_pool_init, 642 thr->engine, 643 nxt_runtime_thread_pool_exit); 644 645 if (nxt_fast_path(thread_pool != NULL)) { 646 *tp = thread_pool; 647 } 648 649 return NXT_OK; 650 } 651 652 653 static void 654 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 655 nxt_runtime_cont_t cont) 656 { 657 nxt_uint_t n; 658 nxt_thread_pool_t **tp; 659 660 rt->continuation = cont; 661 662 n = rt->thread_pools->nelts; 663 664 if (n == 0) { 665 cont(task); 666 return; 667 } 668 669 tp = rt->thread_pools->elts; 670 671 do { 672 nxt_thread_pool_destroy(*tp); 673 674 tp++; 675 n--; 676 } while (n != 0); 677 } 678 679 680 static void 681 nxt_runtime_thread_pool_init(void) 682 { 683 #if (NXT_REGEX) 684 nxt_regex_init(0); 685 #endif 686 } 687 688 689 static void 690 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 691 { 692 nxt_uint_t i, n; 693 nxt_runtime_t *rt; 694 nxt_thread_pool_t *tp, **thread_pools; 695 nxt_thread_handle_t handle; 696 697 tp = obj; 698 699 if (data != NULL) { 700 handle = (nxt_thread_handle_t) (uintptr_t) data; 701 nxt_thread_wait(handle); 702 } 703 704 rt = task->thread->runtime; 705 706 thread_pools = rt->thread_pools->elts; 707 n = rt->thread_pools->nelts; 708 709 nxt_debug(task, "thread pools: %ui", n); 710 711 for (i = 0; i < n; i++) { 712 713 if (tp == thread_pools[i]) { 714 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 715 716 if (n == 1) { 717 /* The last thread pool. */ 718 rt->continuation(task); 719 } 720 721 return; 722 } 723 } 724 } 725 726 #endif 727 728 729 static nxt_int_t 730 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 731 { 732 nxt_int_t ret; 733 nxt_str_t *prefix; 734 nxt_file_t *file; 735 nxt_file_name_str_t file_name; 736 const nxt_event_interface_t *interface; 737 738 rt->daemon = 1; 739 rt->master_process = 1; 740 rt->engine_connections = 256; 741 rt->worker_processes = 1; 742 rt->auxiliary_threads = 2; 743 rt->user_cred.user = "nobody"; 744 rt->group = NULL; 745 rt->pid = "nginext.pid"; 746 rt->error_log = "error.log"; 747 748 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 749 return NXT_ERROR; 750 } 751 752 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 753 return NXT_ERROR; 754 } 755 756 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { 757 return NXT_ERROR; 758 } 759 760 /* An engine's parameters. */ 761 762 interface = nxt_service_get(rt->services, "engine", rt->engine); 763 if (interface == NULL) { 764 return NXT_ERROR; 765 } 766 767 rt->engine = interface->name; 768 769 prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix; 770 771 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", 772 prefix, rt->pid); 773 if (nxt_slow_path(ret != NXT_OK)) { 774 return NXT_ERROR; 775 } 776 777 rt->pid_file = file_name.start; 778 779 prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix; 780 781 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", 782 prefix, rt->error_log); 783 if (nxt_slow_path(ret != NXT_OK)) { 784 return NXT_ERROR; 785 } 786 787 file = nxt_list_first(rt->log_files); 788 file->name = file_name.start; 789 790 return NXT_OK; 791 } 792 793 794 static nxt_int_t 795 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 796 { 797 char *p, **argv; 798 nxt_int_t n; 799 nxt_str_t addr; 800 nxt_sockaddr_t *sa; 801 802 argv = nxt_process_argv; 803 804 while (*argv != NULL) { 805 p = *argv++; 806 807 if (nxt_strcmp(p, "--listen") == 0) { 808 if (*argv == NULL) { 809 nxt_log(task, NXT_LOG_CRIT, 810 "no argument for option \"--listen\""); 811 return NXT_ERROR; 812 } 813 814 p = *argv++; 815 816 addr.length = nxt_strlen(p); 817 addr.start = (u_char *) p; 818 819 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr); 820 821 if (sa == NULL) { 822 return NXT_ERROR; 823 } 824 825 rt->controller_listen = sa; 826 827 continue; 828 } 829 830 if (nxt_strcmp(p, "--upstream") == 0) { 831 if (*argv == NULL) { 832 nxt_log(task, NXT_LOG_CRIT, 833 "no argument for option \"--upstream\""); 834 return NXT_ERROR; 835 } 836 837 p = *argv++; 838 839 rt->upstream.length = nxt_strlen(p); 840 rt->upstream.start = (u_char *) p; 841 842 continue; 843 } 844 845 if (nxt_strcmp(p, "--workers") == 0) { 846 if (*argv == NULL) { 847 nxt_log(task, NXT_LOG_CRIT, 848 "no argument for option \"--workers\""); 849 return NXT_ERROR; 850 } 851 852 p = *argv++; 853 n = nxt_int_parse((u_char *) p, nxt_strlen(p)); 854 855 if (n < 1) { 856 nxt_log(task, NXT_LOG_CRIT, 857 "invalid number of workers: \"%s\"", p); 858 return NXT_ERROR; 859 } 860 861 rt->worker_processes = n; 862 863 continue; 864 } 865 866 if (nxt_strcmp(p, "--user") == 0) { 867 if (*argv == NULL) { 868 nxt_log(task, NXT_LOG_CRIT, 869 "no argument for option \"--user\""); 870 return NXT_ERROR; 871 } 872 873 p = *argv++; 874 875 rt->user_cred.user = p; 876 877 continue; 878 } 879 880 if (nxt_strcmp(p, "--group") == 0) { 881 if (*argv == NULL) { 882 nxt_log(task, NXT_LOG_CRIT, 883 "no argument for option \"--group\""); 884 return NXT_ERROR; 885 } 886 887 p = *argv++; 888 889 rt->group = p; 890 891 continue; 892 } 893 894 if (nxt_strcmp(p, "--pid") == 0) { 895 if (*argv == NULL) { 896 nxt_log(task, NXT_LOG_CRIT, 897 "no argument for option \"--pid\""); 898 return NXT_ERROR; 899 } 900 901 p = *argv++; 902 903 rt->pid = p; 904 905 continue; 906 } 907 908 if (nxt_strcmp(p, "--log") == 0) { 909 if (*argv == NULL) { 910 nxt_log(task, NXT_LOG_CRIT, 911 "no argument for option \"--log\""); 912 return NXT_ERROR; 913 } 914 915 p = *argv++; 916 917 rt->error_log = p; 918 919 continue; 920 } 921 922 if (nxt_strcmp(p, "--no-daemonize") == 0) { 923 rt->daemon = 0; 924 continue; 925 } 926 } 927 928 return NXT_OK; 929 } 930 931 932 static nxt_sockaddr_t * 933 nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 934 { 935 u_char *p; 936 size_t length; 937 938 length = addr->length; 939 p = addr->start; 940 941 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) { 942 return nxt_runtime_sockaddr_unix_parse(task, mp, addr); 943 } 944 945 if (length != 0 && *p == '[') { 946 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); 947 } 948 949 return nxt_runtime_sockaddr_inet_parse(task, mp, addr); 950 } 951 952 953 static nxt_sockaddr_t * 954 nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr) 955 { 956 #if (NXT_HAVE_UNIX_DOMAIN) 957 u_char *p; 958 size_t length, socklen; 959 nxt_sockaddr_t *sa; 960 961 /* 962 * Actual sockaddr_un length can be lesser or even larger than defined 963 * struct sockaddr_un length (see comment in nxt_socket.h). So 964 * limit maximum Unix domain socket address length by defined sun_path[] 965 * length because some OSes accept addresses twice larger than defined 966 * struct sockaddr_un. Also reserve space for a trailing zero to avoid 967 * ambiguity, since many OSes accept Unix domain socket addresses 968 * without a trailing zero. 969 */ 970 const size_t max_len = sizeof(struct sockaddr_un) 971 - offsetof(struct sockaddr_un, sun_path) - 1; 972 973 /* cutting "unix:" */ 974 length = addr->length - 5; 975 p = addr->start + 5; 976 977 if (length == 0) { 978 nxt_log(task, NXT_LOG_CRIT, 979 "unix domain socket \"%V\" name is invalid", addr); 980 return NULL; 981 } 982 983 if (length > max_len) { 984 nxt_log(task, NXT_LOG_CRIT, 985 "unix domain socket \"%V\" name is too long", addr); 986 return NULL; 987 } 988 989 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; 990 991 #if (NXT_LINUX) 992 993 /* 994 * Linux unix(7): 995 * 996 * abstract: an abstract socket address is distinguished by the fact 997 * that sun_path[0] is a null byte ('\0'). The socket's address in 998 * this namespace is given by the additional bytes in sun_path that 999 * are covered by the specified length of the address structure. 1000 * (Null bytes in the name have no special significance.) 1001 */ 1002 if (p[0] == '@') { 1003 p[0] = '\0'; 1004 socklen--; 1005 } 1006 1007 #endif 1008 1009 sa = nxt_sockaddr_alloc(mp, socklen, addr->length); 1010 1011 if (nxt_slow_path(sa == NULL)) { 1012 return NULL; 1013 } 1014 1015 sa->type = SOCK_STREAM; 1016 1017 sa->u.sockaddr_un.sun_family = AF_UNIX; 1018 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); 1019 1020 return sa; 1021 1022 #else /* !(NXT_HAVE_UNIX_DOMAIN) */ 1023 1024 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", 1025 addr); 1026 1027 return NULL; 1028 1029 #endif 1030 } 1031 1032 1033 static nxt_sockaddr_t * 1034 nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp, 1035 nxt_str_t *addr) 1036 { 1037 #if (NXT_INET6) 1038 u_char *p, *addr_end; 1039 size_t length; 1040 nxt_int_t port; 1041 nxt_sockaddr_t *sa; 1042 struct in6_addr *in6_addr; 1043 1044 length = addr->length - 1; 1045 p = addr->start + 1; 1046 1047 addr_end = nxt_memchr(p, ']', length); 1048 1049 if (addr_end == NULL) { 1050 goto invalid_address; 1051 } 1052 1053 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6), 1054 NXT_INET6_ADDR_STR_LEN); 1055 if (nxt_slow_path(sa == NULL)) { 1056 return NULL; 1057 } 1058 1059 in6_addr = &sa->u.sockaddr_in6.sin6_addr; 1060 1061 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { 1062 goto invalid_address; 1063 } 1064 1065 port = 0; 1066 p = addr_end + 1; 1067 length = (p + length) - p; 1068 1069 if (length == 0) { 1070 goto found; 1071 } 1072 1073 if (*p == ':') { 1074 port = nxt_int_parse(p + 1, length - 1); 1075 1076 if (port >= 1 && port <= 65535) { 1077 goto found; 1078 } 1079 } 1080 1081 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); 1082 1083 return NULL; 1084 1085 found: 1086 1087 sa->type = SOCK_STREAM; 1088 1089 sa->u.sockaddr_in6.sin6_family = AF_INET6; 1090 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); 1091 1092 return sa; 1093 1094 invalid_address: 1095 1096 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); 1097 1098 return NULL; 1099 1100 #else 1101 1102 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); 1103 1104 return NULL; 1105 1106 #endif 1107 } 1108 1109 1110 static nxt_sockaddr_t * 1111 nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp, 1112 nxt_str_t *string) 1113 { 1114 u_char *p, *ip; 1115 size_t length; 1116 in_addr_t addr; 1117 nxt_int_t port; 1118 nxt_sockaddr_t *sa; 1119 1120 addr = INADDR_ANY; 1121 1122 length = string->length; 1123 ip = string->start; 1124 1125 p = nxt_memchr(ip, ':', length); 1126 1127 if (p == NULL) { 1128 1129 /* single value port, or address */ 1130 1131 port = nxt_int_parse(ip, length); 1132 1133 if (port > 0) { 1134 /* "*:XX" */ 1135 1136 if (port < 1 || port > 65535) { 1137 goto invalid_port; 1138 } 1139 1140 } else { 1141 /* "x.x.x.x" */ 1142 1143 addr = nxt_inet_addr(ip, length); 1144 1145 if (addr == INADDR_NONE) { 1146 goto invalid_port; 1147 } 1148 1149 port = 8080; 1150 } 1151 1152 } else { 1153 1154 /* x.x.x.x:XX */ 1155 1156 p++; 1157 length = (ip + length) - p; 1158 port = nxt_int_parse(p, length); 1159 1160 if (port < 1 || port > 65535) { 1161 goto invalid_port; 1162 } 1163 1164 length = (p - 1) - ip; 1165 1166 if (length != 1 || ip[0] != '*') { 1167 addr = nxt_inet_addr(ip, length); 1168 1169 if (addr == INADDR_NONE) { 1170 goto invalid_addr; 1171 } 1172 1173 /* "x.x.x.x:XX" */ 1174 } 1175 } 1176 1177 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), 1178 NXT_INET_ADDR_STR_LEN); 1179 if (nxt_slow_path(sa == NULL)) { 1180 return NULL; 1181 } 1182 1183 sa->type = SOCK_STREAM; 1184 1185 sa->u.sockaddr_in.sin_family = AF_INET; 1186 sa->u.sockaddr_in.sin_port = htons((in_port_t) port); 1187 sa->u.sockaddr_in.sin_addr.s_addr = addr; 1188 1189 return sa; 1190 1191 invalid_port: 1192 1193 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string); 1194 1195 return NULL; 1196 1197 invalid_addr: 1198 1199 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string); 1200 1201 return NULL; 1202 } 1203 1204 1205 nxt_listen_socket_t * 1206 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1207 { 1208 nxt_mp_t *mp; 1209 nxt_listen_socket_t *ls; 1210 1211 ls = nxt_array_zero_add(rt->listen_sockets); 1212 if (ls == NULL) { 1213 return NULL; 1214 } 1215 1216 mp = rt->mem_pool; 1217 1218 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1219 sa->length); 1220 if (ls->sockaddr == NULL) { 1221 return NULL; 1222 } 1223 1224 ls->sockaddr->type = sa->type; 1225 1226 nxt_sockaddr_text(ls->sockaddr); 1227 1228 ls->socket = -1; 1229 ls->backlog = NXT_LISTEN_BACKLOG; 1230 1231 return ls; 1232 } 1233 1234 1235 static nxt_int_t 1236 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1237 { 1238 size_t length; 1239 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1240 1241 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1242 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); 1243 return NXT_ERROR; 1244 } 1245 1246 /* 1247 * Linux gethostname(2): 1248 * 1249 * If the null-terminated hostname is too large to fit, 1250 * then the name is truncated, and no error is returned. 1251 * 1252 * For this reason an additional byte is reserved in the buffer. 1253 */ 1254 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1255 1256 length = nxt_strlen(hostname); 1257 rt->hostname.length = length; 1258 1259 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1260 1261 if (rt->hostname.start != NULL) { 1262 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1263 return NXT_OK; 1264 } 1265 1266 return NXT_ERROR; 1267 } 1268 1269 1270 static nxt_int_t 1271 nxt_runtime_log_files_init(nxt_runtime_t *rt) 1272 { 1273 nxt_file_t *file; 1274 nxt_list_t *log_files; 1275 1276 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1277 1278 if (nxt_fast_path(log_files != NULL)) { 1279 rt->log_files = log_files; 1280 1281 /* Preallocate the main error_log. This allocation cannot fail. */ 1282 file = nxt_list_zero_add(log_files); 1283 1284 file->fd = NXT_FILE_INVALID; 1285 file->log_level = NXT_LOG_CRIT; 1286 1287 return NXT_OK; 1288 } 1289 1290 return NXT_ERROR; 1291 } 1292 1293 1294 nxt_file_t * 1295 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1296 { 1297 nxt_int_t ret; 1298 nxt_str_t *prefix; 1299 nxt_file_t *file; 1300 nxt_file_name_str_t file_name; 1301 1302 prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix; 1303 1304 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z", 1305 prefix, name); 1306 1307 if (nxt_slow_path(ret != NXT_OK)) { 1308 return NULL; 1309 } 1310 1311 nxt_list_each(file, rt->log_files) { 1312 1313 /* STUB: hardecoded case sensitive/insensitive. */ 1314 1315 if (file->name != NULL 1316 && nxt_file_name_eq(file->name, file_name.start)) 1317 { 1318 return file; 1319 } 1320 1321 } nxt_list_loop; 1322 1323 file = nxt_list_zero_add(rt->log_files); 1324 1325 if (nxt_slow_path(file == NULL)) { 1326 return NULL; 1327 } 1328 1329 file->fd = NXT_FILE_INVALID; 1330 file->log_level = NXT_LOG_CRIT; 1331 file->name = file_name.start; 1332 1333 return file; 1334 } 1335 1336 1337 static nxt_int_t 1338 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1339 { 1340 nxt_int_t ret; 1341 nxt_file_t *file; 1342 1343 nxt_list_each(file, rt->log_files) { 1344 1345 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1346 NXT_FILE_OWNER_ACCESS); 1347 1348 if (ret != NXT_OK) { 1349 return NXT_ERROR; 1350 } 1351 1352 } nxt_list_loop; 1353 1354 file = nxt_list_first(rt->log_files); 1355 1356 return nxt_file_stderr(file); 1357 } 1358 1359 1360 nxt_int_t 1361 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1362 { 1363 nxt_int_t ret; 1364 nxt_uint_t c, p, ncurr, nprev; 1365 nxt_listen_socket_t *curr, *prev; 1366 1367 curr = rt->listen_sockets->elts; 1368 ncurr = rt->listen_sockets->nelts; 1369 1370 if (rt->inherited_sockets != NULL) { 1371 prev = rt->inherited_sockets->elts; 1372 nprev = rt->inherited_sockets->nelts; 1373 1374 } else { 1375 prev = NULL; 1376 nprev = 0; 1377 } 1378 1379 for (c = 0; c < ncurr; c++) { 1380 1381 for (p = 0; p < nprev; p++) { 1382 1383 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1384 1385 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1386 if (ret != NXT_OK) { 1387 return NXT_ERROR; 1388 } 1389 1390 goto next; 1391 } 1392 } 1393 1394 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { 1395 return NXT_ERROR; 1396 } 1397 1398 next: 1399 1400 continue; 1401 } 1402 1403 return NXT_OK; 1404 } 1405 1406 1407 nxt_int_t 1408 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1409 { 1410 nxt_uint_t i, n; 1411 nxt_listen_socket_t *ls; 1412 1413 ls = rt->listen_sockets->elts; 1414 n = rt->listen_sockets->nelts; 1415 1416 for (i = 0; i < n; i++) { 1417 if (ls[i].flags == NXT_NONBLOCK) { 1418 if (nxt_listen_event(task, &ls[i]) == NULL) { 1419 return NXT_ERROR; 1420 } 1421 } 1422 } 1423 1424 return NXT_OK; 1425 } 1426 1427 1428 nxt_str_t * 1429 nxt_current_directory(nxt_mp_t *mp) 1430 { 1431 size_t length; 1432 u_char *p; 1433 nxt_str_t *name; 1434 char buf[NXT_MAX_PATH_LEN]; 1435 1436 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1437 1438 if (nxt_fast_path(length != 0)) { 1439 name = nxt_str_alloc(mp, length + 1); 1440 1441 if (nxt_fast_path(name != NULL)) { 1442 p = nxt_cpymem(name->start, buf, length); 1443 *p = '/'; 1444 1445 return name; 1446 } 1447 } 1448 1449 return NULL; 1450 } 1451 1452 1453 static nxt_int_t 1454 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1455 { 1456 ssize_t length; 1457 nxt_int_t n; 1458 nxt_file_t file; 1459 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; 1460 1461 nxt_memzero(&file, sizeof(nxt_file_t)); 1462 1463 file.name = pid_file; 1464 1465 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1466 NXT_FILE_DEFAULT_ACCESS); 1467 1468 if (n != NXT_OK) { 1469 return NXT_ERROR; 1470 } 1471 1472 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1473 1474 if (nxt_file_write(&file, pid, length, 0) != length) { 1475 return NXT_ERROR; 1476 } 1477 1478 nxt_file_close(task, &file); 1479 1480 return NXT_OK; 1481 } 1482 1483 1484 nxt_process_t * 1485 nxt_runtime_process_new(nxt_runtime_t *rt) 1486 { 1487 nxt_process_t *process; 1488 1489 /* TODO: memory failures. */ 1490 1491 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); 1492 if (nxt_slow_path(process == NULL)) { 1493 return NULL; 1494 } 1495 1496 nxt_queue_init(&process->ports); 1497 1498 nxt_thread_mutex_create(&process->incoming_mutex); 1499 nxt_thread_mutex_create(&process->outgoing_mutex); 1500 nxt_thread_mutex_create(&process->cp_mutex); 1501 1502 return process; 1503 } 1504 1505 1506 static void 1507 nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) 1508 { 1509 nxt_assert(process->port_cleanups == 0); 1510 nxt_assert(process->registered == 0); 1511 1512 nxt_port_mmaps_destroy(process->incoming, 1); 1513 nxt_port_mmaps_destroy(process->outgoing, 1); 1514 1515 if (process->cp_mem_pool != NULL) { 1516 nxt_mp_thread_adopt(process->cp_mem_pool); 1517 1518 nxt_mp_destroy(process->cp_mem_pool); 1519 } 1520 1521 nxt_thread_mutex_destroy(&process->incoming_mutex); 1522 nxt_thread_mutex_destroy(&process->outgoing_mutex); 1523 nxt_thread_mutex_destroy(&process->cp_mutex); 1524 1525 nxt_mp_free(rt->mem_pool, process); 1526 } 1527 1528 1529 static nxt_int_t 1530 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1531 { 1532 nxt_process_t *process; 1533 1534 process = data; 1535 1536 if (lhq->key.length == sizeof(nxt_pid_t) && 1537 *(nxt_pid_t *) lhq->key.start == process->pid) { 1538 return NXT_OK; 1539 } 1540 1541 return NXT_DECLINED; 1542 } 1543 1544 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1545 NXT_LVLHSH_DEFAULT, 1546 nxt_runtime_lvlhsh_pid_test, 1547 nxt_lvlhsh_alloc, 1548 nxt_lvlhsh_free, 1549 }; 1550 1551 1552 nxt_inline void 1553 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1554 { 1555 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1556 lhq->key.length = sizeof(*pid); 1557 lhq->key.start = (u_char *) pid; 1558 lhq->proto = &lvlhsh_processes_proto; 1559 } 1560 1561 1562 nxt_process_t * 1563 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1564 { 1565 nxt_process_t *process; 1566 nxt_lvlhsh_query_t lhq; 1567 1568 process = NULL; 1569 1570 nxt_runtime_process_lhq_pid(&lhq, &pid); 1571 1572 nxt_thread_mutex_lock(&rt->processes_mutex); 1573 1574 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1575 process = lhq.value; 1576 1577 } else { 1578 nxt_thread_log_debug("process %PI not found", pid); 1579 } 1580 1581 nxt_thread_mutex_unlock(&rt->processes_mutex); 1582 1583 return process; 1584 } 1585 1586 1587 nxt_process_t * 1588 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1589 { 1590 nxt_process_t *process; 1591 nxt_lvlhsh_query_t lhq; 1592 1593 nxt_runtime_process_lhq_pid(&lhq, &pid); 1594 1595 nxt_thread_mutex_lock(&rt->processes_mutex); 1596 1597 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1598 nxt_thread_log_debug("process %PI found", pid); 1599 1600 nxt_thread_mutex_unlock(&rt->processes_mutex); 1601 return lhq.value; 1602 } 1603 1604 process = nxt_runtime_process_new(rt); 1605 if (nxt_slow_path(process == NULL)) { 1606 return NULL; 1607 } 1608 1609 process->pid = pid; 1610 1611 lhq.replace = 0; 1612 lhq.value = process; 1613 lhq.pool = rt->mem_pool; 1614 1615 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1616 1617 case NXT_OK: 1618 if (rt->nprocesses == 0) { 1619 rt->mprocess = process; 1620 } 1621 1622 rt->nprocesses++; 1623 1624 process->registered = 1; 1625 1626 nxt_thread_log_debug("process %PI insert", pid); 1627 break; 1628 1629 default: 1630 nxt_thread_log_debug("process %PI insert failed", pid); 1631 break; 1632 } 1633 1634 nxt_thread_mutex_unlock(&rt->processes_mutex); 1635 1636 return process; 1637 } 1638 1639 1640 void 1641 nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) 1642 { 1643 nxt_port_t *port; 1644 nxt_lvlhsh_query_t lhq; 1645 1646 nxt_assert(process->registered == 0); 1647 1648 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1649 1650 lhq.replace = 0; 1651 lhq.value = process; 1652 lhq.pool = rt->mem_pool; 1653 1654 nxt_thread_mutex_lock(&rt->processes_mutex); 1655 1656 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1657 1658 case NXT_OK: 1659 if (rt->nprocesses == 0) { 1660 rt->mprocess = process; 1661 } 1662 1663 rt->nprocesses++; 1664 1665 nxt_process_port_each(process, port) { 1666 1667 port->pid = process->pid; 1668 1669 nxt_runtime_port_add(rt, port); 1670 1671 } nxt_process_port_loop; 1672 1673 process->registered = 1; 1674 1675 nxt_thread_log_debug("process %PI added", process->pid); 1676 break; 1677 1678 default: 1679 nxt_thread_log_debug("process %PI failed to add", process->pid); 1680 break; 1681 } 1682 1683 nxt_thread_mutex_unlock(&rt->processes_mutex); 1684 } 1685 1686 1687 static nxt_process_t * 1688 nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) 1689 { 1690 nxt_process_t *process; 1691 nxt_lvlhsh_query_t lhq; 1692 1693 process = NULL; 1694 1695 nxt_runtime_process_lhq_pid(&lhq, &pid); 1696 1697 lhq.pool = rt->mem_pool; 1698 1699 nxt_thread_mutex_lock(&rt->processes_mutex); 1700 1701 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1702 1703 case NXT_OK: 1704 rt->nprocesses--; 1705 1706 process = lhq.value; 1707 1708 process->registered = 0; 1709 1710 nxt_thread_log_debug("process %PI removed", pid); 1711 break; 1712 1713 default: 1714 nxt_thread_log_debug("process %PI remove failed", pid); 1715 break; 1716 } 1717 1718 nxt_thread_mutex_unlock(&rt->processes_mutex); 1719 1720 return process; 1721 } 1722 1723 1724 void 1725 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1726 { 1727 nxt_port_t *port; 1728 1729 if (process->port_cleanups == 0) { 1730 if (process->registered == 1) { 1731 nxt_runtime_process_remove_pid(rt, process->pid); 1732 } 1733 1734 nxt_runtime_process_destroy(rt, process); 1735 1736 } else { 1737 nxt_process_port_each(process, port) { 1738 1739 nxt_runtime_port_remove(rt, port); 1740 1741 nxt_port_release(port); 1742 1743 } nxt_process_port_loop; 1744 } 1745 } 1746 1747 1748 nxt_process_t * 1749 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1750 { 1751 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); 1752 1753 lhe->proto = &lvlhsh_processes_proto; 1754 1755 return nxt_runtime_process_next(rt, lhe); 1756 } 1757 1758 1759 nxt_port_t * 1760 nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1761 { 1762 return nxt_port_hash_first(&rt->ports, lhe); 1763 } 1764 1765 1766 void 1767 nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) 1768 { 1769 nxt_port_hash_add(&rt->ports, rt->mem_pool, port); 1770 1771 rt->port_by_type[port->type] = port; 1772 } 1773 1774 1775 void 1776 nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) 1777 { 1778 nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); 1779 1780 if (rt->port_by_type[port->type] == port) { 1781 rt->port_by_type[port->type] = NULL; 1782 } 1783 } 1784 1785 1786 nxt_port_t * 1787 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1788 nxt_port_id_t port_id) 1789 { 1790 return nxt_port_hash_find(&rt->ports, pid, port_id); 1791 } 1792