Deleted
Added
nxt_runtime.c (1545:78836321a126) | nxt_runtime.c (1548:a745db447e56) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_port.h> 11#include <nxt_main_process.h> 12#include <nxt_router.h> 13 14 15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 16 nxt_runtime_t *rt); 17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 18 nxt_runtime_t *rt); 19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); 23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 24static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt); 25static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 26static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 27 nxt_runtime_t *rt); 28static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 29static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 30static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 31static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 32static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 33 nxt_runtime_t *rt); 34static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 35 nxt_file_name_t *pid_file); 36static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 37 nxt_runtime_cont_t cont); 38static void nxt_runtime_thread_pool_init(void); 39static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 40 void *data); 41static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); 42static void nxt_runtime_process_remove(nxt_runtime_t *rt, 43 nxt_process_t *process); 44static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); 45 46 47nxt_int_t 48nxt_runtime_create(nxt_task_t *task) 49{ 50 nxt_mp_t *mp; 51 nxt_int_t ret; 52 nxt_array_t *listen_sockets; 53 nxt_runtime_t *rt; 54 nxt_app_lang_module_t *lang; 55 56 mp = nxt_mp_create(1024, 128, 256, 32); 57 if (nxt_slow_path(mp == NULL)) { 58 return NXT_ERROR; 59 } 60 61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 62 if (nxt_slow_path(rt == NULL)) { 63 goto fail; 64 } 65 66 task->thread->runtime = rt; 67 rt->mem_pool = mp; 68 69 nxt_thread_mutex_create(&rt->processes_mutex); 70 71 rt->services = nxt_services_init(mp); 72 if (nxt_slow_path(rt->services == NULL)) { 73 goto fail; 74 } 75 76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 77 if (nxt_slow_path(rt->languages == NULL)) { 78 goto fail; 79 } 80 81 /* Should not fail. */ 82 lang = nxt_array_add(rt->languages); 83 lang->type = NXT_APP_EXTERNAL; 84 lang->version = (u_char *) ""; 85 lang->file = NULL; 86 lang->module = &nxt_external_module; 87 lang->mounts = NULL; 88 89 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 90 if (nxt_slow_path(listen_sockets == NULL)) { 91 goto fail; 92 } 93 94 rt->listen_sockets = listen_sockets; 95 96 ret = nxt_runtime_inherited_listen_sockets(task, rt); 97 if (nxt_slow_path(ret != NXT_OK)) { 98 goto fail; 99 } 100 101 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 102 goto fail; 103 } 104 105 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 106 goto fail; 107 } 108 109 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 110 goto fail; 111 } 112 113 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 114 goto fail; 115 } 116 117 rt->start = nxt_runtime_initial_start; 118 119 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 120 goto fail; 121 } 122 123 if (nxt_port_rpc_init() != NXT_OK) { 124 goto fail; 125 } 126 127 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 128 nxt_runtime_start, task, rt, NULL); 129 130 return NXT_OK; 131 132fail: 133 134 nxt_mp_destroy(mp); 135 136 return NXT_ERROR; 137} 138 139 140static nxt_int_t 141nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 142{ 143 u_char *v, *p; 144 nxt_int_t type; 145 nxt_array_t *inherited_sockets; 146 nxt_socket_t s; 147 nxt_listen_socket_t *ls; 148 149 v = (u_char *) getenv("NGINX"); 150 151 if (v == NULL) { 152 return nxt_runtime_systemd_listen_sockets(task, rt); 153 } 154 155 nxt_alert(task, "using inherited listen sockets: %s", v); 156 157 inherited_sockets = nxt_array_create(rt->mem_pool, 158 1, sizeof(nxt_listen_socket_t)); 159 if (inherited_sockets == NULL) { 160 return NXT_ERROR; 161 } 162 163 rt->inherited_sockets = inherited_sockets; 164 165 for (p = v; *p != '\0'; p++) { 166 167 if (*p == ';') { 168 s = nxt_int_parse(v, p - v); 169 170 if (nxt_slow_path(s < 0)) { 171 nxt_alert(task, "invalid socket number \"%s\" " 172 "in NGINX environment variable, " 173 "ignoring the rest of the variable", v); 174 return NXT_ERROR; 175 } 176 177 v = p + 1; 178 179 ls = nxt_array_zero_add(inherited_sockets); 180 if (nxt_slow_path(ls == NULL)) { 181 return NXT_ERROR; 182 } 183 184 ls->socket = s; 185 186 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 187 if (nxt_slow_path(ls->sockaddr == NULL)) { 188 return NXT_ERROR; 189 } 190 191 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 192 if (nxt_slow_path(type == -1)) { 193 return NXT_ERROR; 194 } 195 196 ls->sockaddr->type = (uint16_t) type; 197 } 198 } 199 200 return NXT_OK; 201} 202 203 204static nxt_int_t 205nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 206{ 207 u_char *nfd, *pid; 208 nxt_int_t n; 209 nxt_array_t *inherited_sockets; 210 nxt_socket_t s; 211 nxt_listen_socket_t *ls; 212 213 /* 214 * Number of listening sockets passed. The socket 215 * descriptors start from number 3 and are sequential. 216 */ 217 nfd = (u_char *) getenv("LISTEN_FDS"); 218 if (nfd == NULL) { 219 return NXT_OK; 220 } 221 222 /* The pid of the service process. */ 223 pid = (u_char *) getenv("LISTEN_PID"); 224 if (pid == NULL) { 225 return NXT_OK; 226 } 227 228 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 229 if (n < 0) { 230 return NXT_OK; 231 } 232 233 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 234 return NXT_OK; 235 } 236 237 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n); 238 239 inherited_sockets = nxt_array_create(rt->mem_pool, 240 n, sizeof(nxt_listen_socket_t)); 241 if (inherited_sockets == NULL) { 242 return NXT_ERROR; 243 } 244 245 rt->inherited_sockets = inherited_sockets; 246 247 for (s = 3; s < n; s++) { 248 ls = nxt_array_zero_add(inherited_sockets); 249 if (nxt_slow_path(ls == NULL)) { 250 return NXT_ERROR; 251 } 252 253 ls->socket = s; 254 255 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 256 if (nxt_slow_path(ls->sockaddr == NULL)) { 257 return NXT_ERROR; 258 } 259 260 ls->sockaddr->type = SOCK_STREAM; 261 } 262 263 return NXT_OK; 264} 265 266 267static nxt_int_t 268nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 269{ 270 nxt_thread_t *thread; 271 nxt_event_engine_t *engine; 272 const nxt_event_interface_t *interface; 273 274 interface = nxt_service_get(rt->services, "engine", NULL); 275 276 if (nxt_slow_path(interface == NULL)) { 277 /* TODO: log */ 278 return NXT_ERROR; 279 } 280 281 engine = nxt_event_engine_create(task, interface, 282 nxt_main_process_signals, 0, 0); 283 284 if (nxt_slow_path(engine == NULL)) { 285 return NXT_ERROR; 286 } 287 288 thread = task->thread; 289 thread->engine = engine; 290#if 0 291 thread->fiber = &engine->fibers->fiber; 292#endif 293 294 engine->id = rt->last_engine_id++; 295 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 296 297 nxt_queue_init(&rt->engines); 298 nxt_queue_insert_tail(&rt->engines, &engine->link); 299 300 return NXT_OK; 301} 302 303 304static nxt_int_t 305nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 306{ 307 nxt_int_t ret; 308 nxt_array_t *thread_pools; 309 310 thread_pools = nxt_array_create(rt->mem_pool, 1, 311 sizeof(nxt_thread_pool_t *)); 312 313 if (nxt_slow_path(thread_pools == NULL)) { 314 return NXT_ERROR; 315 } 316 317 rt->thread_pools = thread_pools; 318 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 319 320 if (nxt_slow_path(ret != NXT_OK)) { 321 return NXT_ERROR; 322 } 323 324 return NXT_OK; 325} 326 327 328static void 329nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 330{ 331 nxt_runtime_t *rt; 332 333 rt = obj; 334 335 nxt_debug(task, "rt conf done"); 336 337 task->thread->log->ctx_handler = NULL; 338 task->thread->log->ctx = NULL; 339 340 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 341 goto fail; 342 } 343 344 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 345 goto fail; 346 } 347 348 /* 349 * Thread pools should be destroyed before starting worker 350 * processes, because thread pool semaphores will stick in 351 * locked state in new processes after fork(). 352 */ 353 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 354 355 return; 356 357fail: 358 359 nxt_runtime_quit(task, 1); 360} 361 362 363static void 364nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status) 365{ 366 nxt_int_t ret; 367 nxt_thread_t *thr; 368 nxt_runtime_t *rt; 369 const nxt_event_interface_t *interface; 370 371 thr = task->thread; 372 rt = thr->runtime; 373 374 if (rt->inherited_sockets == NULL && rt->daemon) { 375 376 if (nxt_process_daemon(task) != NXT_OK) { 377 goto fail; 378 } 379 380 /* 381 * An event engine should be updated after fork() 382 * even if an event facility was not changed because: 383 * 1) inherited kqueue descriptor is invalid, 384 * 2) the signal thread is not inherited. 385 */ 386 interface = nxt_service_get(rt->services, "engine", rt->engine); 387 if (interface == NULL) { 388 goto fail; 389 } 390 391 ret = nxt_event_engine_change(task->thread->engine, interface, 392 rt->batch); 393 if (ret != NXT_OK) { 394 goto fail; 395 } 396 } 397 398 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 399 if (ret != NXT_OK) { 400 goto fail; 401 } 402 403 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 404 goto fail; 405 } 406 407 thr->engine->max_connections = rt->engine_connections; 408 409 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 410 return; 411 } 412 413fail: 414 415 nxt_runtime_quit(task, 1); 416} 417 418 419void 420nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status) 421{ 422 nxt_bool_t done; 423 nxt_runtime_t *rt; 424 nxt_event_engine_t *engine; 425 426 rt = task->thread->runtime; 427 rt->status |= status; 428 engine = task->thread->engine; 429 430 nxt_debug(task, "exiting"); 431 432 done = 1; 433 434 if (!engine->shutdown) { 435 engine->shutdown = 1; 436 437 if (!nxt_array_is_empty(rt->thread_pools)) { 438 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 439 done = 0; 440 } 441 442 if (rt->type == NXT_PROCESS_MAIN) { 443 nxt_runtime_stop_all_processes(task, rt); 444 done = 0; 445 } 446 } 447 448 nxt_runtime_close_idle_connections(engine); 449 450 if (done) { 451 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 452 task, rt, engine); 453 } 454} 455 456 457static void 458nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 459{ 460 nxt_conn_t *c; 461 nxt_queue_t *idle; 462 nxt_queue_link_t *link, *next; 463 464 nxt_debug(&engine->task, "close idle connections"); 465 466 idle = &engine->idle_connections; 467 468 for (link = nxt_queue_first(idle); 469 link != nxt_queue_tail(idle); 470 link = next) 471 { 472 next = nxt_queue_next(link); 473 c = nxt_queue_link_data(link, nxt_conn_t, link); 474 475 if (!c->socket.read_ready) { 476 nxt_queue_remove(link); 477 nxt_conn_close(engine, c); 478 } 479 } 480} 481 482 483void 484nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) 485{ 486 nxt_port_t *port; 487 nxt_process_t *process; 488 nxt_process_init_t *init; 489 490 nxt_runtime_process_each(rt, process) { 491 492 init = nxt_process_init(process); 493 494 if (init->type == NXT_PROCESS_APP) { 495 496 nxt_process_port_each(process, port) { 497 498 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 499 0, 0, NULL); 500 501 } nxt_process_port_loop; 502 } 503 504 } nxt_runtime_process_loop; 505} 506 507 508static void 509nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 510{ 511 nxt_port_t *port; 512 nxt_process_t *process; 513 514 nxt_runtime_process_each(rt, process) { 515 516 nxt_process_port_each(process, port) { 517 518 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 519 0, NULL); 520 521 } nxt_process_port_loop; 522 523 } nxt_runtime_process_loop; 524} 525 526 527static void 528nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 529{ 530 int status, engine_count; 531 nxt_runtime_t *rt; 532 nxt_process_t *process; 533 nxt_event_engine_t *engine; 534 535 rt = obj; 536 engine = data; 537 538 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 539 540 if (!nxt_array_is_empty(rt->thread_pools)) { 541 return; 542 } 543 544 if (rt->type == NXT_PROCESS_MAIN) { 545 if (rt->pid_file != NULL) { 546 nxt_file_delete(rt->pid_file); 547 } 548 549#if (NXT_HAVE_UNIX_DOMAIN) 550 { 551 nxt_sockaddr_t *sa; 552 nxt_file_name_t *name; 553 554 sa = rt->controller_listen; 555 556 if (sa->u.sockaddr.sa_family == AF_UNIX) { 557 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 558 (void) nxt_file_delete(name); 559 } 560 } 561#endif 562 } 563 564 if (!engine->event.signal_support) { 565 nxt_event_engine_signals_stop(engine); 566 } 567 568 nxt_runtime_process_each(rt, process) { 569 570 nxt_process_close_ports(task, process); 571 572 } nxt_runtime_process_loop; 573 574 status = rt->status; 575 576 engine_count = 0; 577 578 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) { 579 580 engine_count++; 581 582 } nxt_queue_loop; 583 584 if (engine_count <= 1) { 585 if (rt->port_by_type[rt->type] != NULL) { 586 nxt_port_use(task, rt->port_by_type[rt->type], -1); 587 } 588 589 nxt_thread_mutex_destroy(&rt->processes_mutex); 590 591 nxt_mp_destroy(rt->mem_pool); 592 } 593 594 nxt_debug(task, "exit: %d", status); 595 596 exit(status); 597 nxt_unreachable(); 598} 599 600 601static nxt_int_t 602nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 603{ 604 nxt_event_engine_t *engine; 605 const nxt_event_interface_t *interface; 606 607 engine = task->thread->engine; 608 609 if (engine->batch == rt->batch 610 && nxt_strcmp(engine->event.name, rt->engine) == 0) 611 { 612 return NXT_OK; 613 } 614 615 interface = nxt_service_get(rt->services, "engine", rt->engine); 616 617 if (interface != NULL) { 618 return nxt_event_engine_change(engine, interface, rt->batch); 619 } 620 621 return NXT_ERROR; 622} 623 624 625void 626nxt_runtime_event_engine_free(nxt_runtime_t *rt) 627{ 628 nxt_queue_link_t *link; 629 nxt_event_engine_t *engine; 630 631 link = nxt_queue_first(&rt->engines); 632 nxt_queue_remove(link); 633 634 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 635 nxt_event_engine_free(engine); 636} 637 638 639nxt_int_t 640nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 641 nxt_uint_t max_threads, nxt_nsec_t timeout) 642{ 643 nxt_thread_pool_t *thread_pool, **tp; 644 645 tp = nxt_array_add(rt->thread_pools); 646 if (tp == NULL) { 647 return NXT_ERROR; 648 } 649 650 thread_pool = nxt_thread_pool_create(max_threads, timeout, 651 nxt_runtime_thread_pool_init, 652 thr->engine, 653 nxt_runtime_thread_pool_exit); 654 655 if (nxt_fast_path(thread_pool != NULL)) { 656 *tp = thread_pool; 657 } 658 659 return NXT_OK; 660} 661 662 663static void 664nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 665 nxt_runtime_cont_t cont) 666{ 667 nxt_uint_t n; 668 nxt_thread_pool_t **tp; 669 670 rt->continuation = cont; 671 672 n = rt->thread_pools->nelts; 673 674 if (n == 0) { 675 cont(task, 0); 676 return; 677 } 678 679 tp = rt->thread_pools->elts; 680 681 do { 682 nxt_thread_pool_destroy(*tp); 683 684 tp++; 685 n--; 686 } while (n != 0); 687} 688 689 690static void 691nxt_runtime_thread_pool_init(void) 692{ 693#if (NXT_REGEX) 694 nxt_regex_init(0); 695#endif 696} 697 698 699static void 700nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 701{ 702 nxt_uint_t i, n; 703 nxt_runtime_t *rt; 704 nxt_thread_pool_t *tp, **thread_pools; 705 nxt_thread_handle_t handle; 706 707 tp = obj; 708 709 if (data != NULL) { 710 handle = (nxt_thread_handle_t) (uintptr_t) data; 711 nxt_thread_wait(handle); 712 } 713 714 rt = task->thread->runtime; 715 716 thread_pools = rt->thread_pools->elts; 717 n = rt->thread_pools->nelts; 718 719 nxt_debug(task, "thread pools: %ui", n); 720 721 for (i = 0; i < n; i++) { 722 723 if (tp == thread_pools[i]) { 724 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 725 726 nxt_free(tp); 727 728 if (n == 1) { 729 /* The last thread pool. */ 730 rt->continuation(task, 0); 731 } 732 733 return; 734 } 735 } 736} 737 738 739static nxt_int_t 740nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 741{ 742 nxt_int_t ret; 743 nxt_str_t control; 744 nxt_uint_t n; 745 nxt_file_t *file; 746 const char *slash; 747 nxt_sockaddr_t *sa; 748 nxt_file_name_str_t file_name; 749 const nxt_event_interface_t *interface; 750 751 rt->daemon = 1; 752 rt->engine_connections = 256; 753 rt->auxiliary_threads = 2; 754 rt->user_cred.user = NXT_USER; 755 rt->group = NXT_GROUP; 756 rt->pid = NXT_PID; 757 rt->log = NXT_LOG; 758 rt->modules = NXT_MODULES; 759 rt->state = NXT_STATE; 760 rt->control = NXT_CONTROL_SOCK; 761 rt->tmp = NXT_TMP; 762 763 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t)); 764 765 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 766 return NXT_ERROR; 767 } 768 769 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) { 770 return NXT_ERROR; 771 } 772 773 if (rt->capabilities.setid) { 774 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, 775 rt->group); 776 777 if (nxt_slow_path(ret != NXT_OK)) { 778 return NXT_ERROR; 779 } 780 781 } else { 782 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it " 783 "cannot use arbitrary user and group."); 784 } 785 786 /* An engine's parameters. */ 787 788 interface = nxt_service_get(rt->services, "engine", rt->engine); 789 if (interface == NULL) { 790 return NXT_ERROR; 791 } 792 793 rt->engine = interface->name; 794 795 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 796 if (nxt_slow_path(ret != NXT_OK)) { 797 return NXT_ERROR; 798 } 799 800 rt->pid_file = file_name.start; 801 802 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 803 if (nxt_slow_path(ret != NXT_OK)) { 804 return NXT_ERROR; 805 } 806 807 file = nxt_list_first(rt->log_files); 808 file->name = file_name.start; 809 810 slash = ""; 811 n = nxt_strlen(rt->modules); 812 813 if (n > 1 && rt->modules[n - 1] != '/') { 814 slash = "/"; 815 } 816 817 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 818 rt->modules, slash); 819 if (nxt_slow_path(ret != NXT_OK)) { 820 return NXT_ERROR; 821 } 822 823 rt->modules = (char *) file_name.start; 824 825 slash = ""; 826 n = nxt_strlen(rt->state); 827 828 if (n > 1 && rt->state[n - 1] != '/') { 829 slash = "/"; 830 } 831 832 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 833 rt->state, slash); 834 if (nxt_slow_path(ret != NXT_OK)) { 835 return NXT_ERROR; 836 } 837 838 rt->conf = (char *) file_name.start; 839 840 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 841 if (nxt_slow_path(ret != NXT_OK)) { 842 return NXT_ERROR; 843 } 844 845 rt->conf_tmp = (char *) file_name.start; 846 847 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z", 848 rt->state, slash); 849 if (nxt_slow_path(ret != NXT_OK)) { 850 return NXT_ERROR; 851 } 852 853 ret = mkdir((char *) file_name.start, S_IRWXU); 854 855 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) { 856 rt->certs.length = file_name.len; 857 rt->certs.start = file_name.start; 858 859 } else { 860 nxt_alert(task, "Unable to create certificates storage directory: " 861 "mkdir(%s) failed %E", file_name.start, nxt_errno); 862 } 863 864 control.length = nxt_strlen(rt->control); 865 control.start = (u_char *) rt->control; 866 867 sa = nxt_sockaddr_parse(rt->mem_pool, &control); 868 if (nxt_slow_path(sa == NULL)) { 869 return NXT_ERROR; 870 } 871 872 sa->type = SOCK_STREAM; 873 874 rt->controller_listen = sa; 875 876 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 877 return NXT_ERROR; 878 } 879 880 return NXT_OK; 881} 882 883 884static nxt_int_t 885nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 886{ 887 char *p, **argv; 888 u_char *end; 889 u_char buf[1024]; 890 891 static const char version[] = 892 "unit version: " NXT_VERSION "\n" 893 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 894 895 static const char no_control[] = 896 "option \"--control\" requires socket address\n"; 897 static const char no_user[] = "option \"--user\" requires username\n"; 898 static const char no_group[] = "option \"--group\" requires group name\n"; 899 static const char no_pid[] = "option \"--pid\" requires filename\n"; 900 static const char no_log[] = "option \"--log\" requires filename\n"; 901 static const char no_modules[] = 902 "option \"--modules\" requires directory\n"; 903 static const char no_state[] = "option \"--state\" requires directory\n"; 904 static const char no_tmp[] = "option \"--tmp\" requires directory\n"; 905 906 static const char help[] = 907 "\n" 908 "unit options:\n" 909 "\n" 910 " --version print unit version and configure options\n" 911 "\n" 912 " --no-daemon run unit in non-daemon mode\n" 913 "\n" 914 " --control ADDRESS set address of control API socket\n" 915 " default: \"" NXT_CONTROL_SOCK "\"\n" 916 "\n" 917 " --pid FILE set pid filename\n" 918 " default: \"" NXT_PID "\"\n" 919 "\n" 920 " --log FILE set log filename\n" 921 " default: \"" NXT_LOG "\"\n" 922 "\n" 923 " --modules DIRECTORY set modules directory name\n" 924 " default: \"" NXT_MODULES "\"\n" 925 "\n" 926 " --state DIRECTORY set state directory name\n" 927 " default: \"" NXT_STATE "\"\n" 928 "\n" 929 " --tmp DIRECTORY set tmp directory name\n" 930 " default: \"" NXT_TMP "\"\n" 931 "\n" 932 " --user USER set non-privileged processes to run" 933 " as specified user\n" 934 " default: \"" NXT_USER "\"\n" 935 "\n" 936 " --group GROUP set non-privileged processes to run" 937 " as specified group\n" 938 " default: "; 939 940 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 941 static const char primary[] = "user's primary group\n\n"; 942 943 argv = &nxt_process_argv[1]; 944 945 while (*argv != NULL) { 946 p = *argv++; 947 948 if (nxt_strcmp(p, "--control") == 0) { 949 if (*argv == NULL) { 950 write(STDERR_FILENO, no_control, nxt_length(no_control)); 951 return NXT_ERROR; 952 } 953 954 p = *argv++; 955 956 rt->control = p; 957 958 continue; 959 } 960 961 if (nxt_strcmp(p, "--user") == 0) { 962 if (*argv == NULL) { 963 write(STDERR_FILENO, no_user, nxt_length(no_user)); 964 return NXT_ERROR; 965 } 966 967 p = *argv++; 968 969 rt->user_cred.user = p; 970 971 continue; 972 } 973 974 if (nxt_strcmp(p, "--group") == 0) { 975 if (*argv == NULL) { 976 write(STDERR_FILENO, no_group, nxt_length(no_group)); 977 return NXT_ERROR; 978 } 979 980 p = *argv++; 981 982 rt->group = p; 983 984 continue; 985 } 986 987 if (nxt_strcmp(p, "--pid") == 0) { 988 if (*argv == NULL) { 989 write(STDERR_FILENO, no_pid, nxt_length(no_pid)); 990 return NXT_ERROR; 991 } 992 993 p = *argv++; 994 995 rt->pid = p; 996 997 continue; 998 } 999 1000 if (nxt_strcmp(p, "--log") == 0) { 1001 if (*argv == NULL) { 1002 write(STDERR_FILENO, no_log, nxt_length(no_log)); 1003 return NXT_ERROR; 1004 } 1005 1006 p = *argv++; 1007 1008 rt->log = p; 1009 1010 continue; 1011 } 1012 1013 if (nxt_strcmp(p, "--modules") == 0) { 1014 if (*argv == NULL) { 1015 write(STDERR_FILENO, no_modules, nxt_length(no_modules)); 1016 return NXT_ERROR; 1017 } 1018 1019 p = *argv++; 1020 1021 rt->modules = p; 1022 1023 continue; 1024 } 1025 1026 if (nxt_strcmp(p, "--state") == 0) { 1027 if (*argv == NULL) { 1028 write(STDERR_FILENO, no_state, nxt_length(no_state)); 1029 return NXT_ERROR; 1030 } 1031 1032 p = *argv++; 1033 1034 rt->state = p; 1035 1036 continue; 1037 } 1038 1039 if (nxt_strcmp(p, "--tmp") == 0) { 1040 if (*argv == NULL) { 1041 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp)); 1042 return NXT_ERROR; 1043 } 1044 1045 p = *argv++; 1046 1047 rt->tmp = p; 1048 1049 continue; 1050 } 1051 1052 if (nxt_strcmp(p, "--no-daemon") == 0) { 1053 rt->daemon = 0; 1054 continue; 1055 } 1056 1057 if (nxt_strcmp(p, "--version") == 0) { 1058 write(STDERR_FILENO, version, nxt_length(version)); 1059 exit(0); 1060 } 1061 1062 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) { 1063 write(STDOUT_FILENO, help, nxt_length(help)); 1064 1065 if (sizeof(NXT_GROUP) == 1) { 1066 write(STDOUT_FILENO, primary, nxt_length(primary)); 1067 1068 } else { 1069 write(STDOUT_FILENO, group, nxt_length(group)); 1070 } 1071 1072 exit(0); 1073 } 1074 1075 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", " 1076 "try \"%s -h\" for available options\n", 1077 p, nxt_process_argv[0]); 1078 1079 write(STDERR_FILENO, buf, end - buf); 1080 1081 return NXT_ERROR; 1082 } 1083 1084 return NXT_OK; 1085} 1086 1087 1088nxt_listen_socket_t * 1089nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1090{ 1091 nxt_mp_t *mp; 1092 nxt_listen_socket_t *ls; 1093 1094 ls = nxt_array_zero_add(rt->listen_sockets); 1095 if (ls == NULL) { 1096 return NULL; 1097 } 1098 1099 mp = rt->mem_pool; 1100 1101 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1102 sa->length); 1103 if (ls->sockaddr == NULL) { 1104 return NULL; 1105 } 1106 1107 ls->sockaddr->type = sa->type; 1108 1109 nxt_sockaddr_text(ls->sockaddr); 1110 1111 ls->socket = -1; 1112 ls->backlog = NXT_LISTEN_BACKLOG; 1113 1114 return ls; 1115} 1116 1117 1118static nxt_int_t 1119nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1120{ 1121 size_t length; 1122 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1123 1124 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1125 nxt_alert(task, "gethostname() failed %E", nxt_errno); 1126 return NXT_ERROR; 1127 } 1128 1129 /* 1130 * Linux gethostname(2): 1131 * 1132 * If the null-terminated hostname is too large to fit, 1133 * then the name is truncated, and no error is returned. 1134 * 1135 * For this reason an additional byte is reserved in the buffer. 1136 */ 1137 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1138 1139 length = nxt_strlen(hostname); 1140 rt->hostname.length = length; 1141 1142 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1143 1144 if (rt->hostname.start != NULL) { 1145 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1146 return NXT_OK; 1147 } 1148 1149 return NXT_ERROR; 1150} 1151 1152 1153static nxt_int_t 1154nxt_runtime_log_files_init(nxt_runtime_t *rt) 1155{ 1156 nxt_file_t *file; 1157 nxt_list_t *log_files; 1158 1159 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1160 1161 if (nxt_fast_path(log_files != NULL)) { 1162 rt->log_files = log_files; 1163 1164 /* Preallocate the main log. This allocation cannot fail. */ 1165 file = nxt_list_zero_add(log_files); 1166 1167 file->fd = NXT_FILE_INVALID; 1168 file->log_level = NXT_LOG_ALERT; 1169 1170 return NXT_OK; 1171 } 1172 1173 return NXT_ERROR; 1174} 1175 1176 1177nxt_file_t * 1178nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1179{ 1180 nxt_int_t ret; 1181 nxt_file_t *file; 1182 nxt_file_name_str_t file_name; 1183 1184 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1185 1186 if (nxt_slow_path(ret != NXT_OK)) { 1187 return NULL; 1188 } 1189 1190 nxt_list_each(file, rt->log_files) { 1191 1192 /* STUB: hardecoded case sensitive/insensitive. */ 1193 1194 if (file->name != NULL 1195 && nxt_file_name_eq(file->name, file_name.start)) 1196 { 1197 return file; 1198 } 1199 1200 } nxt_list_loop; 1201 1202 file = nxt_list_zero_add(rt->log_files); 1203 1204 if (nxt_slow_path(file == NULL)) { 1205 return NULL; 1206 } 1207 1208 file->fd = NXT_FILE_INVALID; 1209 file->log_level = NXT_LOG_ALERT; 1210 file->name = file_name.start; 1211 1212 return file; 1213} 1214 1215 1216static nxt_int_t 1217nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1218{ 1219 nxt_int_t ret; 1220 nxt_file_t *file; 1221 1222 nxt_list_each(file, rt->log_files) { 1223 1224 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1225 NXT_FILE_OWNER_ACCESS); 1226 1227 if (ret != NXT_OK) { 1228 return NXT_ERROR; 1229 } 1230 1231 } nxt_list_loop; 1232 1233 file = nxt_list_first(rt->log_files); 1234 1235 return nxt_file_stderr(file); 1236} 1237 1238 1239nxt_int_t 1240nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1241{ 1242 nxt_int_t ret; 1243 nxt_uint_t c, p, ncurr, nprev; 1244 nxt_listen_socket_t *curr, *prev; 1245 1246 curr = rt->listen_sockets->elts; 1247 ncurr = rt->listen_sockets->nelts; 1248 1249 if (rt->inherited_sockets != NULL) { 1250 prev = rt->inherited_sockets->elts; 1251 nprev = rt->inherited_sockets->nelts; 1252 1253 } else { 1254 prev = NULL; 1255 nprev = 0; 1256 } 1257 1258 for (c = 0; c < ncurr; c++) { 1259 1260 for (p = 0; p < nprev; p++) { 1261 1262 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1263 1264 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1265 if (ret != NXT_OK) { 1266 return NXT_ERROR; 1267 } 1268 1269 goto next; 1270 } 1271 } 1272 1273 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) { 1274 return NXT_ERROR; 1275 } 1276 1277 next: 1278 1279 continue; 1280 } 1281 1282 return NXT_OK; 1283} 1284 1285 1286nxt_int_t 1287nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1288{ 1289 nxt_uint_t i, n; 1290 nxt_listen_socket_t *ls; 1291 1292 ls = rt->listen_sockets->elts; 1293 n = rt->listen_sockets->nelts; 1294 1295 for (i = 0; i < n; i++) { 1296 if (ls[i].flags == NXT_NONBLOCK) { 1297 if (nxt_listen_event(task, &ls[i]) == NULL) { 1298 return NXT_ERROR; 1299 } 1300 } 1301 } 1302 1303 return NXT_OK; 1304} 1305 1306 1307nxt_str_t * 1308nxt_current_directory(nxt_mp_t *mp) 1309{ 1310 size_t length; 1311 u_char *p; 1312 nxt_str_t *name; 1313 char buf[NXT_MAX_PATH_LEN]; 1314 1315 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1316 1317 if (nxt_fast_path(length != 0)) { 1318 name = nxt_str_alloc(mp, length + 1); 1319 1320 if (nxt_fast_path(name != NULL)) { 1321 p = nxt_cpymem(name->start, buf, length); 1322 *p = '/'; 1323 1324 return name; 1325 } 1326 } 1327 1328 return NULL; 1329} 1330 1331 1332static nxt_int_t 1333nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1334{ 1335 ssize_t length; 1336 nxt_int_t n; 1337 nxt_file_t file; 1338 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")]; 1339 1340 nxt_memzero(&file, sizeof(nxt_file_t)); 1341 1342 file.name = pid_file; 1343 1344 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1345 NXT_FILE_DEFAULT_ACCESS); 1346 1347 if (n != NXT_OK) { 1348 return NXT_ERROR; 1349 } 1350 1351 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1352 1353 if (nxt_file_write(&file, pid, length, 0) != length) { 1354 return NXT_ERROR; 1355 } 1356 1357 nxt_file_close(task, &file); 1358 1359 return NXT_OK; 1360} 1361 1362 1363nxt_process_t * 1364nxt_runtime_process_new(nxt_runtime_t *rt) 1365{ 1366 nxt_process_t *process; 1367 1368 /* TODO: memory failures. */ 1369 1370 process = nxt_mp_zalloc(rt->mem_pool, 1371 sizeof(nxt_process_t) + sizeof(nxt_process_init_t)); 1372 1373 if (nxt_slow_path(process == NULL)) { 1374 return NULL; 1375 } 1376 1377 nxt_queue_init(&process->ports); 1378 1379 nxt_thread_mutex_create(&process->incoming.mutex); | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_port.h> 11#include <nxt_main_process.h> 12#include <nxt_router.h> 13 14 15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, 16 nxt_runtime_t *rt); 17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, 18 nxt_runtime_t *rt); 19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); 20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); 21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); 22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); 23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); 24static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt); 25static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); 26static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, 27 nxt_runtime_t *rt); 28static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); 29static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); 30static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); 31static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); 32static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, 33 nxt_runtime_t *rt); 34static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, 35 nxt_file_name_t *pid_file); 36static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 37 nxt_runtime_cont_t cont); 38static void nxt_runtime_thread_pool_init(void); 39static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, 40 void *data); 41static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); 42static void nxt_runtime_process_remove(nxt_runtime_t *rt, 43 nxt_process_t *process); 44static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); 45 46 47nxt_int_t 48nxt_runtime_create(nxt_task_t *task) 49{ 50 nxt_mp_t *mp; 51 nxt_int_t ret; 52 nxt_array_t *listen_sockets; 53 nxt_runtime_t *rt; 54 nxt_app_lang_module_t *lang; 55 56 mp = nxt_mp_create(1024, 128, 256, 32); 57 if (nxt_slow_path(mp == NULL)) { 58 return NXT_ERROR; 59 } 60 61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t)); 62 if (nxt_slow_path(rt == NULL)) { 63 goto fail; 64 } 65 66 task->thread->runtime = rt; 67 rt->mem_pool = mp; 68 69 nxt_thread_mutex_create(&rt->processes_mutex); 70 71 rt->services = nxt_services_init(mp); 72 if (nxt_slow_path(rt->services == NULL)) { 73 goto fail; 74 } 75 76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); 77 if (nxt_slow_path(rt->languages == NULL)) { 78 goto fail; 79 } 80 81 /* Should not fail. */ 82 lang = nxt_array_add(rt->languages); 83 lang->type = NXT_APP_EXTERNAL; 84 lang->version = (u_char *) ""; 85 lang->file = NULL; 86 lang->module = &nxt_external_module; 87 lang->mounts = NULL; 88 89 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); 90 if (nxt_slow_path(listen_sockets == NULL)) { 91 goto fail; 92 } 93 94 rt->listen_sockets = listen_sockets; 95 96 ret = nxt_runtime_inherited_listen_sockets(task, rt); 97 if (nxt_slow_path(ret != NXT_OK)) { 98 goto fail; 99 } 100 101 if (nxt_runtime_hostname(task, rt) != NXT_OK) { 102 goto fail; 103 } 104 105 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { 106 goto fail; 107 } 108 109 if (nxt_runtime_event_engines(task, rt) != NXT_OK) { 110 goto fail; 111 } 112 113 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { 114 goto fail; 115 } 116 117 rt->start = nxt_runtime_initial_start; 118 119 if (nxt_runtime_conf_init(task, rt) != NXT_OK) { 120 goto fail; 121 } 122 123 if (nxt_port_rpc_init() != NXT_OK) { 124 goto fail; 125 } 126 127 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 128 nxt_runtime_start, task, rt, NULL); 129 130 return NXT_OK; 131 132fail: 133 134 nxt_mp_destroy(mp); 135 136 return NXT_ERROR; 137} 138 139 140static nxt_int_t 141nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 142{ 143 u_char *v, *p; 144 nxt_int_t type; 145 nxt_array_t *inherited_sockets; 146 nxt_socket_t s; 147 nxt_listen_socket_t *ls; 148 149 v = (u_char *) getenv("NGINX"); 150 151 if (v == NULL) { 152 return nxt_runtime_systemd_listen_sockets(task, rt); 153 } 154 155 nxt_alert(task, "using inherited listen sockets: %s", v); 156 157 inherited_sockets = nxt_array_create(rt->mem_pool, 158 1, sizeof(nxt_listen_socket_t)); 159 if (inherited_sockets == NULL) { 160 return NXT_ERROR; 161 } 162 163 rt->inherited_sockets = inherited_sockets; 164 165 for (p = v; *p != '\0'; p++) { 166 167 if (*p == ';') { 168 s = nxt_int_parse(v, p - v); 169 170 if (nxt_slow_path(s < 0)) { 171 nxt_alert(task, "invalid socket number \"%s\" " 172 "in NGINX environment variable, " 173 "ignoring the rest of the variable", v); 174 return NXT_ERROR; 175 } 176 177 v = p + 1; 178 179 ls = nxt_array_zero_add(inherited_sockets); 180 if (nxt_slow_path(ls == NULL)) { 181 return NXT_ERROR; 182 } 183 184 ls->socket = s; 185 186 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 187 if (nxt_slow_path(ls->sockaddr == NULL)) { 188 return NXT_ERROR; 189 } 190 191 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); 192 if (nxt_slow_path(type == -1)) { 193 return NXT_ERROR; 194 } 195 196 ls->sockaddr->type = (uint16_t) type; 197 } 198 } 199 200 return NXT_OK; 201} 202 203 204static nxt_int_t 205nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) 206{ 207 u_char *nfd, *pid; 208 nxt_int_t n; 209 nxt_array_t *inherited_sockets; 210 nxt_socket_t s; 211 nxt_listen_socket_t *ls; 212 213 /* 214 * Number of listening sockets passed. The socket 215 * descriptors start from number 3 and are sequential. 216 */ 217 nfd = (u_char *) getenv("LISTEN_FDS"); 218 if (nfd == NULL) { 219 return NXT_OK; 220 } 221 222 /* The pid of the service process. */ 223 pid = (u_char *) getenv("LISTEN_PID"); 224 if (pid == NULL) { 225 return NXT_OK; 226 } 227 228 n = nxt_int_parse(nfd, nxt_strlen(nfd)); 229 if (n < 0) { 230 return NXT_OK; 231 } 232 233 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { 234 return NXT_OK; 235 } 236 237 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n); 238 239 inherited_sockets = nxt_array_create(rt->mem_pool, 240 n, sizeof(nxt_listen_socket_t)); 241 if (inherited_sockets == NULL) { 242 return NXT_ERROR; 243 } 244 245 rt->inherited_sockets = inherited_sockets; 246 247 for (s = 3; s < n; s++) { 248 ls = nxt_array_zero_add(inherited_sockets); 249 if (nxt_slow_path(ls == NULL)) { 250 return NXT_ERROR; 251 } 252 253 ls->socket = s; 254 255 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); 256 if (nxt_slow_path(ls->sockaddr == NULL)) { 257 return NXT_ERROR; 258 } 259 260 ls->sockaddr->type = SOCK_STREAM; 261 } 262 263 return NXT_OK; 264} 265 266 267static nxt_int_t 268nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) 269{ 270 nxt_thread_t *thread; 271 nxt_event_engine_t *engine; 272 const nxt_event_interface_t *interface; 273 274 interface = nxt_service_get(rt->services, "engine", NULL); 275 276 if (nxt_slow_path(interface == NULL)) { 277 /* TODO: log */ 278 return NXT_ERROR; 279 } 280 281 engine = nxt_event_engine_create(task, interface, 282 nxt_main_process_signals, 0, 0); 283 284 if (nxt_slow_path(engine == NULL)) { 285 return NXT_ERROR; 286 } 287 288 thread = task->thread; 289 thread->engine = engine; 290#if 0 291 thread->fiber = &engine->fibers->fiber; 292#endif 293 294 engine->id = rt->last_engine_id++; 295 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); 296 297 nxt_queue_init(&rt->engines); 298 nxt_queue_insert_tail(&rt->engines, &engine->link); 299 300 return NXT_OK; 301} 302 303 304static nxt_int_t 305nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) 306{ 307 nxt_int_t ret; 308 nxt_array_t *thread_pools; 309 310 thread_pools = nxt_array_create(rt->mem_pool, 1, 311 sizeof(nxt_thread_pool_t *)); 312 313 if (nxt_slow_path(thread_pools == NULL)) { 314 return NXT_ERROR; 315 } 316 317 rt->thread_pools = thread_pools; 318 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); 319 320 if (nxt_slow_path(ret != NXT_OK)) { 321 return NXT_ERROR; 322 } 323 324 return NXT_OK; 325} 326 327 328static void 329nxt_runtime_start(nxt_task_t *task, void *obj, void *data) 330{ 331 nxt_runtime_t *rt; 332 333 rt = obj; 334 335 nxt_debug(task, "rt conf done"); 336 337 task->thread->log->ctx_handler = NULL; 338 task->thread->log->ctx = NULL; 339 340 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { 341 goto fail; 342 } 343 344 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 345 goto fail; 346 } 347 348 /* 349 * Thread pools should be destroyed before starting worker 350 * processes, because thread pool semaphores will stick in 351 * locked state in new processes after fork(). 352 */ 353 nxt_runtime_thread_pool_destroy(task, rt, rt->start); 354 355 return; 356 357fail: 358 359 nxt_runtime_quit(task, 1); 360} 361 362 363static void 364nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status) 365{ 366 nxt_int_t ret; 367 nxt_thread_t *thr; 368 nxt_runtime_t *rt; 369 const nxt_event_interface_t *interface; 370 371 thr = task->thread; 372 rt = thr->runtime; 373 374 if (rt->inherited_sockets == NULL && rt->daemon) { 375 376 if (nxt_process_daemon(task) != NXT_OK) { 377 goto fail; 378 } 379 380 /* 381 * An event engine should be updated after fork() 382 * even if an event facility was not changed because: 383 * 1) inherited kqueue descriptor is invalid, 384 * 2) the signal thread is not inherited. 385 */ 386 interface = nxt_service_get(rt->services, "engine", rt->engine); 387 if (interface == NULL) { 388 goto fail; 389 } 390 391 ret = nxt_event_engine_change(task->thread->engine, interface, 392 rt->batch); 393 if (ret != NXT_OK) { 394 goto fail; 395 } 396 } 397 398 ret = nxt_runtime_pid_file_create(task, rt->pid_file); 399 if (ret != NXT_OK) { 400 goto fail; 401 } 402 403 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { 404 goto fail; 405 } 406 407 thr->engine->max_connections = rt->engine_connections; 408 409 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) { 410 return; 411 } 412 413fail: 414 415 nxt_runtime_quit(task, 1); 416} 417 418 419void 420nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status) 421{ 422 nxt_bool_t done; 423 nxt_runtime_t *rt; 424 nxt_event_engine_t *engine; 425 426 rt = task->thread->runtime; 427 rt->status |= status; 428 engine = task->thread->engine; 429 430 nxt_debug(task, "exiting"); 431 432 done = 1; 433 434 if (!engine->shutdown) { 435 engine->shutdown = 1; 436 437 if (!nxt_array_is_empty(rt->thread_pools)) { 438 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); 439 done = 0; 440 } 441 442 if (rt->type == NXT_PROCESS_MAIN) { 443 nxt_runtime_stop_all_processes(task, rt); 444 done = 0; 445 } 446 } 447 448 nxt_runtime_close_idle_connections(engine); 449 450 if (done) { 451 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, 452 task, rt, engine); 453 } 454} 455 456 457static void 458nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) 459{ 460 nxt_conn_t *c; 461 nxt_queue_t *idle; 462 nxt_queue_link_t *link, *next; 463 464 nxt_debug(&engine->task, "close idle connections"); 465 466 idle = &engine->idle_connections; 467 468 for (link = nxt_queue_first(idle); 469 link != nxt_queue_tail(idle); 470 link = next) 471 { 472 next = nxt_queue_next(link); 473 c = nxt_queue_link_data(link, nxt_conn_t, link); 474 475 if (!c->socket.read_ready) { 476 nxt_queue_remove(link); 477 nxt_conn_close(engine, c); 478 } 479 } 480} 481 482 483void 484nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) 485{ 486 nxt_port_t *port; 487 nxt_process_t *process; 488 nxt_process_init_t *init; 489 490 nxt_runtime_process_each(rt, process) { 491 492 init = nxt_process_init(process); 493 494 if (init->type == NXT_PROCESS_APP) { 495 496 nxt_process_port_each(process, port) { 497 498 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 499 0, 0, NULL); 500 501 } nxt_process_port_loop; 502 } 503 504 } nxt_runtime_process_loop; 505} 506 507 508static void 509nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) 510{ 511 nxt_port_t *port; 512 nxt_process_t *process; 513 514 nxt_runtime_process_each(rt, process) { 515 516 nxt_process_port_each(process, port) { 517 518 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 519 0, NULL); 520 521 } nxt_process_port_loop; 522 523 } nxt_runtime_process_loop; 524} 525 526 527static void 528nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) 529{ 530 int status, engine_count; 531 nxt_runtime_t *rt; 532 nxt_process_t *process; 533 nxt_event_engine_t *engine; 534 535 rt = obj; 536 engine = data; 537 538 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); 539 540 if (!nxt_array_is_empty(rt->thread_pools)) { 541 return; 542 } 543 544 if (rt->type == NXT_PROCESS_MAIN) { 545 if (rt->pid_file != NULL) { 546 nxt_file_delete(rt->pid_file); 547 } 548 549#if (NXT_HAVE_UNIX_DOMAIN) 550 { 551 nxt_sockaddr_t *sa; 552 nxt_file_name_t *name; 553 554 sa = rt->controller_listen; 555 556 if (sa->u.sockaddr.sa_family == AF_UNIX) { 557 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; 558 (void) nxt_file_delete(name); 559 } 560 } 561#endif 562 } 563 564 if (!engine->event.signal_support) { 565 nxt_event_engine_signals_stop(engine); 566 } 567 568 nxt_runtime_process_each(rt, process) { 569 570 nxt_process_close_ports(task, process); 571 572 } nxt_runtime_process_loop; 573 574 status = rt->status; 575 576 engine_count = 0; 577 578 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) { 579 580 engine_count++; 581 582 } nxt_queue_loop; 583 584 if (engine_count <= 1) { 585 if (rt->port_by_type[rt->type] != NULL) { 586 nxt_port_use(task, rt->port_by_type[rt->type], -1); 587 } 588 589 nxt_thread_mutex_destroy(&rt->processes_mutex); 590 591 nxt_mp_destroy(rt->mem_pool); 592 } 593 594 nxt_debug(task, "exit: %d", status); 595 596 exit(status); 597 nxt_unreachable(); 598} 599 600 601static nxt_int_t 602nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) 603{ 604 nxt_event_engine_t *engine; 605 const nxt_event_interface_t *interface; 606 607 engine = task->thread->engine; 608 609 if (engine->batch == rt->batch 610 && nxt_strcmp(engine->event.name, rt->engine) == 0) 611 { 612 return NXT_OK; 613 } 614 615 interface = nxt_service_get(rt->services, "engine", rt->engine); 616 617 if (interface != NULL) { 618 return nxt_event_engine_change(engine, interface, rt->batch); 619 } 620 621 return NXT_ERROR; 622} 623 624 625void 626nxt_runtime_event_engine_free(nxt_runtime_t *rt) 627{ 628 nxt_queue_link_t *link; 629 nxt_event_engine_t *engine; 630 631 link = nxt_queue_first(&rt->engines); 632 nxt_queue_remove(link); 633 634 engine = nxt_queue_link_data(link, nxt_event_engine_t, link); 635 nxt_event_engine_free(engine); 636} 637 638 639nxt_int_t 640nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, 641 nxt_uint_t max_threads, nxt_nsec_t timeout) 642{ 643 nxt_thread_pool_t *thread_pool, **tp; 644 645 tp = nxt_array_add(rt->thread_pools); 646 if (tp == NULL) { 647 return NXT_ERROR; 648 } 649 650 thread_pool = nxt_thread_pool_create(max_threads, timeout, 651 nxt_runtime_thread_pool_init, 652 thr->engine, 653 nxt_runtime_thread_pool_exit); 654 655 if (nxt_fast_path(thread_pool != NULL)) { 656 *tp = thread_pool; 657 } 658 659 return NXT_OK; 660} 661 662 663static void 664nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, 665 nxt_runtime_cont_t cont) 666{ 667 nxt_uint_t n; 668 nxt_thread_pool_t **tp; 669 670 rt->continuation = cont; 671 672 n = rt->thread_pools->nelts; 673 674 if (n == 0) { 675 cont(task, 0); 676 return; 677 } 678 679 tp = rt->thread_pools->elts; 680 681 do { 682 nxt_thread_pool_destroy(*tp); 683 684 tp++; 685 n--; 686 } while (n != 0); 687} 688 689 690static void 691nxt_runtime_thread_pool_init(void) 692{ 693#if (NXT_REGEX) 694 nxt_regex_init(0); 695#endif 696} 697 698 699static void 700nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 701{ 702 nxt_uint_t i, n; 703 nxt_runtime_t *rt; 704 nxt_thread_pool_t *tp, **thread_pools; 705 nxt_thread_handle_t handle; 706 707 tp = obj; 708 709 if (data != NULL) { 710 handle = (nxt_thread_handle_t) (uintptr_t) data; 711 nxt_thread_wait(handle); 712 } 713 714 rt = task->thread->runtime; 715 716 thread_pools = rt->thread_pools->elts; 717 n = rt->thread_pools->nelts; 718 719 nxt_debug(task, "thread pools: %ui", n); 720 721 for (i = 0; i < n; i++) { 722 723 if (tp == thread_pools[i]) { 724 nxt_array_remove(rt->thread_pools, &thread_pools[i]); 725 726 nxt_free(tp); 727 728 if (n == 1) { 729 /* The last thread pool. */ 730 rt->continuation(task, 0); 731 } 732 733 return; 734 } 735 } 736} 737 738 739static nxt_int_t 740nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) 741{ 742 nxt_int_t ret; 743 nxt_str_t control; 744 nxt_uint_t n; 745 nxt_file_t *file; 746 const char *slash; 747 nxt_sockaddr_t *sa; 748 nxt_file_name_str_t file_name; 749 const nxt_event_interface_t *interface; 750 751 rt->daemon = 1; 752 rt->engine_connections = 256; 753 rt->auxiliary_threads = 2; 754 rt->user_cred.user = NXT_USER; 755 rt->group = NXT_GROUP; 756 rt->pid = NXT_PID; 757 rt->log = NXT_LOG; 758 rt->modules = NXT_MODULES; 759 rt->state = NXT_STATE; 760 rt->control = NXT_CONTROL_SOCK; 761 rt->tmp = NXT_TMP; 762 763 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t)); 764 765 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { 766 return NXT_ERROR; 767 } 768 769 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) { 770 return NXT_ERROR; 771 } 772 773 if (rt->capabilities.setid) { 774 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, 775 rt->group); 776 777 if (nxt_slow_path(ret != NXT_OK)) { 778 return NXT_ERROR; 779 } 780 781 } else { 782 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it " 783 "cannot use arbitrary user and group."); 784 } 785 786 /* An engine's parameters. */ 787 788 interface = nxt_service_get(rt->services, "engine", rt->engine); 789 if (interface == NULL) { 790 return NXT_ERROR; 791 } 792 793 rt->engine = interface->name; 794 795 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid); 796 if (nxt_slow_path(ret != NXT_OK)) { 797 return NXT_ERROR; 798 } 799 800 rt->pid_file = file_name.start; 801 802 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log); 803 if (nxt_slow_path(ret != NXT_OK)) { 804 return NXT_ERROR; 805 } 806 807 file = nxt_list_first(rt->log_files); 808 file->name = file_name.start; 809 810 slash = ""; 811 n = nxt_strlen(rt->modules); 812 813 if (n > 1 && rt->modules[n - 1] != '/') { 814 slash = "/"; 815 } 816 817 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z", 818 rt->modules, slash); 819 if (nxt_slow_path(ret != NXT_OK)) { 820 return NXT_ERROR; 821 } 822 823 rt->modules = (char *) file_name.start; 824 825 slash = ""; 826 n = nxt_strlen(rt->state); 827 828 if (n > 1 && rt->state[n - 1] != '/') { 829 slash = "/"; 830 } 831 832 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", 833 rt->state, slash); 834 if (nxt_slow_path(ret != NXT_OK)) { 835 return NXT_ERROR; 836 } 837 838 rt->conf = (char *) file_name.start; 839 840 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf); 841 if (nxt_slow_path(ret != NXT_OK)) { 842 return NXT_ERROR; 843 } 844 845 rt->conf_tmp = (char *) file_name.start; 846 847 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z", 848 rt->state, slash); 849 if (nxt_slow_path(ret != NXT_OK)) { 850 return NXT_ERROR; 851 } 852 853 ret = mkdir((char *) file_name.start, S_IRWXU); 854 855 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) { 856 rt->certs.length = file_name.len; 857 rt->certs.start = file_name.start; 858 859 } else { 860 nxt_alert(task, "Unable to create certificates storage directory: " 861 "mkdir(%s) failed %E", file_name.start, nxt_errno); 862 } 863 864 control.length = nxt_strlen(rt->control); 865 control.start = (u_char *) rt->control; 866 867 sa = nxt_sockaddr_parse(rt->mem_pool, &control); 868 if (nxt_slow_path(sa == NULL)) { 869 return NXT_ERROR; 870 } 871 872 sa->type = SOCK_STREAM; 873 874 rt->controller_listen = sa; 875 876 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { 877 return NXT_ERROR; 878 } 879 880 return NXT_OK; 881} 882 883 884static nxt_int_t 885nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) 886{ 887 char *p, **argv; 888 u_char *end; 889 u_char buf[1024]; 890 891 static const char version[] = 892 "unit version: " NXT_VERSION "\n" 893 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; 894 895 static const char no_control[] = 896 "option \"--control\" requires socket address\n"; 897 static const char no_user[] = "option \"--user\" requires username\n"; 898 static const char no_group[] = "option \"--group\" requires group name\n"; 899 static const char no_pid[] = "option \"--pid\" requires filename\n"; 900 static const char no_log[] = "option \"--log\" requires filename\n"; 901 static const char no_modules[] = 902 "option \"--modules\" requires directory\n"; 903 static const char no_state[] = "option \"--state\" requires directory\n"; 904 static const char no_tmp[] = "option \"--tmp\" requires directory\n"; 905 906 static const char help[] = 907 "\n" 908 "unit options:\n" 909 "\n" 910 " --version print unit version and configure options\n" 911 "\n" 912 " --no-daemon run unit in non-daemon mode\n" 913 "\n" 914 " --control ADDRESS set address of control API socket\n" 915 " default: \"" NXT_CONTROL_SOCK "\"\n" 916 "\n" 917 " --pid FILE set pid filename\n" 918 " default: \"" NXT_PID "\"\n" 919 "\n" 920 " --log FILE set log filename\n" 921 " default: \"" NXT_LOG "\"\n" 922 "\n" 923 " --modules DIRECTORY set modules directory name\n" 924 " default: \"" NXT_MODULES "\"\n" 925 "\n" 926 " --state DIRECTORY set state directory name\n" 927 " default: \"" NXT_STATE "\"\n" 928 "\n" 929 " --tmp DIRECTORY set tmp directory name\n" 930 " default: \"" NXT_TMP "\"\n" 931 "\n" 932 " --user USER set non-privileged processes to run" 933 " as specified user\n" 934 " default: \"" NXT_USER "\"\n" 935 "\n" 936 " --group GROUP set non-privileged processes to run" 937 " as specified group\n" 938 " default: "; 939 940 static const char group[] = "\"" NXT_GROUP "\"\n\n"; 941 static const char primary[] = "user's primary group\n\n"; 942 943 argv = &nxt_process_argv[1]; 944 945 while (*argv != NULL) { 946 p = *argv++; 947 948 if (nxt_strcmp(p, "--control") == 0) { 949 if (*argv == NULL) { 950 write(STDERR_FILENO, no_control, nxt_length(no_control)); 951 return NXT_ERROR; 952 } 953 954 p = *argv++; 955 956 rt->control = p; 957 958 continue; 959 } 960 961 if (nxt_strcmp(p, "--user") == 0) { 962 if (*argv == NULL) { 963 write(STDERR_FILENO, no_user, nxt_length(no_user)); 964 return NXT_ERROR; 965 } 966 967 p = *argv++; 968 969 rt->user_cred.user = p; 970 971 continue; 972 } 973 974 if (nxt_strcmp(p, "--group") == 0) { 975 if (*argv == NULL) { 976 write(STDERR_FILENO, no_group, nxt_length(no_group)); 977 return NXT_ERROR; 978 } 979 980 p = *argv++; 981 982 rt->group = p; 983 984 continue; 985 } 986 987 if (nxt_strcmp(p, "--pid") == 0) { 988 if (*argv == NULL) { 989 write(STDERR_FILENO, no_pid, nxt_length(no_pid)); 990 return NXT_ERROR; 991 } 992 993 p = *argv++; 994 995 rt->pid = p; 996 997 continue; 998 } 999 1000 if (nxt_strcmp(p, "--log") == 0) { 1001 if (*argv == NULL) { 1002 write(STDERR_FILENO, no_log, nxt_length(no_log)); 1003 return NXT_ERROR; 1004 } 1005 1006 p = *argv++; 1007 1008 rt->log = p; 1009 1010 continue; 1011 } 1012 1013 if (nxt_strcmp(p, "--modules") == 0) { 1014 if (*argv == NULL) { 1015 write(STDERR_FILENO, no_modules, nxt_length(no_modules)); 1016 return NXT_ERROR; 1017 } 1018 1019 p = *argv++; 1020 1021 rt->modules = p; 1022 1023 continue; 1024 } 1025 1026 if (nxt_strcmp(p, "--state") == 0) { 1027 if (*argv == NULL) { 1028 write(STDERR_FILENO, no_state, nxt_length(no_state)); 1029 return NXT_ERROR; 1030 } 1031 1032 p = *argv++; 1033 1034 rt->state = p; 1035 1036 continue; 1037 } 1038 1039 if (nxt_strcmp(p, "--tmp") == 0) { 1040 if (*argv == NULL) { 1041 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp)); 1042 return NXT_ERROR; 1043 } 1044 1045 p = *argv++; 1046 1047 rt->tmp = p; 1048 1049 continue; 1050 } 1051 1052 if (nxt_strcmp(p, "--no-daemon") == 0) { 1053 rt->daemon = 0; 1054 continue; 1055 } 1056 1057 if (nxt_strcmp(p, "--version") == 0) { 1058 write(STDERR_FILENO, version, nxt_length(version)); 1059 exit(0); 1060 } 1061 1062 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) { 1063 write(STDOUT_FILENO, help, nxt_length(help)); 1064 1065 if (sizeof(NXT_GROUP) == 1) { 1066 write(STDOUT_FILENO, primary, nxt_length(primary)); 1067 1068 } else { 1069 write(STDOUT_FILENO, group, nxt_length(group)); 1070 } 1071 1072 exit(0); 1073 } 1074 1075 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", " 1076 "try \"%s -h\" for available options\n", 1077 p, nxt_process_argv[0]); 1078 1079 write(STDERR_FILENO, buf, end - buf); 1080 1081 return NXT_ERROR; 1082 } 1083 1084 return NXT_OK; 1085} 1086 1087 1088nxt_listen_socket_t * 1089nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) 1090{ 1091 nxt_mp_t *mp; 1092 nxt_listen_socket_t *ls; 1093 1094 ls = nxt_array_zero_add(rt->listen_sockets); 1095 if (ls == NULL) { 1096 return NULL; 1097 } 1098 1099 mp = rt->mem_pool; 1100 1101 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, 1102 sa->length); 1103 if (ls->sockaddr == NULL) { 1104 return NULL; 1105 } 1106 1107 ls->sockaddr->type = sa->type; 1108 1109 nxt_sockaddr_text(ls->sockaddr); 1110 1111 ls->socket = -1; 1112 ls->backlog = NXT_LISTEN_BACKLOG; 1113 1114 return ls; 1115} 1116 1117 1118static nxt_int_t 1119nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) 1120{ 1121 size_t length; 1122 char hostname[NXT_MAXHOSTNAMELEN + 1]; 1123 1124 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { 1125 nxt_alert(task, "gethostname() failed %E", nxt_errno); 1126 return NXT_ERROR; 1127 } 1128 1129 /* 1130 * Linux gethostname(2): 1131 * 1132 * If the null-terminated hostname is too large to fit, 1133 * then the name is truncated, and no error is returned. 1134 * 1135 * For this reason an additional byte is reserved in the buffer. 1136 */ 1137 hostname[NXT_MAXHOSTNAMELEN] = '\0'; 1138 1139 length = nxt_strlen(hostname); 1140 rt->hostname.length = length; 1141 1142 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length); 1143 1144 if (rt->hostname.start != NULL) { 1145 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); 1146 return NXT_OK; 1147 } 1148 1149 return NXT_ERROR; 1150} 1151 1152 1153static nxt_int_t 1154nxt_runtime_log_files_init(nxt_runtime_t *rt) 1155{ 1156 nxt_file_t *file; 1157 nxt_list_t *log_files; 1158 1159 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); 1160 1161 if (nxt_fast_path(log_files != NULL)) { 1162 rt->log_files = log_files; 1163 1164 /* Preallocate the main log. This allocation cannot fail. */ 1165 file = nxt_list_zero_add(log_files); 1166 1167 file->fd = NXT_FILE_INVALID; 1168 file->log_level = NXT_LOG_ALERT; 1169 1170 return NXT_OK; 1171 } 1172 1173 return NXT_ERROR; 1174} 1175 1176 1177nxt_file_t * 1178nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) 1179{ 1180 nxt_int_t ret; 1181 nxt_file_t *file; 1182 nxt_file_name_str_t file_name; 1183 1184 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name); 1185 1186 if (nxt_slow_path(ret != NXT_OK)) { 1187 return NULL; 1188 } 1189 1190 nxt_list_each(file, rt->log_files) { 1191 1192 /* STUB: hardecoded case sensitive/insensitive. */ 1193 1194 if (file->name != NULL 1195 && nxt_file_name_eq(file->name, file_name.start)) 1196 { 1197 return file; 1198 } 1199 1200 } nxt_list_loop; 1201 1202 file = nxt_list_zero_add(rt->log_files); 1203 1204 if (nxt_slow_path(file == NULL)) { 1205 return NULL; 1206 } 1207 1208 file->fd = NXT_FILE_INVALID; 1209 file->log_level = NXT_LOG_ALERT; 1210 file->name = file_name.start; 1211 1212 return file; 1213} 1214 1215 1216static nxt_int_t 1217nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) 1218{ 1219 nxt_int_t ret; 1220 nxt_file_t *file; 1221 1222 nxt_list_each(file, rt->log_files) { 1223 1224 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, 1225 NXT_FILE_OWNER_ACCESS); 1226 1227 if (ret != NXT_OK) { 1228 return NXT_ERROR; 1229 } 1230 1231 } nxt_list_loop; 1232 1233 file = nxt_list_first(rt->log_files); 1234 1235 return nxt_file_stderr(file); 1236} 1237 1238 1239nxt_int_t 1240nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) 1241{ 1242 nxt_int_t ret; 1243 nxt_uint_t c, p, ncurr, nprev; 1244 nxt_listen_socket_t *curr, *prev; 1245 1246 curr = rt->listen_sockets->elts; 1247 ncurr = rt->listen_sockets->nelts; 1248 1249 if (rt->inherited_sockets != NULL) { 1250 prev = rt->inherited_sockets->elts; 1251 nprev = rt->inherited_sockets->nelts; 1252 1253 } else { 1254 prev = NULL; 1255 nprev = 0; 1256 } 1257 1258 for (c = 0; c < ncurr; c++) { 1259 1260 for (p = 0; p < nprev; p++) { 1261 1262 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { 1263 1264 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); 1265 if (ret != NXT_OK) { 1266 return NXT_ERROR; 1267 } 1268 1269 goto next; 1270 } 1271 } 1272 1273 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) { 1274 return NXT_ERROR; 1275 } 1276 1277 next: 1278 1279 continue; 1280 } 1281 1282 return NXT_OK; 1283} 1284 1285 1286nxt_int_t 1287nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) 1288{ 1289 nxt_uint_t i, n; 1290 nxt_listen_socket_t *ls; 1291 1292 ls = rt->listen_sockets->elts; 1293 n = rt->listen_sockets->nelts; 1294 1295 for (i = 0; i < n; i++) { 1296 if (ls[i].flags == NXT_NONBLOCK) { 1297 if (nxt_listen_event(task, &ls[i]) == NULL) { 1298 return NXT_ERROR; 1299 } 1300 } 1301 } 1302 1303 return NXT_OK; 1304} 1305 1306 1307nxt_str_t * 1308nxt_current_directory(nxt_mp_t *mp) 1309{ 1310 size_t length; 1311 u_char *p; 1312 nxt_str_t *name; 1313 char buf[NXT_MAX_PATH_LEN]; 1314 1315 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); 1316 1317 if (nxt_fast_path(length != 0)) { 1318 name = nxt_str_alloc(mp, length + 1); 1319 1320 if (nxt_fast_path(name != NULL)) { 1321 p = nxt_cpymem(name->start, buf, length); 1322 *p = '/'; 1323 1324 return name; 1325 } 1326 } 1327 1328 return NULL; 1329} 1330 1331 1332static nxt_int_t 1333nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) 1334{ 1335 ssize_t length; 1336 nxt_int_t n; 1337 nxt_file_t file; 1338 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")]; 1339 1340 nxt_memzero(&file, sizeof(nxt_file_t)); 1341 1342 file.name = pid_file; 1343 1344 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, 1345 NXT_FILE_DEFAULT_ACCESS); 1346 1347 if (n != NXT_OK) { 1348 return NXT_ERROR; 1349 } 1350 1351 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; 1352 1353 if (nxt_file_write(&file, pid, length, 0) != length) { 1354 return NXT_ERROR; 1355 } 1356 1357 nxt_file_close(task, &file); 1358 1359 return NXT_OK; 1360} 1361 1362 1363nxt_process_t * 1364nxt_runtime_process_new(nxt_runtime_t *rt) 1365{ 1366 nxt_process_t *process; 1367 1368 /* TODO: memory failures. */ 1369 1370 process = nxt_mp_zalloc(rt->mem_pool, 1371 sizeof(nxt_process_t) + sizeof(nxt_process_init_t)); 1372 1373 if (nxt_slow_path(process == NULL)) { 1374 return NULL; 1375 } 1376 1377 nxt_queue_init(&process->ports); 1378 1379 nxt_thread_mutex_create(&process->incoming.mutex); |
1380 nxt_thread_mutex_create(&process->outgoing.mutex); | |
1381 nxt_thread_mutex_create(&process->cp_mutex); 1382 1383 process->use_count = 1; 1384 1385 return process; 1386} 1387 1388 1389void 1390nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) 1391{ 1392 if (process->registered == 1) { 1393 nxt_runtime_process_remove(rt, process); 1394 } 1395 1396 nxt_assert(process->use_count == 0); 1397 nxt_assert(process->registered == 0); 1398 1399 nxt_port_mmaps_destroy(&process->incoming, 1); | 1380 nxt_thread_mutex_create(&process->cp_mutex); 1381 1382 process->use_count = 1; 1383 1384 return process; 1385} 1386 1387 1388void 1389nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) 1390{ 1391 if (process->registered == 1) { 1392 nxt_runtime_process_remove(rt, process); 1393 } 1394 1395 nxt_assert(process->use_count == 0); 1396 nxt_assert(process->registered == 0); 1397 1398 nxt_port_mmaps_destroy(&process->incoming, 1); |
1400 nxt_port_mmaps_destroy(&process->outgoing, 1); | |
1401 1402 nxt_thread_mutex_destroy(&process->incoming.mutex); | 1399 1400 nxt_thread_mutex_destroy(&process->incoming.mutex); |
1403 nxt_thread_mutex_destroy(&process->outgoing.mutex); | |
1404 nxt_thread_mutex_destroy(&process->cp_mutex); 1405 1406 /* processes from nxt_runtime_process_get() have no memory pool */ 1407 if (process->mem_pool != NULL) { 1408 nxt_mp_destroy(process->mem_pool); 1409 } 1410 1411 nxt_mp_free(rt->mem_pool, process); 1412} 1413 1414 1415static nxt_int_t 1416nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1417{ 1418 nxt_process_t *process; 1419 1420 process = data; 1421 1422 if (lhq->key.length == sizeof(nxt_pid_t) 1423 && *(nxt_pid_t *) lhq->key.start == process->pid) 1424 { 1425 return NXT_OK; 1426 } 1427 1428 return NXT_DECLINED; 1429} 1430 1431static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1432 NXT_LVLHSH_DEFAULT, 1433 nxt_runtime_lvlhsh_pid_test, 1434 nxt_lvlhsh_alloc, 1435 nxt_lvlhsh_free, 1436}; 1437 1438 1439nxt_inline void 1440nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1441{ 1442 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1443 lhq->key.length = sizeof(*pid); 1444 lhq->key.start = (u_char *) pid; 1445 lhq->proto = &lvlhsh_processes_proto; 1446} 1447 1448 1449nxt_process_t * 1450nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1451{ 1452 nxt_process_t *process; 1453 nxt_lvlhsh_query_t lhq; 1454 1455 process = NULL; 1456 1457 nxt_runtime_process_lhq_pid(&lhq, &pid); 1458 1459 nxt_thread_mutex_lock(&rt->processes_mutex); 1460 1461 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1462 process = lhq.value; 1463 1464 } else { 1465 nxt_thread_log_debug("process %PI not found", pid); 1466 } 1467 1468 nxt_thread_mutex_unlock(&rt->processes_mutex); 1469 1470 return process; 1471} 1472 1473 1474static nxt_process_t * 1475nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1476{ 1477 nxt_process_t *process; 1478 nxt_lvlhsh_query_t lhq; 1479 1480 nxt_runtime_process_lhq_pid(&lhq, &pid); 1481 1482 nxt_thread_mutex_lock(&rt->processes_mutex); 1483 1484 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1485 nxt_thread_log_debug("process %PI found", pid); 1486 1487 nxt_thread_mutex_unlock(&rt->processes_mutex); 1488 1489 process = lhq.value; 1490 process->use_count++; 1491 1492 return process; 1493 } 1494 1495 process = nxt_runtime_process_new(rt); 1496 if (nxt_slow_path(process == NULL)) { 1497 1498 nxt_thread_mutex_unlock(&rt->processes_mutex); 1499 1500 return NULL; 1501 } 1502 1503 process->pid = pid; 1504 1505 lhq.replace = 0; 1506 lhq.value = process; 1507 lhq.pool = rt->mem_pool; 1508 1509 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1510 1511 case NXT_OK: 1512 if (rt->nprocesses == 0) { 1513 rt->mprocess = process; 1514 } 1515 1516 rt->nprocesses++; 1517 1518 process->registered = 1; 1519 1520 nxt_thread_log_debug("process %PI insert", pid); 1521 break; 1522 1523 default: 1524 nxt_thread_log_debug("process %PI insert failed", pid); 1525 break; 1526 } 1527 1528 nxt_thread_mutex_unlock(&rt->processes_mutex); 1529 1530 return process; 1531} 1532 1533 1534void 1535nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1536{ 1537 nxt_port_t *port; 1538 nxt_runtime_t *rt; 1539 nxt_lvlhsh_query_t lhq; 1540 1541 nxt_assert(process->registered == 0); 1542 1543 rt = task->thread->runtime; 1544 1545 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1546 1547 lhq.replace = 0; 1548 lhq.value = process; 1549 lhq.pool = rt->mem_pool; 1550 1551 nxt_thread_mutex_lock(&rt->processes_mutex); 1552 1553 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1554 1555 case NXT_OK: 1556 if (rt->nprocesses == 0) { 1557 rt->mprocess = process; 1558 } 1559 1560 rt->nprocesses++; 1561 1562 nxt_process_port_each(process, port) { 1563 1564 port->pid = process->pid; 1565 1566 nxt_runtime_port_add(task, port); 1567 1568 } nxt_process_port_loop; 1569 1570 process->registered = 1; 1571 1572 nxt_thread_log_debug("process %PI added", process->pid); 1573 break; 1574 1575 default: 1576 nxt_thread_log_debug("process %PI failed to add", process->pid); 1577 break; 1578 } 1579 1580 nxt_thread_mutex_unlock(&rt->processes_mutex); 1581} 1582 1583 1584static void 1585nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1586{ 1587 nxt_pid_t pid; 1588 nxt_lvlhsh_query_t lhq; 1589 1590 pid = process->pid; 1591 1592 nxt_runtime_process_lhq_pid(&lhq, &pid); 1593 1594 lhq.pool = rt->mem_pool; 1595 1596 nxt_thread_mutex_lock(&rt->processes_mutex); 1597 1598 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1599 1600 case NXT_OK: 1601 rt->nprocesses--; 1602 1603 process = lhq.value; 1604 1605 process->registered = 0; 1606 1607 nxt_thread_log_debug("process %PI removed", pid); 1608 break; 1609 1610 default: 1611 nxt_thread_log_debug("process %PI remove failed", pid); 1612 break; 1613 } 1614 1615 nxt_thread_mutex_unlock(&rt->processes_mutex); 1616} 1617 1618 1619nxt_process_t * 1620nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1621{ 1622 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); 1623 1624 return nxt_runtime_process_next(rt, lhe); 1625} 1626 1627 1628nxt_port_t * 1629nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, 1630 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) 1631{ 1632 nxt_port_t *port; 1633 nxt_process_t *process; 1634 1635 process = nxt_runtime_process_get(rt, pid); 1636 if (nxt_slow_path(process == NULL)) { 1637 return NULL; 1638 } 1639 1640 port = nxt_port_new(task, id, pid, type); 1641 if (nxt_slow_path(port == NULL)) { 1642 nxt_process_use(task, process, -1); 1643 return NULL; 1644 } 1645 1646 nxt_process_port_add(task, process, port); 1647 1648 nxt_process_use(task, process, -1); 1649 1650 nxt_runtime_port_add(task, port); 1651 1652 nxt_port_use(task, port, -1); 1653 1654 return port; 1655} 1656 1657 1658static void 1659nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1660{ 1661 nxt_int_t res; 1662 nxt_runtime_t *rt; 1663 1664 rt = task->thread->runtime; 1665 1666 res = nxt_port_hash_add(&rt->ports, port); 1667 1668 if (res != NXT_OK) { 1669 return; 1670 } 1671 1672 rt->port_by_type[port->type] = port; 1673 1674 nxt_port_use(task, port, 1); 1675} 1676 1677 1678void 1679nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1680{ 1681 nxt_int_t res; 1682 nxt_runtime_t *rt; 1683 1684 rt = task->thread->runtime; 1685 1686 res = nxt_port_hash_remove(&rt->ports, port); 1687 1688 if (res != NXT_OK) { 1689 return; 1690 } 1691 1692 if (rt->port_by_type[port->type] == port) { 1693 rt->port_by_type[port->type] = NULL; 1694 } 1695 1696 nxt_port_use(task, port, -1); 1697} 1698 1699 1700nxt_port_t * 1701nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1702 nxt_port_id_t port_id) 1703{ 1704 return nxt_port_hash_find(&rt->ports, pid, port_id); 1705} | 1401 nxt_thread_mutex_destroy(&process->cp_mutex); 1402 1403 /* processes from nxt_runtime_process_get() have no memory pool */ 1404 if (process->mem_pool != NULL) { 1405 nxt_mp_destroy(process->mem_pool); 1406 } 1407 1408 nxt_mp_free(rt->mem_pool, process); 1409} 1410 1411 1412static nxt_int_t 1413nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 1414{ 1415 nxt_process_t *process; 1416 1417 process = data; 1418 1419 if (lhq->key.length == sizeof(nxt_pid_t) 1420 && *(nxt_pid_t *) lhq->key.start == process->pid) 1421 { 1422 return NXT_OK; 1423 } 1424 1425 return NXT_DECLINED; 1426} 1427 1428static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 1429 NXT_LVLHSH_DEFAULT, 1430 nxt_runtime_lvlhsh_pid_test, 1431 nxt_lvlhsh_alloc, 1432 nxt_lvlhsh_free, 1433}; 1434 1435 1436nxt_inline void 1437nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) 1438{ 1439 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 1440 lhq->key.length = sizeof(*pid); 1441 lhq->key.start = (u_char *) pid; 1442 lhq->proto = &lvlhsh_processes_proto; 1443} 1444 1445 1446nxt_process_t * 1447nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) 1448{ 1449 nxt_process_t *process; 1450 nxt_lvlhsh_query_t lhq; 1451 1452 process = NULL; 1453 1454 nxt_runtime_process_lhq_pid(&lhq, &pid); 1455 1456 nxt_thread_mutex_lock(&rt->processes_mutex); 1457 1458 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1459 process = lhq.value; 1460 1461 } else { 1462 nxt_thread_log_debug("process %PI not found", pid); 1463 } 1464 1465 nxt_thread_mutex_unlock(&rt->processes_mutex); 1466 1467 return process; 1468} 1469 1470 1471static nxt_process_t * 1472nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) 1473{ 1474 nxt_process_t *process; 1475 nxt_lvlhsh_query_t lhq; 1476 1477 nxt_runtime_process_lhq_pid(&lhq, &pid); 1478 1479 nxt_thread_mutex_lock(&rt->processes_mutex); 1480 1481 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { 1482 nxt_thread_log_debug("process %PI found", pid); 1483 1484 nxt_thread_mutex_unlock(&rt->processes_mutex); 1485 1486 process = lhq.value; 1487 process->use_count++; 1488 1489 return process; 1490 } 1491 1492 process = nxt_runtime_process_new(rt); 1493 if (nxt_slow_path(process == NULL)) { 1494 1495 nxt_thread_mutex_unlock(&rt->processes_mutex); 1496 1497 return NULL; 1498 } 1499 1500 process->pid = pid; 1501 1502 lhq.replace = 0; 1503 lhq.value = process; 1504 lhq.pool = rt->mem_pool; 1505 1506 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1507 1508 case NXT_OK: 1509 if (rt->nprocesses == 0) { 1510 rt->mprocess = process; 1511 } 1512 1513 rt->nprocesses++; 1514 1515 process->registered = 1; 1516 1517 nxt_thread_log_debug("process %PI insert", pid); 1518 break; 1519 1520 default: 1521 nxt_thread_log_debug("process %PI insert failed", pid); 1522 break; 1523 } 1524 1525 nxt_thread_mutex_unlock(&rt->processes_mutex); 1526 1527 return process; 1528} 1529 1530 1531void 1532nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) 1533{ 1534 nxt_port_t *port; 1535 nxt_runtime_t *rt; 1536 nxt_lvlhsh_query_t lhq; 1537 1538 nxt_assert(process->registered == 0); 1539 1540 rt = task->thread->runtime; 1541 1542 nxt_runtime_process_lhq_pid(&lhq, &process->pid); 1543 1544 lhq.replace = 0; 1545 lhq.value = process; 1546 lhq.pool = rt->mem_pool; 1547 1548 nxt_thread_mutex_lock(&rt->processes_mutex); 1549 1550 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { 1551 1552 case NXT_OK: 1553 if (rt->nprocesses == 0) { 1554 rt->mprocess = process; 1555 } 1556 1557 rt->nprocesses++; 1558 1559 nxt_process_port_each(process, port) { 1560 1561 port->pid = process->pid; 1562 1563 nxt_runtime_port_add(task, port); 1564 1565 } nxt_process_port_loop; 1566 1567 process->registered = 1; 1568 1569 nxt_thread_log_debug("process %PI added", process->pid); 1570 break; 1571 1572 default: 1573 nxt_thread_log_debug("process %PI failed to add", process->pid); 1574 break; 1575 } 1576 1577 nxt_thread_mutex_unlock(&rt->processes_mutex); 1578} 1579 1580 1581static void 1582nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) 1583{ 1584 nxt_pid_t pid; 1585 nxt_lvlhsh_query_t lhq; 1586 1587 pid = process->pid; 1588 1589 nxt_runtime_process_lhq_pid(&lhq, &pid); 1590 1591 lhq.pool = rt->mem_pool; 1592 1593 nxt_thread_mutex_lock(&rt->processes_mutex); 1594 1595 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { 1596 1597 case NXT_OK: 1598 rt->nprocesses--; 1599 1600 process = lhq.value; 1601 1602 process->registered = 0; 1603 1604 nxt_thread_log_debug("process %PI removed", pid); 1605 break; 1606 1607 default: 1608 nxt_thread_log_debug("process %PI remove failed", pid); 1609 break; 1610 } 1611 1612 nxt_thread_mutex_unlock(&rt->processes_mutex); 1613} 1614 1615 1616nxt_process_t * 1617nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) 1618{ 1619 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); 1620 1621 return nxt_runtime_process_next(rt, lhe); 1622} 1623 1624 1625nxt_port_t * 1626nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, 1627 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) 1628{ 1629 nxt_port_t *port; 1630 nxt_process_t *process; 1631 1632 process = nxt_runtime_process_get(rt, pid); 1633 if (nxt_slow_path(process == NULL)) { 1634 return NULL; 1635 } 1636 1637 port = nxt_port_new(task, id, pid, type); 1638 if (nxt_slow_path(port == NULL)) { 1639 nxt_process_use(task, process, -1); 1640 return NULL; 1641 } 1642 1643 nxt_process_port_add(task, process, port); 1644 1645 nxt_process_use(task, process, -1); 1646 1647 nxt_runtime_port_add(task, port); 1648 1649 nxt_port_use(task, port, -1); 1650 1651 return port; 1652} 1653 1654 1655static void 1656nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) 1657{ 1658 nxt_int_t res; 1659 nxt_runtime_t *rt; 1660 1661 rt = task->thread->runtime; 1662 1663 res = nxt_port_hash_add(&rt->ports, port); 1664 1665 if (res != NXT_OK) { 1666 return; 1667 } 1668 1669 rt->port_by_type[port->type] = port; 1670 1671 nxt_port_use(task, port, 1); 1672} 1673 1674 1675void 1676nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) 1677{ 1678 nxt_int_t res; 1679 nxt_runtime_t *rt; 1680 1681 rt = task->thread->runtime; 1682 1683 res = nxt_port_hash_remove(&rt->ports, port); 1684 1685 if (res != NXT_OK) { 1686 return; 1687 } 1688 1689 if (rt->port_by_type[port->type] == port) { 1690 rt->port_by_type[port->type] = NULL; 1691 } 1692 1693 nxt_port_use(task, port, -1); 1694} 1695 1696 1697nxt_port_t * 1698nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, 1699 nxt_port_id_t port_id) 1700{ 1701 return nxt_port_hash_find(&rt->ports, pid, port_id); 1702} |