1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 #include <nxt_status.h> 13 #include <nxt_cert.h> 14 15 16 typedef struct { 17 nxt_conf_value_t *root; 18 nxt_mp_t *pool; 19 } nxt_controller_conf_t; 20 21 22 typedef struct { 23 nxt_http_request_parse_t parser; 24 size_t length; 25 nxt_controller_conf_t conf; 26 nxt_conn_t *conn; 27 nxt_queue_link_t link; 28 } nxt_controller_request_t; 29 30 31 typedef struct { 32 nxt_uint_t status; 33 nxt_conf_value_t *conf; 34 35 u_char *title; 36 nxt_str_t detail; 37 ssize_t offset; 38 nxt_uint_t line; 39 nxt_uint_t column; 40 } nxt_controller_response_t; 41 42 43 static nxt_int_t nxt_controller_prefork(nxt_task_t *task, 44 nxt_process_t *process, nxt_mp_t *mp); 45 static nxt_int_t nxt_controller_file_read(nxt_task_t *task, const char *name, 46 nxt_str_t *str, nxt_mp_t *mp); 47 static nxt_int_t nxt_controller_start(nxt_task_t *task, 48 nxt_process_data_t *data); 49 static void nxt_controller_process_new_port_handler(nxt_task_t *task, 50 nxt_port_recv_msg_t *msg); 51 static void nxt_controller_send_current_conf(nxt_task_t *task); 52 static void nxt_controller_router_ready_handler(nxt_task_t *task, 53 nxt_port_recv_msg_t *msg); 54 static void nxt_controller_remove_pid_handler(nxt_task_t *task, 55 nxt_port_recv_msg_t *msg); 56 static nxt_int_t nxt_controller_conf_default(void); 57 static void nxt_controller_conf_init_handler(nxt_task_t *task, 58 nxt_port_recv_msg_t *msg, void *data); 59 static void nxt_controller_flush_requests(nxt_task_t *task); 60 static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, 61 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); 62 63 static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 64 static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 65 static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 66 uintptr_t data); 67 static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 68 void *data); 69 static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 70 void *data); 71 static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 72 void *data); 73 static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 74 static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 75 void *data); 76 static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 77 void *data); 78 static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 79 static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 80 81 static nxt_int_t nxt_controller_request_content_length(void *ctx, 82 nxt_http_field_t *field, uintptr_t data); 83 84 static void nxt_controller_process_request(nxt_task_t *task, 85 nxt_controller_request_t *req); 86 static void nxt_controller_process_config(nxt_task_t *task, 87 nxt_controller_request_t *req, nxt_str_t *path); 88 static nxt_bool_t nxt_controller_check_postpone_request(nxt_task_t *task); 89 static void nxt_controller_process_status(nxt_task_t *task, 90 nxt_controller_request_t *req); 91 static void nxt_controller_status_handler(nxt_task_t *task, 92 nxt_port_recv_msg_t *msg, void *data); 93 static void nxt_controller_status_response(nxt_task_t *task, 94 nxt_controller_request_t *req, nxt_str_t *path); 95 #if (NXT_TLS) 96 static void nxt_controller_process_cert(nxt_task_t *task, 97 nxt_controller_request_t *req, nxt_str_t *path); 98 static void nxt_controller_process_cert_save(nxt_task_t *task, 99 nxt_port_recv_msg_t *msg, void *data); 100 static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name); 101 static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, 102 void *data); 103 #endif 104 static void nxt_controller_process_control(nxt_task_t *task, 105 nxt_controller_request_t *req, nxt_str_t *path); 106 static void nxt_controller_app_restart_handler(nxt_task_t *task, 107 nxt_port_recv_msg_t *msg, void *data); 108 static void nxt_controller_conf_handler(nxt_task_t *task, 109 nxt_port_recv_msg_t *msg, void *data); 110 static void nxt_controller_conf_store(nxt_task_t *task, 111 nxt_conf_value_t *conf); 112 static void nxt_controller_response(nxt_task_t *task, 113 nxt_controller_request_t *req, nxt_controller_response_t *resp); 114 static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 115 struct tm *tm, size_t size, const char *format); 116 117 118 static nxt_http_field_proc_t nxt_controller_request_fields[] = { 119 { nxt_string("Content-Length"), 120 &nxt_controller_request_content_length, 0 }, 121 }; 122 123 static nxt_lvlhsh_t nxt_controller_fields_hash; 124 125 static nxt_uint_t nxt_controller_listening; 126 static nxt_uint_t nxt_controller_router_ready; 127 static nxt_controller_conf_t nxt_controller_conf; 128 static nxt_queue_t nxt_controller_waiting_requests; 129 static nxt_bool_t nxt_controller_waiting_init_conf; 130 static nxt_conf_value_t *nxt_controller_status; 131 132 133 static const nxt_event_conn_state_t nxt_controller_conn_read_state; 134 static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 135 static const nxt_event_conn_state_t nxt_controller_conn_write_state; 136 static const nxt_event_conn_state_t nxt_controller_conn_close_state; 137 138 139 static const nxt_port_handlers_t nxt_controller_process_port_handlers = { 140 .quit = nxt_signal_quit_handler, 141 .new_port = nxt_controller_process_new_port_handler, 142 .change_file = nxt_port_change_log_file_handler, 143 .mmap = nxt_port_mmap_handler, 144 .process_ready = nxt_controller_router_ready_handler, 145 .data = nxt_port_data_handler, 146 .remove_pid = nxt_controller_remove_pid_handler, 147 .rpc_ready = nxt_port_rpc_handler, 148 .rpc_error = nxt_port_rpc_handler, 149 }; 150 151 152 const nxt_process_init_t nxt_controller_process = { 153 .name = "controller", 154 .type = NXT_PROCESS_CONTROLLER, 155 .prefork = nxt_controller_prefork, 156 .restart = 1, 157 .setup = nxt_process_core_setup, 158 .start = nxt_controller_start, 159 .port_handlers = &nxt_controller_process_port_handlers, 160 .signals = nxt_process_signals, 161 }; 162 163 164 static nxt_int_t 165 nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 166 { 167 nxt_str_t ver; 168 nxt_int_t ret, num; 169 nxt_runtime_t *rt; 170 nxt_controller_init_t ctrl_init; 171 172 nxt_log(task, NXT_LOG_INFO, "controller started"); 173 174 rt = task->thread->runtime; 175 176 nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); 177 178 /* 179 * Since configuration version has only been introduced in 1.26, 180 * set the default version to 1.25. 181 */ 182 nxt_conf_ver = 12500; 183 184 ret = nxt_controller_file_read(task, rt->conf, &ctrl_init.conf, mp); 185 if (nxt_slow_path(ret == NXT_ERROR)) { 186 return NXT_ERROR; 187 } 188 189 if (ret == NXT_OK) { 190 ret = nxt_controller_file_read(task, rt->ver, &ver, mp); 191 if (nxt_slow_path(ret == NXT_ERROR)) { 192 return NXT_ERROR; 193 } 194 195 if (ret == NXT_OK) { 196 num = nxt_int_parse(ver.start, ver.length); 197 198 if (nxt_slow_path(num < 0)) { 199 nxt_alert(task, "failed to restore previous configuration: " 200 "invalid version string \"%V\"", &ver); 201 202 nxt_str_null(&ctrl_init.conf); 203 204 } else { 205 nxt_conf_ver = num; 206 } 207 } 208 } 209 210 #if (NXT_TLS) 211 ctrl_init.certs = nxt_cert_store_load(task, mp); 212 213 nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt); 214 #endif 215 216 process->data.controller = ctrl_init; 217 218 return NXT_OK; 219 } 220 221 222 static nxt_int_t 223 nxt_controller_file_read(nxt_task_t *task, const char *name, nxt_str_t *str, 224 nxt_mp_t *mp) 225 { 226 ssize_t n; 227 nxt_int_t ret; 228 nxt_file_t file; 229 nxt_file_info_t fi; 230 231 nxt_memzero(&file, sizeof(nxt_file_t)); 232 233 file.name = (nxt_file_name_t *) name; 234 235 ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); 236 237 if (ret == NXT_OK) { 238 ret = nxt_file_info(&file, &fi); 239 if (nxt_slow_path(ret != NXT_OK)) { 240 goto fail; 241 } 242 243 if (nxt_fast_path(nxt_is_file(&fi))) { 244 str->length = nxt_file_size(&fi); 245 str->start = nxt_mp_nget(mp, str->length); 246 if (nxt_slow_path(str->start == NULL)) { 247 goto fail; 248 } 249 250 n = nxt_file_read(&file, str->start, str->length, 0); 251 if (nxt_slow_path(n != (ssize_t) str->length)) { 252 goto fail; 253 } 254 255 nxt_file_close(task, &file); 256 257 return NXT_OK; 258 } 259 260 nxt_file_close(task, &file); 261 } 262 263 return NXT_DECLINED; 264 265 fail: 266 267 nxt_file_close(task, &file); 268 269 return NXT_ERROR; 270 } 271 272 273 #if (NXT_TLS) 274 275 static void 276 nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data) 277 { 278 pid_t main_pid; 279 nxt_array_t *certs; 280 nxt_runtime_t *rt; 281 282 certs = obj; 283 rt = data; 284 285 main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid; 286 287 if (nxt_pid == main_pid && certs != NULL) { 288 nxt_cert_store_release(certs); 289 } 290 } 291 292 #endif 293 294 295 static nxt_int_t 296 nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data) 297 { 298 nxt_mp_t *mp; 299 nxt_int_t ret; 300 nxt_str_t *json; 301 nxt_conf_value_t *conf; 302 nxt_conf_validation_t vldt; 303 nxt_controller_init_t *init; 304 305 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, 306 nxt_controller_request_fields, 307 nxt_nitems(nxt_controller_request_fields)); 308 309 if (nxt_slow_path(ret != NXT_OK)) { 310 return NXT_ERROR; 311 } 312 313 nxt_queue_init(&nxt_controller_waiting_requests); 314 315 init = &data->controller; 316 317 #if (NXT_TLS) 318 if (init->certs != NULL) { 319 nxt_cert_info_init(task, init->certs); 320 nxt_cert_store_release(init->certs); 321 } 322 #endif 323 324 json = &init->conf; 325 326 if (json->start == NULL) { 327 return NXT_OK; 328 } 329 330 mp = nxt_mp_create(1024, 128, 256, 32); 331 if (nxt_slow_path(mp == NULL)) { 332 return NXT_ERROR; 333 } 334 335 conf = nxt_conf_json_parse_str(mp, json); 336 if (nxt_slow_path(conf == NULL)) { 337 nxt_alert(task, "failed to restore previous configuration: " 338 "file is corrupted or not enough memory"); 339 340 nxt_mp_destroy(mp); 341 return NXT_OK; 342 } 343 344 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 345 346 vldt.pool = nxt_mp_create(1024, 128, 256, 32); 347 if (nxt_slow_path(vldt.pool == NULL)) { 348 nxt_mp_destroy(mp); 349 return NXT_ERROR; 350 } 351 352 vldt.conf = conf; 353 vldt.conf_pool = mp; 354 vldt.ver = nxt_conf_ver; 355 356 ret = nxt_conf_validate(&vldt); 357 358 if (nxt_slow_path(ret != NXT_OK)) { 359 360 if (ret == NXT_DECLINED) { 361 nxt_alert(task, "the previous configuration is invalid: %V", 362 &vldt.error); 363 364 nxt_mp_destroy(vldt.pool); 365 nxt_mp_destroy(mp); 366 367 return NXT_OK; 368 } 369 370 /* ret == NXT_ERROR */ 371 372 return NXT_ERROR; 373 } 374 375 nxt_mp_destroy(vldt.pool); 376 377 nxt_controller_conf.root = conf; 378 nxt_controller_conf.pool = mp; 379 380 return NXT_OK; 381 } 382 383 384 static void 385 nxt_controller_process_new_port_handler(nxt_task_t *task, 386 nxt_port_recv_msg_t *msg) 387 { 388 nxt_port_new_port_handler(task, msg); 389 390 if (msg->u.new_port->type != NXT_PROCESS_ROUTER 391 || !nxt_controller_router_ready) 392 { 393 return; 394 } 395 396 nxt_controller_send_current_conf(task); 397 } 398 399 400 static void 401 nxt_controller_send_current_conf(nxt_task_t *task) 402 { 403 nxt_int_t rc; 404 nxt_runtime_t *rt; 405 nxt_conf_value_t *conf; 406 407 conf = nxt_controller_conf.root; 408 409 if (conf != NULL) { 410 rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf, 411 nxt_controller_conf_init_handler, NULL); 412 413 if (nxt_fast_path(rc == NXT_OK)) { 414 nxt_controller_waiting_init_conf = 1; 415 416 return; 417 } 418 419 nxt_mp_destroy(nxt_controller_conf.pool); 420 421 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 422 nxt_abort(); 423 } 424 } 425 426 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 427 nxt_abort(); 428 } 429 430 rt = task->thread->runtime; 431 432 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { 433 nxt_abort(); 434 } 435 436 nxt_controller_listening = 1; 437 438 nxt_controller_flush_requests(task); 439 } 440 441 442 static void 443 nxt_controller_router_ready_handler(nxt_task_t *task, 444 nxt_port_recv_msg_t *msg) 445 { 446 nxt_port_t *router_port; 447 nxt_runtime_t *rt; 448 449 rt = task->thread->runtime; 450 451 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 452 453 nxt_controller_router_ready = 1; 454 455 if (router_port != NULL) { 456 nxt_controller_send_current_conf(task); 457 } 458 } 459 460 461 static void 462 nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 463 { 464 nxt_pid_t pid; 465 nxt_process_t *process; 466 nxt_runtime_t *rt; 467 468 rt = task->thread->runtime; 469 470 nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid)); 471 472 nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid)); 473 474 process = nxt_runtime_process_find(rt, pid); 475 if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) { 476 nxt_controller_router_ready = 0; 477 } 478 479 nxt_port_remove_pid_handler(task, msg); 480 } 481 482 483 static nxt_int_t 484 nxt_controller_conf_default(void) 485 { 486 nxt_mp_t *mp; 487 nxt_conf_value_t *conf; 488 489 static const nxt_str_t json 490 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 491 492 mp = nxt_mp_create(1024, 128, 256, 32); 493 494 if (nxt_slow_path(mp == NULL)) { 495 return NXT_ERROR; 496 } 497 498 conf = nxt_conf_json_parse_str(mp, &json); 499 500 if (nxt_slow_path(conf == NULL)) { 501 return NXT_ERROR; 502 } 503 504 nxt_controller_conf.root = conf; 505 nxt_controller_conf.pool = mp; 506 507 return NXT_OK; 508 } 509 510 511 static void 512 nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 513 void *data) 514 { 515 nxt_runtime_t *rt; 516 517 nxt_controller_waiting_init_conf = 0; 518 519 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { 520 nxt_alert(task, "failed to apply previous configuration"); 521 522 nxt_mp_destroy(nxt_controller_conf.pool); 523 524 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 525 nxt_abort(); 526 } 527 } 528 529 if (nxt_controller_listening == 0) { 530 rt = task->thread->runtime; 531 532 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) 533 == NULL)) 534 { 535 nxt_abort(); 536 } 537 538 nxt_controller_listening = 1; 539 } 540 541 nxt_controller_flush_requests(task); 542 } 543 544 545 static void 546 nxt_controller_flush_requests(nxt_task_t *task) 547 { 548 nxt_queue_t queue; 549 nxt_controller_request_t *req; 550 551 nxt_queue_init(&queue); 552 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 553 554 nxt_queue_init(&nxt_controller_waiting_requests); 555 556 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 557 nxt_controller_process_request(task, req); 558 } nxt_queue_loop; 559 } 560 561 562 static nxt_int_t 563 nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf, 564 nxt_port_rpc_handler_t handler, void *data) 565 { 566 void *mem; 567 u_char *end; 568 size_t size; 569 uint32_t stream; 570 nxt_fd_t fd; 571 nxt_int_t rc; 572 nxt_buf_t *b; 573 nxt_port_t *router_port, *controller_port; 574 nxt_runtime_t *rt; 575 576 rt = task->thread->runtime; 577 578 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 579 580 nxt_assert(router_port != NULL); 581 nxt_assert(nxt_controller_router_ready); 582 583 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 584 585 size = nxt_conf_json_length(conf, NULL); 586 587 b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0); 588 if (nxt_slow_path(b == NULL)) { 589 return NXT_ERROR; 590 } 591 592 fd = nxt_shm_open(task, size); 593 if (nxt_slow_path(fd == -1)) { 594 return NXT_ERROR; 595 } 596 597 mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 598 if (nxt_slow_path(mem == MAP_FAILED)) { 599 goto fail; 600 } 601 602 end = nxt_conf_json_print(mem, conf, NULL); 603 604 nxt_mem_munmap(mem, size); 605 606 size = end - (u_char *) mem; 607 608 b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t)); 609 610 stream = nxt_port_rpc_register_handler(task, controller_port, 611 handler, handler, 612 router_port->pid, data); 613 if (nxt_slow_path(stream == 0)) { 614 goto fail; 615 } 616 617 rc = nxt_port_socket_write(task, router_port, 618 NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD, 619 fd, stream, controller_port->id, b); 620 621 if (nxt_slow_path(rc != NXT_OK)) { 622 nxt_port_rpc_cancel(task, controller_port, stream); 623 624 goto fail; 625 } 626 627 return NXT_OK; 628 629 fail: 630 631 nxt_fd_close(fd); 632 633 return NXT_ERROR; 634 } 635 636 637 nxt_int_t 638 nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 639 { 640 nxt_listen_socket_t *ls; 641 642 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 643 if (ls == NULL) { 644 return NXT_ERROR; 645 } 646 647 ls->sockaddr = rt->controller_listen; 648 649 nxt_listen_socket_remote_size(ls); 650 651 ls->socket = -1; 652 ls->backlog = NXT_LISTEN_BACKLOG; 653 ls->read_after_accept = 1; 654 ls->flags = NXT_NONBLOCK; 655 656 #if 0 657 /* STUB */ 658 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 659 if (wq == NULL) { 660 return NXT_ERROR; 661 } 662 nxt_work_queue_name(wq, "listen"); 663 /**/ 664 665 ls->work_queue = wq; 666 #endif 667 ls->handler = nxt_controller_conn_init; 668 669 if (nxt_listen_socket_create(task, rt->mem_pool, ls) != NXT_OK) { 670 return NXT_ERROR; 671 } 672 673 rt->controller_socket = ls; 674 675 return NXT_OK; 676 } 677 678 679 static void 680 nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 681 { 682 nxt_buf_t *b; 683 nxt_conn_t *c; 684 nxt_event_engine_t *engine; 685 nxt_controller_request_t *r; 686 687 c = obj; 688 689 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 690 691 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 692 if (nxt_slow_path(r == NULL)) { 693 nxt_controller_conn_free(task, c, NULL); 694 return; 695 } 696 697 r->conn = c; 698 699 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 700 != NXT_OK)) 701 { 702 nxt_controller_conn_free(task, c, NULL); 703 return; 704 } 705 706 r->parser.encoded_slashes = 1; 707 708 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 709 if (nxt_slow_path(b == NULL)) { 710 nxt_controller_conn_free(task, c, NULL); 711 return; 712 } 713 714 c->read = b; 715 c->socket.data = r; 716 c->socket.read_ready = 1; 717 c->read_state = &nxt_controller_conn_read_state; 718 719 engine = task->thread->engine; 720 c->read_work_queue = &engine->read_work_queue; 721 c->write_work_queue = &engine->write_work_queue; 722 723 nxt_conn_read(engine, c); 724 } 725 726 727 static const nxt_event_conn_state_t nxt_controller_conn_read_state 728 nxt_aligned(64) = 729 { 730 .ready_handler = nxt_controller_conn_read, 731 .close_handler = nxt_controller_conn_close, 732 .error_handler = nxt_controller_conn_read_error, 733 734 .timer_handler = nxt_controller_conn_read_timeout, 735 .timer_value = nxt_controller_conn_timeout_value, 736 .timer_data = 300 * 1000, 737 }; 738 739 740 static void 741 nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 742 { 743 size_t preread; 744 nxt_buf_t *b; 745 nxt_int_t rc; 746 nxt_conn_t *c; 747 nxt_controller_request_t *r; 748 749 c = obj; 750 r = data; 751 752 nxt_debug(task, "controller conn read"); 753 754 nxt_queue_remove(&c->link); 755 nxt_queue_self(&c->link); 756 757 b = c->read; 758 759 rc = nxt_http_parse_request(&r->parser, &b->mem); 760 761 if (nxt_slow_path(rc != NXT_DONE)) { 762 763 if (rc == NXT_AGAIN) { 764 if (nxt_buf_mem_free_size(&b->mem) == 0) { 765 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 766 nxt_controller_conn_close(task, c, r); 767 return; 768 } 769 770 nxt_conn_read(task->thread->engine, c); 771 return; 772 } 773 774 /* rc == NXT_ERROR */ 775 776 nxt_log(task, NXT_LOG_ERR, "parsing error"); 777 778 nxt_controller_conn_close(task, c, r); 779 return; 780 } 781 782 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash, 783 r); 784 785 if (nxt_slow_path(rc != NXT_OK)) { 786 nxt_controller_conn_close(task, c, r); 787 return; 788 } 789 790 preread = nxt_buf_mem_used_size(&b->mem); 791 792 nxt_debug(task, "controller request header parsing complete, " 793 "body length: %uz, preread: %uz", 794 r->length, preread); 795 796 if (preread >= r->length) { 797 nxt_controller_process_request(task, r); 798 return; 799 } 800 801 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 802 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 803 if (nxt_slow_path(b == NULL)) { 804 nxt_controller_conn_free(task, c, NULL); 805 return; 806 } 807 808 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 809 810 c->read = b; 811 } 812 813 c->read_state = &nxt_controller_conn_body_read_state; 814 815 nxt_conn_read(task->thread->engine, c); 816 } 817 818 819 static nxt_msec_t 820 nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 821 { 822 return (nxt_msec_t) data; 823 } 824 825 826 static void 827 nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 828 { 829 nxt_conn_t *c; 830 831 c = obj; 832 833 nxt_debug(task, "controller conn read error"); 834 835 nxt_controller_conn_close(task, c, data); 836 } 837 838 839 static void 840 nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 841 { 842 nxt_timer_t *timer; 843 nxt_conn_t *c; 844 845 timer = obj; 846 847 c = nxt_read_timer_conn(timer); 848 c->socket.timedout = 1; 849 c->socket.closed = 1; 850 851 nxt_debug(task, "controller conn read timeout"); 852 853 nxt_controller_conn_close(task, c, data); 854 } 855 856 857 static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 858 nxt_aligned(64) = 859 { 860 .ready_handler = nxt_controller_conn_body_read, 861 .close_handler = nxt_controller_conn_close, 862 .error_handler = nxt_controller_conn_read_error, 863 864 .timer_handler = nxt_controller_conn_read_timeout, 865 .timer_value = nxt_controller_conn_timeout_value, 866 .timer_data = 60 * 1000, 867 .timer_autoreset = 1, 868 }; 869 870 871 static void 872 nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 873 { 874 size_t read; 875 nxt_buf_t *b; 876 nxt_conn_t *c; 877 nxt_controller_request_t *r; 878 879 c = obj; 880 r = data; 881 b = c->read; 882 883 read = nxt_buf_mem_used_size(&b->mem); 884 885 nxt_debug(task, "controller conn body read: %uz of %uz", 886 read, r->length); 887 888 if (read >= r->length) { 889 nxt_controller_process_request(task, r); 890 return; 891 } 892 893 nxt_conn_read(task->thread->engine, c); 894 } 895 896 897 static const nxt_event_conn_state_t nxt_controller_conn_write_state 898 nxt_aligned(64) = 899 { 900 .ready_handler = nxt_controller_conn_write, 901 .error_handler = nxt_controller_conn_write_error, 902 903 .timer_handler = nxt_controller_conn_write_timeout, 904 .timer_value = nxt_controller_conn_timeout_value, 905 .timer_data = 60 * 1000, 906 .timer_autoreset = 1, 907 }; 908 909 910 static void 911 nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 912 { 913 nxt_buf_t *b; 914 nxt_conn_t *c; 915 916 c = obj; 917 918 nxt_debug(task, "controller conn write"); 919 920 b = c->write; 921 922 if (b->mem.pos != b->mem.free) { 923 nxt_conn_write(task->thread->engine, c); 924 return; 925 } 926 927 nxt_debug(task, "controller conn write complete"); 928 929 nxt_controller_conn_close(task, c, data); 930 } 931 932 933 static void 934 nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 935 { 936 nxt_conn_t *c; 937 938 c = obj; 939 940 nxt_debug(task, "controller conn write error"); 941 942 nxt_controller_conn_close(task, c, data); 943 } 944 945 946 static void 947 nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 948 { 949 nxt_conn_t *c; 950 nxt_timer_t *timer; 951 952 timer = obj; 953 954 c = nxt_write_timer_conn(timer); 955 c->socket.timedout = 1; 956 c->socket.closed = 1; 957 958 nxt_debug(task, "controller conn write timeout"); 959 960 nxt_controller_conn_close(task, c, data); 961 } 962 963 964 static const nxt_event_conn_state_t nxt_controller_conn_close_state 965 nxt_aligned(64) = 966 { 967 .ready_handler = nxt_controller_conn_free, 968 }; 969 970 971 static void 972 nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 973 { 974 nxt_conn_t *c; 975 976 c = obj; 977 978 nxt_debug(task, "controller conn close"); 979 980 nxt_queue_remove(&c->link); 981 982 c->write_state = &nxt_controller_conn_close_state; 983 984 nxt_conn_close(task->thread->engine, c); 985 } 986 987 988 static void 989 nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 990 { 991 nxt_conn_t *c; 992 993 c = obj; 994 995 nxt_debug(task, "controller conn free"); 996 997 nxt_sockaddr_cache_free(task->thread->engine, c); 998 999 nxt_conn_free(task, c); 1000 } 1001 1002 1003 static nxt_int_t 1004 nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 1005 uintptr_t data) 1006 { 1007 off_t length; 1008 nxt_controller_request_t *r; 1009 1010 r = ctx; 1011 1012 length = nxt_off_t_parse(field->value, field->value_length); 1013 1014 if (nxt_fast_path(length >= 0)) { 1015 1016 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 1017 nxt_log_error(NXT_LOG_ERR, &r->conn->log, 1018 "Content-Length is too big"); 1019 return NXT_ERROR; 1020 } 1021 1022 r->length = length; 1023 return NXT_OK; 1024 } 1025 1026 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid"); 1027 1028 return NXT_ERROR; 1029 } 1030 1031 1032 static void 1033 nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 1034 { 1035 uint32_t i, count; 1036 nxt_str_t path; 1037 nxt_conn_t *c; 1038 nxt_conf_value_t *value; 1039 nxt_controller_response_t resp; 1040 #if (NXT_TLS) 1041 nxt_conf_value_t *certs; 1042 1043 static nxt_str_t certificates = nxt_string("certificates"); 1044 #endif 1045 static nxt_str_t config = nxt_string("config"); 1046 static nxt_str_t status = nxt_string("status"); 1047 1048 c = req->conn; 1049 path = req->parser.path; 1050 1051 if (path.length > 1 && path.start[path.length - 1] == '/') { 1052 path.length--; 1053 } 1054 1055 if (nxt_str_start(&path, "/config", 7) 1056 && (path.length == 7 || path.start[7] == '/')) 1057 { 1058 if (path.length == 7) { 1059 path.length = 1; 1060 1061 } else { 1062 path.length -= 7; 1063 path.start += 7; 1064 } 1065 1066 nxt_controller_process_config(task, req, &path); 1067 return; 1068 } 1069 1070 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1071 1072 if (nxt_str_start(&path, "/status", 7) 1073 && (path.length == 7 || path.start[7] == '/')) 1074 { 1075 if (!nxt_str_eq(&req->parser.method, "GET", 3)) { 1076 goto invalid_method; 1077 } 1078 1079 if (nxt_controller_status == NULL) { 1080 nxt_controller_process_status(task, req); 1081 return; 1082 } 1083 1084 if (path.length == 7) { 1085 path.length = 1; 1086 1087 } else { 1088 path.length -= 7; 1089 path.start += 7; 1090 } 1091 1092 nxt_controller_status_response(task, req, &path); 1093 return; 1094 } 1095 1096 #if (NXT_TLS) 1097 1098 if (nxt_str_start(&path, "/certificates", 13) 1099 && (path.length == 13 || path.start[13] == '/')) 1100 { 1101 if (path.length == 13) { 1102 path.length = 1; 1103 1104 } else { 1105 path.length -= 13; 1106 path.start += 13; 1107 } 1108 1109 nxt_controller_process_cert(task, req, &path); 1110 return; 1111 } 1112 1113 #endif 1114 1115 if (nxt_str_start(&path, "/control/", 9)) { 1116 path.length -= 9; 1117 path.start += 9; 1118 1119 nxt_controller_process_control(task, req, &path); 1120 return; 1121 } 1122 1123 if (path.length == 1 && path.start[0] == '/') { 1124 1125 if (!nxt_str_eq(&req->parser.method, "GET", 3)) { 1126 goto invalid_method; 1127 } 1128 1129 if (nxt_controller_status == NULL) { 1130 nxt_controller_process_status(task, req); 1131 return; 1132 } 1133 1134 count = 2; 1135 #if (NXT_TLS) 1136 count++; 1137 #endif 1138 1139 value = nxt_conf_create_object(c->mem_pool, count); 1140 if (nxt_slow_path(value == NULL)) { 1141 goto alloc_fail; 1142 } 1143 1144 i = 0; 1145 1146 #if (NXT_TLS) 1147 certs = nxt_cert_info_get_all(c->mem_pool); 1148 if (nxt_slow_path(certs == NULL)) { 1149 goto alloc_fail; 1150 } 1151 1152 nxt_conf_set_member(value, &certificates, certs, i++); 1153 #endif 1154 1155 nxt_conf_set_member(value, &config, nxt_controller_conf.root, i++); 1156 nxt_conf_set_member(value, &status, nxt_controller_status, i); 1157 1158 resp.status = 200; 1159 resp.conf = value; 1160 1161 nxt_controller_response(task, req, &resp); 1162 return; 1163 } 1164 1165 resp.status = 404; 1166 resp.title = (u_char *) "Value doesn't exist."; 1167 resp.offset = -1; 1168 1169 nxt_controller_response(task, req, &resp); 1170 return; 1171 1172 invalid_method: 1173 1174 resp.status = 405; 1175 resp.title = (u_char *) "Invalid method."; 1176 resp.offset = -1; 1177 1178 nxt_controller_response(task, req, &resp); 1179 return; 1180 1181 alloc_fail: 1182 1183 resp.status = 500; 1184 resp.title = (u_char *) "Memory allocation failed."; 1185 resp.offset = -1; 1186 1187 nxt_controller_response(task, req, &resp); 1188 return; 1189 } 1190 1191 1192 static void 1193 nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, 1194 nxt_str_t *path) 1195 { 1196 nxt_mp_t *mp; 1197 nxt_int_t rc; 1198 nxt_conn_t *c; 1199 nxt_bool_t post; 1200 nxt_buf_mem_t *mbuf; 1201 nxt_conf_op_t *ops; 1202 nxt_conf_value_t *value; 1203 nxt_conf_validation_t vldt; 1204 nxt_conf_json_error_t error; 1205 nxt_controller_response_t resp; 1206 1207 static const nxt_str_t empty_obj = nxt_string("{}"); 1208 1209 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1210 1211 c = req->conn; 1212 1213 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 1214 1215 value = nxt_conf_get_path(nxt_controller_conf.root, path); 1216 1217 if (value == NULL) { 1218 goto not_found; 1219 } 1220 1221 resp.status = 200; 1222 resp.conf = value; 1223 1224 nxt_controller_response(task, req, &resp); 1225 return; 1226 } 1227 1228 if (nxt_str_eq(&req->parser.method, "POST", 4)) { 1229 if (path->length == 1) { 1230 goto not_allowed; 1231 } 1232 1233 post = 1; 1234 1235 } else { 1236 post = 0; 1237 } 1238 1239 if (post || nxt_str_eq(&req->parser.method, "PUT", 3)) { 1240 1241 if (nxt_controller_check_postpone_request(task)) { 1242 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 1243 return; 1244 } 1245 1246 mp = nxt_mp_create(1024, 128, 256, 32); 1247 1248 if (nxt_slow_path(mp == NULL)) { 1249 goto alloc_fail; 1250 } 1251 1252 mbuf = &c->read->mem; 1253 1254 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 1255 1256 /* Skip UTF-8 BOM. */ 1257 if (nxt_buf_mem_used_size(mbuf) >= 3 1258 && nxt_memcmp(mbuf->pos, "\xEF\xBB\xBF", 3) == 0) 1259 { 1260 mbuf->pos += 3; 1261 } 1262 1263 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 1264 1265 if (value == NULL) { 1266 nxt_mp_destroy(mp); 1267 1268 if (error.pos == NULL) { 1269 goto alloc_fail; 1270 } 1271 1272 resp.status = 400; 1273 resp.title = (u_char *) "Invalid JSON."; 1274 resp.detail.length = nxt_strlen(error.detail); 1275 resp.detail.start = error.detail; 1276 resp.offset = error.pos - mbuf->pos; 1277 1278 nxt_conf_json_position(mbuf->pos, error.pos, 1279 &resp.line, &resp.column); 1280 1281 nxt_controller_response(task, req, &resp); 1282 return; 1283 } 1284 1285 if (path->length != 1) { 1286 rc = nxt_conf_op_compile(c->mem_pool, &ops, 1287 nxt_controller_conf.root, 1288 path, value, post); 1289 1290 if (rc != NXT_CONF_OP_OK) { 1291 nxt_mp_destroy(mp); 1292 1293 switch (rc) { 1294 case NXT_CONF_OP_NOT_FOUND: 1295 goto not_found; 1296 1297 case NXT_CONF_OP_NOT_ALLOWED: 1298 goto not_allowed; 1299 } 1300 1301 /* rc == NXT_CONF_OP_ERROR */ 1302 goto alloc_fail; 1303 } 1304 1305 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 1306 1307 if (nxt_slow_path(value == NULL)) { 1308 nxt_mp_destroy(mp); 1309 goto alloc_fail; 1310 } 1311 } 1312 1313 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 1314 1315 vldt.conf = value; 1316 vldt.pool = c->mem_pool; 1317 vldt.conf_pool = mp; 1318 vldt.ver = NXT_VERNUM; 1319 1320 rc = nxt_conf_validate(&vldt); 1321 1322 if (nxt_slow_path(rc != NXT_OK)) { 1323 nxt_mp_destroy(mp); 1324 1325 if (rc == NXT_DECLINED) { 1326 resp.detail = vldt.error; 1327 goto invalid_conf; 1328 } 1329 1330 /* rc == NXT_ERROR */ 1331 goto alloc_fail; 1332 } 1333 1334 rc = nxt_controller_conf_send(task, mp, value, 1335 nxt_controller_conf_handler, req); 1336 1337 if (nxt_slow_path(rc != NXT_OK)) { 1338 nxt_mp_destroy(mp); 1339 1340 /* rc == NXT_ERROR */ 1341 goto alloc_fail; 1342 } 1343 1344 req->conf.root = value; 1345 req->conf.pool = mp; 1346 1347 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 1348 1349 return; 1350 } 1351 1352 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 1353 1354 if (nxt_controller_check_postpone_request(task)) { 1355 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 1356 return; 1357 } 1358 1359 if (path->length == 1) { 1360 mp = nxt_mp_create(1024, 128, 256, 32); 1361 1362 if (nxt_slow_path(mp == NULL)) { 1363 goto alloc_fail; 1364 } 1365 1366 value = nxt_conf_json_parse_str(mp, &empty_obj); 1367 1368 } else { 1369 rc = nxt_conf_op_compile(c->mem_pool, &ops, 1370 nxt_controller_conf.root, 1371 path, NULL, 0); 1372 1373 if (rc != NXT_OK) { 1374 if (rc == NXT_CONF_OP_NOT_FOUND) { 1375 goto not_found; 1376 } 1377 1378 /* rc == NXT_CONF_OP_ERROR */ 1379 goto alloc_fail; 1380 } 1381 1382 mp = nxt_mp_create(1024, 128, 256, 32); 1383 1384 if (nxt_slow_path(mp == NULL)) { 1385 goto alloc_fail; 1386 } 1387 1388 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 1389 } 1390 1391 if (nxt_slow_path(value == NULL)) { 1392 nxt_mp_destroy(mp); 1393 goto alloc_fail; 1394 } 1395 1396 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 1397 1398 vldt.conf = value; 1399 vldt.pool = c->mem_pool; 1400 vldt.conf_pool = mp; 1401 vldt.ver = NXT_VERNUM; 1402 1403 rc = nxt_conf_validate(&vldt); 1404 1405 if (nxt_slow_path(rc != NXT_OK)) { 1406 nxt_mp_destroy(mp); 1407 1408 if (rc == NXT_DECLINED) { 1409 resp.detail = vldt.error; 1410 goto invalid_conf; 1411 } 1412 1413 /* rc == NXT_ERROR */ 1414 goto alloc_fail; 1415 } 1416 1417 rc = nxt_controller_conf_send(task, mp, value, 1418 nxt_controller_conf_handler, req); 1419 1420 if (nxt_slow_path(rc != NXT_OK)) { 1421 nxt_mp_destroy(mp); 1422 1423 /* rc == NXT_ERROR */ 1424 goto alloc_fail; 1425 } 1426 1427 req->conf.root = value; 1428 req->conf.pool = mp; 1429 1430 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 1431 1432 return; 1433 } 1434 1435 not_allowed: 1436 1437 resp.status = 405; 1438 resp.title = (u_char *) "Method isn't allowed."; 1439 resp.offset = -1; 1440 1441 nxt_controller_response(task, req, &resp); 1442 return; 1443 1444 not_found: 1445 1446 resp.status = 404; 1447 resp.title = (u_char *) "Value doesn't exist."; 1448 resp.offset = -1; 1449 1450 nxt_controller_response(task, req, &resp); 1451 return; 1452 1453 invalid_conf: 1454 1455 resp.status = 400; 1456 resp.title = (u_char *) "Invalid configuration."; 1457 resp.offset = -1; 1458 1459 nxt_controller_response(task, req, &resp); 1460 return; 1461 1462 alloc_fail: 1463 1464 resp.status = 500; 1465 resp.title = (u_char *) "Memory allocation failed."; 1466 resp.offset = -1; 1467 1468 nxt_controller_response(task, req, &resp); 1469 } 1470 1471 1472 static nxt_bool_t 1473 nxt_controller_check_postpone_request(nxt_task_t *task) 1474 { 1475 nxt_port_t *router_port; 1476 nxt_runtime_t *rt; 1477 1478 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests) 1479 || nxt_controller_waiting_init_conf 1480 || !nxt_controller_router_ready) 1481 { 1482 return 1; 1483 } 1484 1485 rt = task->thread->runtime; 1486 1487 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1488 1489 return (router_port == NULL); 1490 } 1491 1492 1493 static void 1494 nxt_controller_process_status(nxt_task_t *task, nxt_controller_request_t *req) 1495 { 1496 uint32_t stream; 1497 nxt_int_t rc; 1498 nxt_port_t *router_port, *controller_port; 1499 nxt_runtime_t *rt; 1500 nxt_controller_response_t resp; 1501 1502 if (nxt_controller_check_postpone_request(task)) { 1503 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 1504 return; 1505 } 1506 1507 rt = task->thread->runtime; 1508 1509 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1510 1511 nxt_assert(router_port != NULL); 1512 nxt_assert(nxt_controller_router_ready); 1513 1514 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 1515 1516 stream = nxt_port_rpc_register_handler(task, controller_port, 1517 nxt_controller_status_handler, 1518 nxt_controller_status_handler, 1519 router_port->pid, req); 1520 if (nxt_slow_path(stream == 0)) { 1521 goto fail; 1522 } 1523 1524 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_STATUS, 1525 -1, stream, controller_port->id, NULL); 1526 1527 if (nxt_slow_path(rc != NXT_OK)) { 1528 nxt_port_rpc_cancel(task, controller_port, stream); 1529 1530 goto fail; 1531 } 1532 1533 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 1534 return; 1535 1536 fail: 1537 1538 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1539 1540 resp.status = 500; 1541 resp.title = (u_char *) "Failed to get status."; 1542 resp.offset = -1; 1543 1544 nxt_controller_response(task, req, &resp); 1545 return; 1546 } 1547 1548 1549 static void 1550 nxt_controller_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1551 void *data) 1552 { 1553 nxt_conf_value_t *status; 1554 nxt_controller_request_t *req; 1555 nxt_controller_response_t resp; 1556 1557 nxt_debug(task, "controller status handler"); 1558 1559 req = data; 1560 1561 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1562 status = nxt_status_get((nxt_status_report_t *) msg->buf->mem.pos, 1563 req->conn->mem_pool); 1564 } else { 1565 status = NULL; 1566 } 1567 1568 if (status == NULL) { 1569 nxt_queue_remove(&req->link); 1570 1571 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1572 1573 resp.status = 500; 1574 resp.title = (u_char *) "Failed to get status."; 1575 resp.offset = -1; 1576 1577 nxt_controller_response(task, req, &resp); 1578 } 1579 1580 nxt_controller_status = status; 1581 1582 nxt_controller_flush_requests(task); 1583 1584 nxt_controller_status = NULL; 1585 } 1586 1587 1588 static void 1589 nxt_controller_status_response(nxt_task_t *task, nxt_controller_request_t *req, 1590 nxt_str_t *path) 1591 { 1592 nxt_conf_value_t *status; 1593 nxt_controller_response_t resp; 1594 1595 status = nxt_conf_get_path(nxt_controller_status, path); 1596 1597 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1598 1599 if (status == NULL) { 1600 resp.status = 404; 1601 resp.title = (u_char *) "Invalid path."; 1602 resp.offset = -1; 1603 1604 nxt_controller_response(task, req, &resp); 1605 return; 1606 } 1607 1608 resp.status = 200; 1609 resp.conf = status; 1610 1611 nxt_controller_response(task, req, &resp); 1612 } 1613 1614 1615 #if (NXT_TLS) 1616 1617 static void 1618 nxt_controller_process_cert(nxt_task_t *task, 1619 nxt_controller_request_t *req, nxt_str_t *path) 1620 { 1621 u_char *p; 1622 nxt_str_t name; 1623 nxt_int_t ret; 1624 nxt_conn_t *c; 1625 nxt_cert_t *cert; 1626 nxt_conf_value_t *value; 1627 nxt_controller_response_t resp; 1628 1629 name.length = path->length - 1; 1630 name.start = path->start + 1; 1631 1632 p = nxt_memchr(name.start, '/', name.length); 1633 1634 if (p != NULL) { 1635 name.length = p - name.start; 1636 1637 path->length -= p - path->start; 1638 path->start = p; 1639 1640 } else { 1641 path = NULL; 1642 } 1643 1644 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1645 1646 c = req->conn; 1647 1648 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 1649 1650 if (name.length != 0) { 1651 value = nxt_cert_info_get(&name); 1652 if (value == NULL) { 1653 goto cert_not_found; 1654 } 1655 1656 if (path != NULL) { 1657 value = nxt_conf_get_path(value, path); 1658 if (value == NULL) { 1659 goto not_found; 1660 } 1661 } 1662 1663 } else { 1664 value = nxt_cert_info_get_all(c->mem_pool); 1665 if (value == NULL) { 1666 goto alloc_fail; 1667 } 1668 } 1669 1670 resp.status = 200; 1671 resp.conf = value; 1672 1673 nxt_controller_response(task, req, &resp); 1674 return; 1675 } 1676 1677 if (name.length == 0 || path != NULL) { 1678 goto invalid_name; 1679 } 1680 1681 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 1682 value = nxt_cert_info_get(&name); 1683 if (value != NULL) { 1684 goto exists_cert; 1685 } 1686 1687 cert = nxt_cert_mem(task, &c->read->mem); 1688 if (cert == NULL) { 1689 goto invalid_cert; 1690 } 1691 1692 ret = nxt_cert_info_save(&name, cert); 1693 1694 nxt_cert_destroy(cert); 1695 1696 if (nxt_slow_path(ret != NXT_OK)) { 1697 goto alloc_fail; 1698 } 1699 1700 nxt_cert_store_get(task, &name, c->mem_pool, 1701 nxt_controller_process_cert_save, req); 1702 return; 1703 } 1704 1705 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 1706 1707 if (nxt_controller_cert_in_use(&name)) { 1708 goto cert_in_use; 1709 } 1710 1711 if (nxt_cert_info_delete(&name) != NXT_OK) { 1712 goto cert_not_found; 1713 } 1714 1715 nxt_cert_store_delete(task, &name, c->mem_pool); 1716 1717 resp.status = 200; 1718 resp.title = (u_char *) "Certificate deleted."; 1719 1720 nxt_controller_response(task, req, &resp); 1721 return; 1722 } 1723 1724 resp.status = 405; 1725 resp.title = (u_char *) "Invalid method."; 1726 resp.offset = -1; 1727 1728 nxt_controller_response(task, req, &resp); 1729 return; 1730 1731 invalid_name: 1732 1733 resp.status = 400; 1734 resp.title = (u_char *) "Invalid certificate name."; 1735 resp.offset = -1; 1736 1737 nxt_controller_response(task, req, &resp); 1738 return; 1739 1740 invalid_cert: 1741 1742 resp.status = 400; 1743 resp.title = (u_char *) "Invalid certificate."; 1744 resp.offset = -1; 1745 1746 nxt_controller_response(task, req, &resp); 1747 return; 1748 1749 exists_cert: 1750 1751 resp.status = 400; 1752 resp.title = (u_char *) "Certificate already exists."; 1753 resp.offset = -1; 1754 1755 nxt_controller_response(task, req, &resp); 1756 return; 1757 1758 cert_in_use: 1759 1760 resp.status = 400; 1761 resp.title = (u_char *) "Certificate is used in the configuration."; 1762 resp.offset = -1; 1763 1764 nxt_controller_response(task, req, &resp); 1765 return; 1766 1767 cert_not_found: 1768 1769 resp.status = 404; 1770 resp.title = (u_char *) "Certificate doesn't exist."; 1771 resp.offset = -1; 1772 1773 nxt_controller_response(task, req, &resp); 1774 return; 1775 1776 not_found: 1777 1778 resp.status = 404; 1779 resp.title = (u_char *) "Invalid path."; 1780 resp.offset = -1; 1781 1782 nxt_controller_response(task, req, &resp); 1783 return; 1784 1785 alloc_fail: 1786 1787 resp.status = 500; 1788 resp.title = (u_char *) "Memory allocation failed."; 1789 resp.offset = -1; 1790 1791 nxt_controller_response(task, req, &resp); 1792 return; 1793 } 1794 1795 1796 static void 1797 nxt_controller_process_cert_save(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1798 void *data) 1799 { 1800 nxt_conn_t *c; 1801 nxt_buf_mem_t *mbuf; 1802 nxt_controller_request_t *req; 1803 nxt_controller_response_t resp; 1804 1805 req = data; 1806 1807 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1808 1809 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 1810 resp.status = 500; 1811 resp.title = (u_char *) "Failed to store certificate."; 1812 1813 nxt_controller_response(task, req, &resp); 1814 return; 1815 } 1816 1817 c = req->conn; 1818 1819 mbuf = &c->read->mem; 1820 1821 nxt_fd_write(msg->fd[0], mbuf->pos, nxt_buf_mem_used_size(mbuf)); 1822 1823 nxt_fd_close(msg->fd[0]); 1824 1825 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1826 1827 resp.status = 200; 1828 resp.title = (u_char *) "Certificate chain uploaded."; 1829 1830 nxt_controller_response(task, req, &resp); 1831 } 1832 1833 1834 static nxt_bool_t 1835 nxt_controller_cert_in_use(nxt_str_t *name) 1836 { 1837 uint32_t next; 1838 nxt_str_t str; 1839 nxt_conf_value_t *listeners, *listener, *value; 1840 1841 static nxt_str_t listeners_path = nxt_string("/listeners"); 1842 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1843 1844 listeners = nxt_conf_get_path(nxt_controller_conf.root, &listeners_path); 1845 1846 if (listeners != NULL) { 1847 next = 0; 1848 1849 for ( ;; ) { 1850 listener = nxt_conf_next_object_member(listeners, &str, &next); 1851 if (listener == NULL) { 1852 break; 1853 } 1854 1855 value = nxt_conf_get_path(listener, &certificate_path); 1856 if (value == NULL) { 1857 continue; 1858 } 1859 1860 nxt_conf_get_string(value, &str); 1861 1862 if (nxt_strstr_eq(&str, name)) { 1863 return 1; 1864 } 1865 } 1866 } 1867 1868 return 0; 1869 } 1870 1871 #endif 1872 1873 1874 static void 1875 nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1876 void *data) 1877 { 1878 nxt_controller_request_t *req; 1879 nxt_controller_response_t resp; 1880 1881 req = data; 1882 1883 nxt_debug(task, "controller conf ready: %*s", 1884 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 1885 1886 nxt_queue_remove(&req->link); 1887 1888 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1889 1890 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1891 nxt_mp_destroy(nxt_controller_conf.pool); 1892 1893 nxt_controller_conf = req->conf; 1894 1895 nxt_controller_conf_store(task, req->conf.root); 1896 1897 resp.status = 200; 1898 resp.title = (u_char *) "Reconfiguration done."; 1899 1900 } else { 1901 nxt_mp_destroy(req->conf.pool); 1902 1903 resp.status = 500; 1904 resp.title = (u_char *) "Failed to apply new configuration."; 1905 resp.offset = -1; 1906 } 1907 1908 nxt_controller_response(task, req, &resp); 1909 1910 nxt_controller_flush_requests(task); 1911 } 1912 1913 1914 static void 1915 nxt_controller_process_control(nxt_task_t *task, 1916 nxt_controller_request_t *req, nxt_str_t *path) 1917 { 1918 uint32_t stream; 1919 nxt_buf_t *b; 1920 nxt_int_t rc; 1921 nxt_port_t *router_port, *controller_port; 1922 nxt_runtime_t *rt; 1923 nxt_conf_value_t *value; 1924 nxt_controller_response_t resp; 1925 1926 static nxt_str_t applications = nxt_string("applications"); 1927 1928 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1929 1930 if (!nxt_str_eq(&req->parser.method, "GET", 3)) { 1931 goto not_allowed; 1932 } 1933 1934 if (!nxt_str_start(path, "applications/", 13) 1935 || nxt_memcmp(path->start + path->length - 8, "/restart", 8) != 0) 1936 { 1937 goto not_found; 1938 } 1939 1940 path->start += 13; 1941 path->length -= 13 + 8; 1942 1943 if (nxt_controller_check_postpone_request(task)) { 1944 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 1945 return; 1946 } 1947 1948 value = nxt_controller_conf.root; 1949 if (value == NULL) { 1950 goto not_found; 1951 } 1952 1953 value = nxt_conf_get_object_member(value, &applications, NULL); 1954 if (value == NULL) { 1955 goto not_found; 1956 } 1957 1958 value = nxt_conf_get_object_member(value, path, NULL); 1959 if (value == NULL) { 1960 goto not_found; 1961 } 1962 1963 b = nxt_buf_mem_alloc(req->conn->mem_pool, path->length, 0); 1964 if (nxt_slow_path(b == NULL)) { 1965 goto alloc_fail; 1966 } 1967 1968 b->mem.free = nxt_cpymem(b->mem.pos, path->start, path->length); 1969 1970 rt = task->thread->runtime; 1971 1972 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 1973 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1974 1975 stream = nxt_port_rpc_register_handler(task, controller_port, 1976 nxt_controller_app_restart_handler, 1977 nxt_controller_app_restart_handler, 1978 router_port->pid, req); 1979 if (nxt_slow_path(stream == 0)) { 1980 goto alloc_fail; 1981 } 1982 1983 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_APP_RESTART, 1984 -1, stream, 0, b); 1985 if (nxt_slow_path(rc != NXT_OK)) { 1986 nxt_port_rpc_cancel(task, controller_port, stream); 1987 1988 goto fail; 1989 } 1990 1991 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 1992 1993 return; 1994 1995 not_allowed: 1996 1997 resp.status = 405; 1998 resp.title = (u_char *) "Method isn't allowed."; 1999 resp.offset = -1; 2000 2001 nxt_controller_response(task, req, &resp); 2002 return; 2003 2004 not_found: 2005 2006 resp.status = 404; 2007 resp.title = (u_char *) "Value doesn't exist."; 2008 resp.offset = -1; 2009 2010 nxt_controller_response(task, req, &resp); 2011 return; 2012 2013 alloc_fail: 2014 2015 resp.status = 500; 2016 resp.title = (u_char *) "Memory allocation failed."; 2017 resp.offset = -1; 2018 2019 nxt_controller_response(task, req, &resp); 2020 return; 2021 2022 fail: 2023 2024 resp.status = 500; 2025 resp.title = (u_char *) "Send restart failed."; 2026 resp.offset = -1; 2027 2028 nxt_controller_response(task, req, &resp); 2029 } 2030 2031 2032 static void 2033 nxt_controller_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2034 void *data) 2035 { 2036 nxt_controller_request_t *req; 2037 nxt_controller_response_t resp; 2038 2039 req = data; 2040 2041 nxt_debug(task, "controller app restart handler"); 2042 2043 nxt_queue_remove(&req->link); 2044 2045 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 2046 2047 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 2048 resp.status = 200; 2049 resp.title = (u_char *) "Ok"; 2050 2051 } else { 2052 resp.status = 500; 2053 resp.title = (u_char *) "Failed to restart app."; 2054 resp.offset = -1; 2055 } 2056 2057 nxt_controller_response(task, req, &resp); 2058 2059 nxt_controller_flush_requests(task); 2060 } 2061 2062 2063 static void 2064 nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) 2065 { 2066 void *mem; 2067 u_char *end; 2068 size_t size; 2069 nxt_fd_t fd; 2070 nxt_buf_t *b; 2071 nxt_port_t *main_port; 2072 nxt_runtime_t *rt; 2073 2074 rt = task->thread->runtime; 2075 2076 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2077 2078 size = nxt_conf_json_length(conf, NULL); 2079 2080 fd = nxt_shm_open(task, size); 2081 if (nxt_slow_path(fd == -1)) { 2082 return; 2083 } 2084 2085 mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2086 if (nxt_slow_path(mem == MAP_FAILED)) { 2087 goto fail; 2088 } 2089 2090 end = nxt_conf_json_print(mem, conf, NULL); 2091 2092 nxt_mem_munmap(mem, size); 2093 2094 size = end - (u_char *) mem; 2095 2096 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, sizeof(size_t), 0); 2097 if (nxt_slow_path(b == NULL)) { 2098 goto fail; 2099 } 2100 2101 b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t)); 2102 2103 (void) nxt_port_socket_write(task, main_port, 2104 NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_CLOSE_FD, 2105 fd, 0, -1, b); 2106 2107 return; 2108 2109 fail: 2110 2111 nxt_fd_close(fd); 2112 } 2113 2114 2115 static void 2116 nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 2117 nxt_controller_response_t *resp) 2118 { 2119 size_t size; 2120 nxt_str_t status_line, str; 2121 nxt_buf_t *b, *body; 2122 nxt_conn_t *c; 2123 nxt_uint_t n; 2124 nxt_conf_value_t *value, *location; 2125 nxt_conf_json_pretty_t pretty; 2126 2127 static nxt_str_t success_str = nxt_string("success"); 2128 static nxt_str_t error_str = nxt_string("error"); 2129 static nxt_str_t detail_str = nxt_string("detail"); 2130 static nxt_str_t location_str = nxt_string("location"); 2131 static nxt_str_t offset_str = nxt_string("offset"); 2132 static nxt_str_t line_str = nxt_string("line"); 2133 static nxt_str_t column_str = nxt_string("column"); 2134 2135 static nxt_time_string_t date_cache = { 2136 (nxt_atomic_uint_t) -1, 2137 nxt_controller_date, 2138 "%s, %02d %s %4d %02d:%02d:%02d GMT", 2139 nxt_length("Wed, 31 Dec 1986 16:40:00 GMT"), 2140 NXT_THREAD_TIME_GMT, 2141 NXT_THREAD_TIME_SEC, 2142 }; 2143 2144 switch (resp->status) { 2145 2146 case 200: 2147 nxt_str_set(&status_line, "200 OK"); 2148 break; 2149 2150 case 400: 2151 nxt_str_set(&status_line, "400 Bad Request"); 2152 break; 2153 2154 case 404: 2155 nxt_str_set(&status_line, "404 Not Found"); 2156 break; 2157 2158 case 405: 2159 nxt_str_set(&status_line, "405 Method Not Allowed"); 2160 break; 2161 2162 default: 2163 nxt_str_set(&status_line, "500 Internal Server Error"); 2164 break; 2165 } 2166 2167 c = req->conn; 2168 value = resp->conf; 2169 2170 if (value == NULL) { 2171 n = 1 2172 + (resp->detail.length != 0) 2173 + (resp->status >= 400 && resp->offset != -1); 2174 2175 value = nxt_conf_create_object(c->mem_pool, n); 2176 2177 if (nxt_slow_path(value == NULL)) { 2178 nxt_controller_conn_close(task, c, req); 2179 return; 2180 } 2181 2182 str.length = nxt_strlen(resp->title); 2183 str.start = resp->title; 2184 2185 if (resp->status < 400) { 2186 nxt_conf_set_member_string(value, &success_str, &str, 0); 2187 2188 } else { 2189 nxt_conf_set_member_string(value, &error_str, &str, 0); 2190 } 2191 2192 n = 0; 2193 2194 if (resp->detail.length != 0) { 2195 n++; 2196 2197 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n); 2198 } 2199 2200 if (resp->status >= 400 && resp->offset != -1) { 2201 n++; 2202 2203 location = nxt_conf_create_object(c->mem_pool, 2204 resp->line != 0 ? 3 : 1); 2205 2206 nxt_conf_set_member(value, &location_str, location, n); 2207 2208 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 2209 2210 if (resp->line != 0) { 2211 nxt_conf_set_member_integer(location, &line_str, 2212 resp->line, 1); 2213 2214 nxt_conf_set_member_integer(location, &column_str, 2215 resp->column, 2); 2216 } 2217 } 2218 } 2219 2220 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 2221 2222 size = nxt_conf_json_length(value, &pretty) + 2; 2223 2224 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2225 if (nxt_slow_path(body == NULL)) { 2226 nxt_controller_conn_close(task, c, req); 2227 return; 2228 } 2229 2230 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 2231 2232 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty); 2233 2234 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); 2235 2236 size = nxt_length("HTTP/1.1 " "\r\n") + status_line.length 2237 + nxt_length("Server: " NXT_SERVER "\r\n") 2238 + nxt_length("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") 2239 + nxt_length("Content-Type: application/json\r\n") 2240 + nxt_length("Content-Length: " "\r\n") + NXT_SIZE_T_LEN 2241 + nxt_length("Connection: close\r\n") 2242 + nxt_length("\r\n"); 2243 2244 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2245 if (nxt_slow_path(b == NULL)) { 2246 nxt_controller_conn_close(task, c, req); 2247 return; 2248 } 2249 2250 b->next = body; 2251 2252 nxt_str_set(&str, "HTTP/1.1 "); 2253 2254 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 2255 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 2256 status_line.length); 2257 2258 nxt_str_set(&str, "\r\n" 2259 "Server: " NXT_SERVER "\r\n" 2260 "Date: "); 2261 2262 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 2263 2264 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 2265 b->mem.free); 2266 2267 nxt_str_set(&str, "\r\n" 2268 "Content-Type: application/json\r\n" 2269 "Content-Length: "); 2270 2271 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 2272 2273 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 2274 nxt_buf_mem_used_size(&body->mem)); 2275 2276 nxt_str_set(&str, "\r\n" 2277 "Connection: close\r\n" 2278 "\r\n"); 2279 2280 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 2281 2282 c->write = b; 2283 c->write_state = &nxt_controller_conn_write_state; 2284 2285 nxt_conn_write(task->thread->engine, c); 2286 } 2287 2288 2289 static u_char * 2290 nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 2291 size_t size, const char *format) 2292 { 2293 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 2294 "Sat" }; 2295 2296 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 2297 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 2298 2299 return nxt_sprintf(buf, buf + size, format, 2300 week[tm->tm_wday], tm->tm_mday, 2301 month[tm->tm_mon], tm->tm_year + 1900, 2302 tm->tm_hour, tm->tm_min, tm->tm_sec); 2303 } 2304