1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_router.h> 13 #include <nxt_port_queue.h> 14 #if (NXT_TLS) 15 #include <nxt_cert.h> 16 #endif 17 18 #include <sys/mount.h> 19 20 21 typedef struct { 22 nxt_socket_t socket; 23 nxt_socket_error_t error; 24 u_char *start; 25 u_char *end; 26 } nxt_listening_socket_t; 27 28 29 typedef struct { 30 nxt_uint_t size; 31 nxt_conf_map_t *map; 32 } nxt_conf_app_map_t; 33 34 35 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, 36 nxt_runtime_t *rt); 37 static void nxt_main_process_title(nxt_task_t *task); 38 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, 39 void *data); 40 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, 41 void *data); 42 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, 43 void *data); 44 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, 45 void *data); 46 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, 47 void *data); 48 static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process); 49 static void nxt_main_port_socket_handler(nxt_task_t *task, 50 nxt_port_recv_msg_t *msg); 51 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, 52 nxt_listening_socket_t *ls); 53 static void nxt_main_port_modules_handler(nxt_task_t *task, 54 nxt_port_recv_msg_t *msg); 55 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); 56 static void nxt_main_process_whoami_handler(nxt_task_t *task, 57 nxt_port_recv_msg_t *msg); 58 static void nxt_main_port_conf_store_handler(nxt_task_t *task, 59 nxt_port_recv_msg_t *msg); 60 static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, 61 const char *name, u_char *buf, size_t size); 62 static void nxt_main_port_access_log_handler(nxt_task_t *task, 63 nxt_port_recv_msg_t *msg); 64 65 const nxt_sig_event_t nxt_main_process_signals[] = { 66 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), 67 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler), 68 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler), 69 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler), 70 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler), 71 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler), 72 nxt_event_signal_end, 73 }; 74 75 76 nxt_uint_t nxt_conf_ver; 77 78 static nxt_bool_t nxt_exiting; 79 80 81 nxt_int_t 82 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, 83 nxt_runtime_t *rt) 84 { 85 rt->type = NXT_PROCESS_MAIN; 86 87 if (nxt_main_process_port_create(task, rt) != NXT_OK) { 88 return NXT_ERROR; 89 } 90 91 nxt_main_process_title(task); 92 93 /* 94 * The discovery process will send a message processed by 95 * nxt_main_port_modules_handler() which starts the controller 96 * and router processes. 97 */ 98 return nxt_process_init_start(task, nxt_discovery_process); 99 } 100 101 102 static nxt_conf_map_t nxt_common_app_conf[] = { 103 { 104 nxt_string("type"), 105 NXT_CONF_MAP_STR, 106 offsetof(nxt_common_app_conf_t, type), 107 }, 108 109 { 110 nxt_string("user"), 111 NXT_CONF_MAP_STR, 112 offsetof(nxt_common_app_conf_t, user), 113 }, 114 115 { 116 nxt_string("group"), 117 NXT_CONF_MAP_STR, 118 offsetof(nxt_common_app_conf_t, group), 119 }, 120 121 { 122 nxt_string("working_directory"), 123 NXT_CONF_MAP_CSTRZ, 124 offsetof(nxt_common_app_conf_t, working_directory), 125 }, 126 127 { 128 nxt_string("environment"), 129 NXT_CONF_MAP_PTR, 130 offsetof(nxt_common_app_conf_t, environment), 131 }, 132 133 { 134 nxt_string("isolation"), 135 NXT_CONF_MAP_PTR, 136 offsetof(nxt_common_app_conf_t, isolation), 137 }, 138 139 { 140 nxt_string("limits"), 141 NXT_CONF_MAP_PTR, 142 offsetof(nxt_common_app_conf_t, limits), 143 }, 144 145 }; 146 147 148 static nxt_conf_map_t nxt_common_app_limits_conf[] = { 149 { 150 nxt_string("shm"), 151 NXT_CONF_MAP_SIZE, 152 offsetof(nxt_common_app_conf_t, shm_limit), 153 }, 154 155 { 156 nxt_string("requests"), 157 NXT_CONF_MAP_INT32, 158 offsetof(nxt_common_app_conf_t, request_limit), 159 }, 160 161 }; 162 163 164 static nxt_conf_map_t nxt_external_app_conf[] = { 165 { 166 nxt_string("executable"), 167 NXT_CONF_MAP_CSTRZ, 168 offsetof(nxt_common_app_conf_t, u.external.executable), 169 }, 170 171 { 172 nxt_string("arguments"), 173 NXT_CONF_MAP_PTR, 174 offsetof(nxt_common_app_conf_t, u.external.arguments), 175 }, 176 177 }; 178 179 180 static nxt_conf_map_t nxt_python_app_conf[] = { 181 { 182 nxt_string("home"), 183 NXT_CONF_MAP_CSTRZ, 184 offsetof(nxt_common_app_conf_t, u.python.home), 185 }, 186 187 { 188 nxt_string("path"), 189 NXT_CONF_MAP_PTR, 190 offsetof(nxt_common_app_conf_t, u.python.path), 191 }, 192 193 { 194 nxt_string("protocol"), 195 NXT_CONF_MAP_STR, 196 offsetof(nxt_common_app_conf_t, u.python.protocol), 197 }, 198 199 { 200 nxt_string("threads"), 201 NXT_CONF_MAP_INT32, 202 offsetof(nxt_common_app_conf_t, u.python.threads), 203 }, 204 205 { 206 nxt_string("targets"), 207 NXT_CONF_MAP_PTR, 208 offsetof(nxt_common_app_conf_t, u.python.targets), 209 }, 210 211 { 212 nxt_string("thread_stack_size"), 213 NXT_CONF_MAP_INT32, 214 offsetof(nxt_common_app_conf_t, u.python.thread_stack_size), 215 }, 216 }; 217 218 219 static nxt_conf_map_t nxt_php_app_conf[] = { 220 { 221 nxt_string("targets"), 222 NXT_CONF_MAP_PTR, 223 offsetof(nxt_common_app_conf_t, u.php.targets), 224 }, 225 226 { 227 nxt_string("options"), 228 NXT_CONF_MAP_PTR, 229 offsetof(nxt_common_app_conf_t, u.php.options), 230 }, 231 }; 232 233 234 static nxt_conf_map_t nxt_perl_app_conf[] = { 235 { 236 nxt_string("script"), 237 NXT_CONF_MAP_CSTRZ, 238 offsetof(nxt_common_app_conf_t, u.perl.script), 239 }, 240 241 { 242 nxt_string("threads"), 243 NXT_CONF_MAP_INT32, 244 offsetof(nxt_common_app_conf_t, u.perl.threads), 245 }, 246 247 { 248 nxt_string("thread_stack_size"), 249 NXT_CONF_MAP_INT32, 250 offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size), 251 }, 252 }; 253 254 255 static nxt_conf_map_t nxt_ruby_app_conf[] = { 256 { 257 nxt_string("script"), 258 NXT_CONF_MAP_STR, 259 offsetof(nxt_common_app_conf_t, u.ruby.script), 260 }, 261 { 262 nxt_string("threads"), 263 NXT_CONF_MAP_INT32, 264 offsetof(nxt_common_app_conf_t, u.ruby.threads), 265 }, 266 { 267 nxt_string("hooks"), 268 NXT_CONF_MAP_STR, 269 offsetof(nxt_common_app_conf_t, u.ruby.hooks), 270 } 271 }; 272 273 274 static nxt_conf_map_t nxt_java_app_conf[] = { 275 { 276 nxt_string("classpath"), 277 NXT_CONF_MAP_PTR, 278 offsetof(nxt_common_app_conf_t, u.java.classpath), 279 }, 280 { 281 nxt_string("webapp"), 282 NXT_CONF_MAP_CSTRZ, 283 offsetof(nxt_common_app_conf_t, u.java.webapp), 284 }, 285 { 286 nxt_string("options"), 287 NXT_CONF_MAP_PTR, 288 offsetof(nxt_common_app_conf_t, u.java.options), 289 }, 290 { 291 nxt_string("unit_jars"), 292 NXT_CONF_MAP_CSTRZ, 293 offsetof(nxt_common_app_conf_t, u.java.unit_jars), 294 }, 295 { 296 nxt_string("threads"), 297 NXT_CONF_MAP_INT32, 298 offsetof(nxt_common_app_conf_t, u.java.threads), 299 }, 300 { 301 nxt_string("thread_stack_size"), 302 NXT_CONF_MAP_INT32, 303 offsetof(nxt_common_app_conf_t, u.java.thread_stack_size), 304 }, 305 306 }; 307 308 309 static nxt_conf_app_map_t nxt_app_maps[] = { 310 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, 311 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, 312 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, 313 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, 314 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, 315 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf }, 316 }; 317 318 319 static void 320 nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 321 { 322 nxt_debug(task, "main data: %*s", 323 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 324 } 325 326 327 static void 328 nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 329 { 330 void *mem; 331 nxt_port_t *port; 332 333 nxt_port_new_port_handler(task, msg); 334 335 port = msg->u.new_port; 336 337 if (port != NULL 338 && port->type == NXT_PROCESS_APP 339 && msg->fd[1] != -1) 340 { 341 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 342 PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0); 343 if (nxt_fast_path(mem != MAP_FAILED)) { 344 port->queue = mem; 345 } 346 347 nxt_fd_close(msg->fd[1]); 348 msg->fd[1] = -1; 349 } 350 } 351 352 353 static void 354 nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 355 { 356 u_char *start, *p, ch; 357 size_t type_len; 358 nxt_int_t ret; 359 nxt_buf_t *b; 360 nxt_port_t *port; 361 nxt_runtime_t *rt; 362 nxt_process_t *process; 363 nxt_app_type_t idx; 364 nxt_conf_value_t *conf; 365 nxt_process_init_t *init; 366 nxt_common_app_conf_t *app_conf; 367 368 rt = task->thread->runtime; 369 370 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 371 if (nxt_slow_path(port == NULL)) { 372 nxt_alert(task, "router port not found"); 373 goto close_fds; 374 } 375 376 if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) { 377 nxt_alert(task, "process %PI cannot start processes", 378 nxt_recv_msg_cmsg_pid(msg)); 379 380 goto close_fds; 381 } 382 383 process = nxt_process_new(rt); 384 if (nxt_slow_path(process == NULL)) { 385 goto close_fds; 386 } 387 388 process->mem_pool = nxt_mp_create(1024, 128, 256, 32); 389 if (process->mem_pool == NULL) { 390 nxt_process_use(task, process, -1); 391 goto close_fds; 392 } 393 394 process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; 395 396 init = nxt_process_init(process); 397 398 *init = nxt_proto_process; 399 400 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); 401 if (b == NULL) { 402 goto failed; 403 } 404 405 nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos, 406 b->mem.pos); 407 408 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); 409 if (nxt_slow_path(app_conf == NULL)) { 410 goto failed; 411 } 412 413 app_conf->shared_port_fd = msg->fd[0]; 414 app_conf->shared_queue_fd = msg->fd[1]; 415 416 start = b->mem.pos; 417 418 app_conf->name.start = start; 419 app_conf->name.length = nxt_strlen(start); 420 421 init->name = (const char *) start; 422 423 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length 424 + sizeof("\"\" prototype") + 1); 425 426 if (nxt_slow_path(process->name == NULL)) { 427 goto failed; 428 } 429 430 p = (u_char *) process->name; 431 *p++ = '"'; 432 p = nxt_cpymem(p, init->name, app_conf->name.length); 433 p = nxt_cpymem(p, "\" prototype", 11); 434 *p = '\0'; 435 436 app_conf->shm_limit = 100 * 1024 * 1024; 437 app_conf->request_limit = 0; 438 439 start += app_conf->name.length + 1; 440 441 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL); 442 if (conf == NULL) { 443 nxt_alert(task, "router app configuration parsing error"); 444 445 goto failed; 446 } 447 448 rt = task->thread->runtime; 449 450 app_conf->user.start = (u_char*)rt->user_cred.user; 451 app_conf->user.length = nxt_strlen(rt->user_cred.user); 452 453 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf, 454 nxt_nitems(nxt_common_app_conf), app_conf); 455 456 if (ret != NXT_OK) { 457 nxt_alert(task, "failed to map common app conf received from router"); 458 goto failed; 459 } 460 461 for (type_len = 0; type_len != app_conf->type.length; type_len++) { 462 ch = app_conf->type.start[type_len]; 463 464 if (ch == ' ' || nxt_isdigit(ch)) { 465 break; 466 } 467 } 468 469 idx = nxt_app_parse_type(app_conf->type.start, type_len); 470 471 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { 472 nxt_alert(task, "invalid app type %d received from router", (int) idx); 473 goto failed; 474 } 475 476 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map, 477 nxt_app_maps[idx].size, app_conf); 478 479 if (nxt_slow_path(ret != NXT_OK)) { 480 nxt_alert(task, "failed to map app conf received from router"); 481 goto failed; 482 } 483 484 if (app_conf->limits != NULL) { 485 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits, 486 nxt_common_app_limits_conf, 487 nxt_nitems(nxt_common_app_limits_conf), 488 app_conf); 489 490 if (nxt_slow_path(ret != NXT_OK)) { 491 nxt_alert(task, "failed to map app limits received from router"); 492 goto failed; 493 } 494 } 495 496 app_conf->self = conf; 497 498 process->stream = msg->port_msg.stream; 499 process->data.app = app_conf; 500 501 ret = nxt_process_start(task, process); 502 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { 503 504 /* Close shared port fds only in main process. */ 505 if (ret == NXT_OK) { 506 nxt_fd_close(app_conf->shared_port_fd); 507 nxt_fd_close(app_conf->shared_queue_fd); 508 } 509 510 /* Avoid fds close in caller. */ 511 msg->fd[0] = -1; 512 msg->fd[1] = -1; 513 514 return; 515 } 516 517 failed: 518 519 nxt_process_use(task, process, -1); 520 521 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 522 msg->port_msg.reply_port); 523 524 if (nxt_fast_path(port != NULL)) { 525 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 526 -1, msg->port_msg.stream, 0, NULL); 527 } 528 529 close_fds: 530 531 nxt_fd_close(msg->fd[0]); 532 msg->fd[0] = -1; 533 534 nxt_fd_close(msg->fd[1]); 535 msg->fd[1] = -1; 536 } 537 538 539 static void 540 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 541 { 542 nxt_port_t *port; 543 nxt_process_t *process; 544 nxt_runtime_t *rt; 545 546 rt = task->thread->runtime; 547 548 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 549 msg->port_msg.reply_port); 550 if (nxt_slow_path(port == NULL)) { 551 return; 552 } 553 554 process = port->process; 555 556 nxt_assert(process != NULL); 557 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); 558 559 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) 560 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { 561 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, 562 process->user_cred, 563 &process->isolation.clone) 564 != NXT_OK)) 565 { 566 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 567 -1, msg->port_msg.stream, 0, NULL); 568 return; 569 } 570 } 571 572 #endif 573 574 process->state = NXT_PROCESS_STATE_CREATED; 575 576 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, 577 -1, msg->port_msg.stream, 0, NULL); 578 } 579 580 581 static nxt_port_handlers_t nxt_main_process_port_handlers = { 582 .data = nxt_main_data_handler, 583 .new_port = nxt_main_new_port_handler, 584 .process_created = nxt_main_process_created_handler, 585 .process_ready = nxt_port_process_ready_handler, 586 .whoami = nxt_main_process_whoami_handler, 587 .remove_pid = nxt_port_remove_pid_handler, 588 .start_process = nxt_main_start_process_handler, 589 .socket = nxt_main_port_socket_handler, 590 .modules = nxt_main_port_modules_handler, 591 .conf_store = nxt_main_port_conf_store_handler, 592 #if (NXT_TLS) 593 .cert_get = nxt_cert_store_get_handler, 594 .cert_delete = nxt_cert_store_delete_handler, 595 #endif 596 .access_log = nxt_main_port_access_log_handler, 597 .rpc_ready = nxt_port_rpc_handler, 598 .rpc_error = nxt_port_rpc_handler, 599 }; 600 601 602 static void 603 nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 604 { 605 nxt_buf_t *buf; 606 nxt_pid_t pid, ppid; 607 nxt_port_t *port; 608 nxt_runtime_t *rt; 609 nxt_process_t *pprocess; 610 611 nxt_assert(msg->port_msg.reply_port == 0); 612 613 if (nxt_slow_path(msg->buf == NULL 614 || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t))) 615 { 616 nxt_alert(task, "whoami: buffer is NULL or unexpected size"); 617 goto fail; 618 } 619 620 nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t)); 621 622 rt = task->thread->runtime; 623 624 pprocess = nxt_runtime_process_find(rt, ppid); 625 if (nxt_slow_path(pprocess == NULL)) { 626 nxt_alert(task, "whoami: parent process %PI not found", ppid); 627 goto fail; 628 } 629 630 pid = nxt_recv_msg_cmsg_pid(msg); 631 632 nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid, 633 msg->fd[0]); 634 635 if (msg->fd[0] != -1) { 636 port = nxt_runtime_process_port_create(task, rt, pid, 0, 637 NXT_PROCESS_APP); 638 if (nxt_slow_path(port == NULL)) { 639 goto fail; 640 } 641 642 nxt_fd_nonblocking(task, msg->fd[0]); 643 644 port->pair[0] = -1; 645 port->pair[1] = msg->fd[0]; 646 msg->fd[0] = -1; 647 648 port->max_size = 16 * 1024; 649 port->max_share = 64 * 1024; 650 port->socket.task = task; 651 652 nxt_port_write_enable(task, port); 653 654 } else { 655 port = nxt_runtime_port_find(rt, pid, 0); 656 if (nxt_slow_path(port == NULL)) { 657 goto fail; 658 } 659 } 660 661 if (ppid != nxt_pid) { 662 nxt_queue_insert_tail(&pprocess->children, &port->process->link); 663 } 664 665 buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool, 666 sizeof(nxt_pid_t), 0); 667 if (nxt_slow_path(buf == NULL)) { 668 goto fail; 669 } 670 671 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t)); 672 673 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1, 674 msg->port_msg.stream, 0, buf); 675 676 fail: 677 678 if (msg->fd[0] != -1) { 679 nxt_fd_close(msg->fd[0]); 680 } 681 } 682 683 684 static nxt_int_t 685 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) 686 { 687 nxt_int_t ret; 688 nxt_port_t *port; 689 nxt_process_t *process; 690 691 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, 692 NXT_PROCESS_MAIN); 693 if (nxt_slow_path(port == NULL)) { 694 return NXT_ERROR; 695 } 696 697 process = port->process; 698 699 ret = nxt_port_socket_init(task, port, 0); 700 if (nxt_slow_path(ret != NXT_OK)) { 701 nxt_port_use(task, port, -1); 702 return ret; 703 } 704 705 /* 706 * A main process port. A write port is not closed 707 * since it should be inherited by processes. 708 */ 709 nxt_port_enable(task, port, &nxt_main_process_port_handlers); 710 711 process->state = NXT_PROCESS_STATE_READY; 712 713 return NXT_OK; 714 } 715 716 717 static void 718 nxt_main_process_title(nxt_task_t *task) 719 { 720 u_char *p, *end; 721 nxt_uint_t i; 722 u_char title[2048]; 723 724 end = title + sizeof(title) - 1; 725 726 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s", 727 nxt_process_argv[0]); 728 729 for (i = 1; nxt_process_argv[i] != NULL; i++) { 730 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); 731 } 732 733 if (p < end) { 734 *p++ = ']'; 735 } 736 737 *p = '\0'; 738 739 nxt_process_title(task, "%s", title); 740 } 741 742 743 static void 744 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) 745 { 746 nxt_debug(task, "sigterm handler signo:%d (%s)", 747 (int) (uintptr_t) obj, data); 748 749 /* TODO: fast exit. */ 750 751 nxt_exiting = 1; 752 753 nxt_runtime_quit(task, 0); 754 } 755 756 757 static void 758 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) 759 { 760 nxt_debug(task, "sigquit handler signo:%d (%s)", 761 (int) (uintptr_t) obj, data); 762 763 /* TODO: graceful exit. */ 764 765 nxt_exiting = 1; 766 767 nxt_runtime_quit(task, 0); 768 } 769 770 771 static void 772 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) 773 { 774 nxt_mp_t *mp; 775 nxt_int_t ret; 776 nxt_uint_t n; 777 nxt_port_t *port; 778 nxt_file_t *file, *new_file; 779 nxt_array_t *new_files; 780 nxt_runtime_t *rt; 781 782 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", 783 (int) (uintptr_t) obj, data, "log files rotation"); 784 785 rt = task->thread->runtime; 786 787 port = rt->port_by_type[NXT_PROCESS_ROUTER]; 788 789 if (nxt_fast_path(port != NULL)) { 790 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG, 791 -1, 0, 0, NULL); 792 } 793 794 mp = nxt_mp_create(1024, 128, 256, 32); 795 if (mp == NULL) { 796 return; 797 } 798 799 n = nxt_list_nelts(rt->log_files); 800 801 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); 802 if (new_files == NULL) { 803 nxt_mp_destroy(mp); 804 return; 805 } 806 807 nxt_list_each(file, rt->log_files) { 808 809 /* This allocation cannot fail. */ 810 new_file = nxt_array_add(new_files); 811 812 new_file->name = file->name; 813 new_file->fd = NXT_FILE_INVALID; 814 new_file->log_level = NXT_LOG_ALERT; 815 816 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, 817 NXT_FILE_OWNER_ACCESS); 818 819 if (ret != NXT_OK) { 820 goto fail; 821 } 822 823 } nxt_list_loop; 824 825 new_file = new_files->elts; 826 827 ret = nxt_file_stderr(&new_file[0]); 828 829 if (ret == NXT_OK) { 830 n = 0; 831 832 nxt_list_each(file, rt->log_files) { 833 834 nxt_port_change_log_file(task, rt, n, new_file[n].fd); 835 /* 836 * The old log file descriptor must be closed at the moment 837 * when no other threads use it. dup2() allows to use the 838 * old file descriptor for new log file. This change is 839 * performed atomically in the kernel. 840 */ 841 (void) nxt_file_redirect(file, new_file[n].fd); 842 843 n++; 844 845 } nxt_list_loop; 846 847 nxt_mp_destroy(mp); 848 return; 849 } 850 851 fail: 852 853 new_file = new_files->elts; 854 n = new_files->nelts; 855 856 while (n != 0) { 857 if (new_file->fd != NXT_FILE_INVALID) { 858 nxt_file_close(task, new_file); 859 } 860 861 new_file++; 862 n--; 863 } 864 865 nxt_mp_destroy(mp); 866 } 867 868 869 static void 870 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) 871 { 872 int status; 873 nxt_int_t ret; 874 nxt_err_t err; 875 nxt_pid_t pid; 876 nxt_port_t *port; 877 nxt_queue_t children; 878 nxt_runtime_t *rt; 879 nxt_process_t *process, *child; 880 nxt_process_init_t init; 881 882 nxt_debug(task, "sigchld handler signo:%d (%s)", 883 (int) (uintptr_t) obj, data); 884 885 rt = task->thread->runtime; 886 887 for ( ;; ) { 888 pid = waitpid(-1, &status, WNOHANG); 889 890 if (pid == -1) { 891 892 switch (err = nxt_errno) { 893 894 case NXT_ECHILD: 895 return; 896 897 case NXT_EINTR: 898 continue; 899 900 default: 901 nxt_alert(task, "waitpid() failed: %E", err); 902 return; 903 } 904 } 905 906 nxt_debug(task, "waitpid(): %PI", pid); 907 908 if (pid == 0) { 909 return; 910 } 911 912 if (WTERMSIG(status)) { 913 #ifdef WCOREDUMP 914 nxt_alert(task, "process %PI exited on signal %d%s", 915 pid, WTERMSIG(status), 916 WCOREDUMP(status) ? " (core dumped)" : ""); 917 #else 918 nxt_alert(task, "process %PI exited on signal %d", 919 pid, WTERMSIG(status)); 920 #endif 921 922 } else { 923 nxt_trace(task, "process %PI exited with code %d", 924 pid, WEXITSTATUS(status)); 925 } 926 927 process = nxt_runtime_process_find(rt, pid); 928 929 if (process != NULL) { 930 nxt_main_process_cleanup(task, process); 931 932 if (process->state == NXT_PROCESS_STATE_READY) { 933 process->stream = 0; 934 } 935 936 nxt_queue_init(&children); 937 938 if (!nxt_queue_is_empty(&process->children)) { 939 nxt_queue_add(&children, &process->children); 940 941 nxt_queue_init(&process->children); 942 943 nxt_queue_each(child, &children, nxt_process_t, link) { 944 port = nxt_process_port_first(child); 945 946 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 947 -1, 0, 0, NULL); 948 } nxt_queue_loop; 949 } 950 951 if (nxt_exiting) { 952 nxt_process_close_ports(task, process); 953 954 nxt_queue_each(child, &children, nxt_process_t, link) { 955 nxt_queue_remove(&child->link); 956 child->link.next = NULL; 957 958 nxt_process_close_ports(task, child); 959 } nxt_queue_loop; 960 961 if (rt->nprocesses <= 1) { 962 nxt_runtime_quit(task, 0); 963 964 return; 965 } 966 967 continue; 968 } 969 970 nxt_port_remove_notify_others(task, process); 971 972 nxt_queue_each(child, &children, nxt_process_t, link) { 973 nxt_port_remove_notify_others(task, child); 974 975 nxt_queue_remove(&child->link); 976 child->link.next = NULL; 977 978 nxt_process_close_ports(task, child); 979 } nxt_queue_loop; 980 981 init = *(nxt_process_init_t *) nxt_process_init(process); 982 983 nxt_process_close_ports(task, process); 984 985 if (init.restart) { 986 ret = nxt_process_init_start(task, init); 987 if (nxt_slow_path(ret == NXT_ERROR)) { 988 nxt_alert(task, "failed to restart %s", init.name); 989 } 990 } 991 } 992 } 993 } 994 995 996 static void 997 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) 998 { 999 nxt_trace(task, "signal signo:%d (%s) recevied, ignored", 1000 (int) (uintptr_t) obj, data); 1001 } 1002 1003 1004 static void 1005 nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process) 1006 { 1007 if (process->isolation.cleanup != NULL) { 1008 process->isolation.cleanup(task, process); 1009 } 1010 } 1011 1012 1013 static void 1014 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1015 { 1016 size_t size; 1017 nxt_int_t ret; 1018 nxt_buf_t *b, *out; 1019 nxt_port_t *port; 1020 nxt_sockaddr_t *sa; 1021 nxt_port_msg_type_t type; 1022 nxt_listening_socket_t ls; 1023 u_char message[2048]; 1024 1025 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1026 msg->port_msg.reply_port); 1027 if (nxt_slow_path(port == NULL)) { 1028 return; 1029 } 1030 1031 if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) { 1032 nxt_alert(task, "process %PI cannot create listener sockets", 1033 msg->port_msg.pid); 1034 1035 return; 1036 } 1037 1038 b = msg->buf; 1039 sa = (nxt_sockaddr_t *) b->mem.pos; 1040 1041 /* TODO check b size and make plain */ 1042 1043 ls.socket = -1; 1044 ls.error = NXT_SOCKET_ERROR_SYSTEM; 1045 ls.start = message; 1046 ls.end = message + sizeof(message); 1047 1048 nxt_debug(task, "listening socket \"%*s\"", 1049 (size_t) sa->length, nxt_sockaddr_start(sa)); 1050 1051 ret = nxt_main_listening_socket(sa, &ls); 1052 1053 if (ret == NXT_OK) { 1054 nxt_debug(task, "socket(\"%*s\"): %d", 1055 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); 1056 1057 out = NULL; 1058 1059 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; 1060 1061 } else { 1062 size = ls.end - ls.start; 1063 1064 nxt_alert(task, "%*s", size, ls.start); 1065 1066 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 1067 size + 1); 1068 if (nxt_fast_path(out != NULL)) { 1069 *out->mem.free++ = (uint8_t) ls.error; 1070 1071 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); 1072 } 1073 1074 type = NXT_PORT_MSG_RPC_ERROR; 1075 } 1076 1077 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream, 1078 0, out); 1079 } 1080 1081 1082 static nxt_int_t 1083 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls) 1084 { 1085 nxt_err_t err; 1086 nxt_socket_t s; 1087 1088 const socklen_t length = sizeof(int); 1089 static const int enable = 1; 1090 1091 s = socket(sa->u.sockaddr.sa_family, sa->type, 0); 1092 1093 if (nxt_slow_path(s == -1)) { 1094 err = nxt_errno; 1095 1096 #if (NXT_INET6) 1097 1098 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) { 1099 ls->error = NXT_SOCKET_ERROR_NOINET6; 1100 } 1101 1102 #endif 1103 1104 ls->end = nxt_sprintf(ls->start, ls->end, 1105 "socket(\\\"%*s\\\") failed %E", 1106 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1107 1108 return NXT_ERROR; 1109 } 1110 1111 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) { 1112 ls->end = nxt_sprintf(ls->start, ls->end, 1113 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E", 1114 (size_t) sa->length, nxt_sockaddr_start(sa), 1115 nxt_errno); 1116 goto fail; 1117 } 1118 1119 #if (NXT_INET6) 1120 1121 if (sa->u.sockaddr.sa_family == AF_INET6) { 1122 1123 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) { 1124 ls->end = nxt_sprintf(ls->start, ls->end, 1125 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E", 1126 (size_t) sa->length, nxt_sockaddr_start(sa), 1127 nxt_errno); 1128 goto fail; 1129 } 1130 } 1131 1132 #endif 1133 1134 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) { 1135 err = nxt_errno; 1136 1137 #if (NXT_HAVE_UNIX_DOMAIN) 1138 1139 if (sa->u.sockaddr.sa_family == AF_UNIX) { 1140 switch (err) { 1141 1142 case EACCES: 1143 ls->error = NXT_SOCKET_ERROR_ACCESS; 1144 break; 1145 1146 case ENOENT: 1147 case ENOTDIR: 1148 ls->error = NXT_SOCKET_ERROR_PATH; 1149 break; 1150 } 1151 1152 } else 1153 #endif 1154 { 1155 switch (err) { 1156 1157 case EACCES: 1158 ls->error = NXT_SOCKET_ERROR_PORT; 1159 break; 1160 1161 case EADDRINUSE: 1162 ls->error = NXT_SOCKET_ERROR_INUSE; 1163 break; 1164 1165 case EADDRNOTAVAIL: 1166 ls->error = NXT_SOCKET_ERROR_NOADDR; 1167 break; 1168 } 1169 } 1170 1171 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E", 1172 (size_t) sa->length, nxt_sockaddr_start(sa), err); 1173 goto fail; 1174 } 1175 1176 #if (NXT_HAVE_UNIX_DOMAIN) 1177 1178 if (sa->u.sockaddr.sa_family == AF_UNIX 1179 && sa->u.sockaddr_un.sun_path[0] != '\0') 1180 { 1181 char *filename; 1182 mode_t access; 1183 1184 filename = sa->u.sockaddr_un.sun_path; 1185 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); 1186 1187 if (chmod(filename, access) != 0) { 1188 ls->end = nxt_sprintf(ls->start, ls->end, 1189 "chmod(\\\"%s\\\") failed %E", 1190 filename, nxt_errno); 1191 goto fail; 1192 } 1193 } 1194 1195 #endif 1196 1197 ls->socket = s; 1198 1199 return NXT_OK; 1200 1201 fail: 1202 1203 (void) close(s); 1204 1205 return NXT_ERROR; 1206 } 1207 1208 1209 static nxt_conf_map_t nxt_app_lang_module_map[] = { 1210 { 1211 nxt_string("type"), 1212 NXT_CONF_MAP_INT, 1213 offsetof(nxt_app_lang_module_t, type), 1214 }, 1215 1216 { 1217 nxt_string("version"), 1218 NXT_CONF_MAP_CSTRZ, 1219 offsetof(nxt_app_lang_module_t, version), 1220 }, 1221 1222 { 1223 nxt_string("file"), 1224 NXT_CONF_MAP_CSTRZ, 1225 offsetof(nxt_app_lang_module_t, file), 1226 }, 1227 }; 1228 1229 1230 static nxt_conf_map_t nxt_app_lang_mounts_map[] = { 1231 { 1232 nxt_string("src"), 1233 NXT_CONF_MAP_CSTRZ, 1234 offsetof(nxt_fs_mount_t, src), 1235 }, 1236 { 1237 nxt_string("dst"), 1238 NXT_CONF_MAP_CSTRZ, 1239 offsetof(nxt_fs_mount_t, dst), 1240 }, 1241 { 1242 nxt_string("name"), 1243 NXT_CONF_MAP_CSTRZ, 1244 offsetof(nxt_fs_mount_t, name), 1245 }, 1246 { 1247 nxt_string("type"), 1248 NXT_CONF_MAP_INT, 1249 offsetof(nxt_fs_mount_t, type), 1250 }, 1251 { 1252 nxt_string("flags"), 1253 NXT_CONF_MAP_INT, 1254 offsetof(nxt_fs_mount_t, flags), 1255 }, 1256 { 1257 nxt_string("data"), 1258 NXT_CONF_MAP_CSTRZ, 1259 offsetof(nxt_fs_mount_t, data), 1260 }, 1261 }; 1262 1263 1264 static void 1265 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1266 { 1267 uint32_t index, jindex, nmounts; 1268 nxt_mp_t *mp; 1269 nxt_int_t ret; 1270 nxt_buf_t *b; 1271 nxt_port_t *port; 1272 nxt_runtime_t *rt; 1273 nxt_fs_mount_t *mnt; 1274 nxt_conf_value_t *conf, *root, *value, *mounts; 1275 nxt_app_lang_module_t *lang; 1276 1277 static nxt_str_t root_path = nxt_string("/"); 1278 static nxt_str_t mounts_name = nxt_string("mounts"); 1279 1280 rt = task->thread->runtime; 1281 1282 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { 1283 nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid); 1284 return; 1285 } 1286 1287 if (nxt_exiting) { 1288 nxt_debug(task, "ignoring discovered modules, exiting"); 1289 return; 1290 } 1291 1292 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1293 msg->port_msg.reply_port); 1294 1295 if (nxt_fast_path(port != NULL)) { 1296 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 1297 msg->port_msg.stream, 0, NULL); 1298 } 1299 1300 b = msg->buf; 1301 1302 if (b == NULL) { 1303 return; 1304 } 1305 1306 mp = nxt_mp_create(1024, 128, 256, 32); 1307 if (mp == NULL) { 1308 return; 1309 } 1310 1311 b = nxt_buf_chk_make_plain(mp, b, msg->size); 1312 1313 if (b == NULL) { 1314 return; 1315 } 1316 1317 nxt_debug(task, "application languages: \"%*s\"", 1318 b->mem.free - b->mem.pos, b->mem.pos); 1319 1320 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); 1321 if (conf == NULL) { 1322 goto fail; 1323 } 1324 1325 root = nxt_conf_get_path(conf, &root_path); 1326 if (root == NULL) { 1327 goto fail; 1328 } 1329 1330 for (index = 0; /* void */ ; index++) { 1331 value = nxt_conf_get_array_element(root, index); 1332 if (value == NULL) { 1333 break; 1334 } 1335 1336 lang = nxt_array_zero_add(rt->languages); 1337 if (lang == NULL) { 1338 goto fail; 1339 } 1340 1341 lang->module = NULL; 1342 1343 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, 1344 nxt_nitems(nxt_app_lang_module_map), lang); 1345 1346 if (ret != NXT_OK) { 1347 goto fail; 1348 } 1349 1350 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL); 1351 if (mounts == NULL) { 1352 nxt_alert(task, "missing mounts from discovery message."); 1353 goto fail; 1354 } 1355 1356 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) { 1357 nxt_alert(task, "invalid mounts type from discovery message."); 1358 goto fail; 1359 } 1360 1361 nmounts = nxt_conf_array_elements_count(mounts); 1362 1363 lang->mounts = nxt_array_create(rt->mem_pool, nmounts, 1364 sizeof(nxt_fs_mount_t)); 1365 1366 if (lang->mounts == NULL) { 1367 goto fail; 1368 } 1369 1370 for (jindex = 0; /* */; jindex++) { 1371 value = nxt_conf_get_array_element(mounts, jindex); 1372 if (value == NULL) { 1373 break; 1374 } 1375 1376 mnt = nxt_array_zero_add(lang->mounts); 1377 if (mnt == NULL) { 1378 goto fail; 1379 } 1380 1381 mnt->builtin = 1; 1382 mnt->deps = 1; 1383 1384 ret = nxt_conf_map_object(rt->mem_pool, value, 1385 nxt_app_lang_mounts_map, 1386 nxt_nitems(nxt_app_lang_mounts_map), mnt); 1387 1388 if (ret != NXT_OK) { 1389 goto fail; 1390 } 1391 } 1392 1393 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)", 1394 lang->type, lang->version, lang->file, lang->mounts->nelts); 1395 } 1396 1397 qsort(rt->languages->elts, rt->languages->nelts, 1398 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); 1399 1400 fail: 1401 1402 nxt_mp_destroy(mp); 1403 1404 ret = nxt_process_init_start(task, nxt_controller_process); 1405 if (ret == NXT_OK) { 1406 ret = nxt_process_init_start(task, nxt_router_process); 1407 } 1408 1409 if (nxt_slow_path(ret == NXT_ERROR)) { 1410 nxt_exiting = 1; 1411 1412 nxt_runtime_quit(task, 1); 1413 } 1414 } 1415 1416 1417 static int nxt_cdecl 1418 nxt_app_lang_compare(const void *v1, const void *v2) 1419 { 1420 int n; 1421 const nxt_app_lang_module_t *lang1, *lang2; 1422 1423 lang1 = v1; 1424 lang2 = v2; 1425 1426 n = lang1->type - lang2->type; 1427 1428 if (n != 0) { 1429 return n; 1430 } 1431 1432 n = nxt_strverscmp(lang1->version, lang2->version); 1433 1434 /* Negate result to move higher versions to the beginning. */ 1435 1436 return -n; 1437 } 1438 1439 1440 static void 1441 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1442 { 1443 void *p; 1444 size_t n, size; 1445 nxt_int_t ret; 1446 nxt_port_t *ctl_port; 1447 nxt_runtime_t *rt; 1448 u_char ver[NXT_INT_T_LEN]; 1449 1450 rt = task->thread->runtime; 1451 1452 ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 1453 1454 if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) { 1455 nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid); 1456 return; 1457 } 1458 1459 p = MAP_FAILED; 1460 1461 /* 1462 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 1463 * initialized in 'cleanup' section. 1464 */ 1465 size = 0; 1466 1467 if (nxt_slow_path(msg->fd[0] == -1)) { 1468 nxt_alert(task, "conf_store_handler: invalid shm fd"); 1469 goto error; 1470 } 1471 1472 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 1473 nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)", 1474 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 1475 goto error; 1476 } 1477 1478 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 1479 1480 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 1481 1482 nxt_fd_close(msg->fd[0]); 1483 msg->fd[0] = -1; 1484 1485 if (nxt_slow_path(p == MAP_FAILED)) { 1486 goto error; 1487 } 1488 1489 nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); 1490 1491 if (nxt_conf_ver != NXT_VERNUM) { 1492 n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver; 1493 1494 ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n); 1495 if (nxt_slow_path(ret != NXT_OK)) { 1496 goto error; 1497 } 1498 1499 nxt_conf_ver = NXT_VERNUM; 1500 } 1501 1502 ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size); 1503 1504 if (nxt_fast_path(ret == NXT_OK)) { 1505 goto cleanup; 1506 } 1507 1508 error: 1509 1510 nxt_alert(task, "failed to store current configuration"); 1511 1512 cleanup: 1513 1514 if (p != MAP_FAILED) { 1515 nxt_mem_munmap(p, size); 1516 } 1517 1518 if (msg->fd[0] != -1) { 1519 nxt_fd_close(msg->fd[0]); 1520 msg->fd[0] = -1; 1521 } 1522 } 1523 1524 1525 static nxt_int_t 1526 nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name, 1527 u_char *buf, size_t size) 1528 { 1529 ssize_t n; 1530 nxt_int_t ret; 1531 nxt_file_t file; 1532 1533 nxt_memzero(&file, sizeof(nxt_file_t)); 1534 1535 file.name = (nxt_file_name_t *) name; 1536 1537 ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE, 1538 NXT_FILE_OWNER_ACCESS); 1539 if (nxt_slow_path(ret != NXT_OK)) { 1540 return NXT_ERROR; 1541 } 1542 1543 n = nxt_file_write(&file, buf, size, 0); 1544 1545 nxt_file_close(task, &file); 1546 1547 if (nxt_slow_path(n != (ssize_t) size)) { 1548 (void) nxt_file_delete(file.name); 1549 return NXT_ERROR; 1550 } 1551 1552 return nxt_file_rename(file.name, (nxt_file_name_t *) name); 1553 } 1554 1555 1556 static void 1557 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1558 { 1559 u_char *path; 1560 nxt_int_t ret; 1561 nxt_file_t file; 1562 nxt_port_t *port; 1563 nxt_port_msg_type_t type; 1564 1565 nxt_debug(task, "opening access log file"); 1566 1567 path = msg->buf->mem.pos; 1568 1569 nxt_memzero(&file, sizeof(nxt_file_t)); 1570 1571 file.name = (nxt_file_name_t *) path; 1572 file.log_level = NXT_LOG_ERR; 1573 1574 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 1575 NXT_FILE_OWNER_ACCESS); 1576 1577 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD 1578 : NXT_PORT_MSG_RPC_ERROR; 1579 1580 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 1581 msg->port_msg.reply_port); 1582 1583 if (nxt_fast_path(port != NULL)) { 1584 (void) nxt_port_socket_write(task, port, type, file.fd, 1585 msg->port_msg.stream, 0, NULL); 1586 } 1587 } 1588