1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_main_process.h> 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 nxt_str_t detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38} nxt_controller_response_t; 39 40 41static void nxt_controller_process_new_port_handler(nxt_task_t *task, 42 nxt_port_recv_msg_t *msg); 43static void nxt_controller_send_current_conf(nxt_task_t *task); 44static void nxt_controller_router_ready_handler(nxt_task_t *task, 45 nxt_port_recv_msg_t *msg); 46static nxt_int_t nxt_controller_conf_default(void); 47static void nxt_controller_conf_init_handler(nxt_task_t *task, 48 nxt_port_recv_msg_t *msg, void *data); 49static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, 50 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); 51 52static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 53static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 54static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 55 uintptr_t data); 56static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 57 void *data); 58static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 59 void *data); 60static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 61 void *data); 62static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 63static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 64 void *data); 65static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 66 void *data); 67static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 68static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 69 70static nxt_int_t nxt_controller_request_content_length(void *ctx, 71 nxt_http_field_t *field, uintptr_t data); 72 73static void nxt_controller_process_request(nxt_task_t *task, 74 nxt_controller_request_t *req); 75static void nxt_controller_conf_handler(nxt_task_t *task, 76 nxt_port_recv_msg_t *msg, void *data); 77static void nxt_controller_conf_store(nxt_task_t *task, 78 nxt_conf_value_t *conf); 79static void nxt_controller_response(nxt_task_t *task, 80 nxt_controller_request_t *req, nxt_controller_response_t *resp); 81static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 82 struct tm *tm, size_t size, const char *format); 83 84 85static nxt_http_field_proc_t nxt_controller_request_fields[] = { 86 { nxt_string("Content-Length"), 87 &nxt_controller_request_content_length, 0 }, 88}; 89 90static nxt_lvlhsh_t nxt_controller_fields_hash; 91 92static nxt_uint_t nxt_controller_listening; 93static nxt_uint_t nxt_controller_router_ready; 94static nxt_controller_conf_t nxt_controller_conf; 95static nxt_queue_t nxt_controller_waiting_requests; 96 97 98static const nxt_event_conn_state_t nxt_controller_conn_read_state; 99static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 100static const nxt_event_conn_state_t nxt_controller_conn_write_state; 101static const nxt_event_conn_state_t nxt_controller_conn_close_state; 102 103 104nxt_port_handlers_t nxt_controller_process_port_handlers = { 105 .quit = nxt_worker_process_quit_handler, 106 .new_port = nxt_controller_process_new_port_handler, 107 .change_file = nxt_port_change_log_file_handler, 108 .mmap = nxt_port_mmap_handler, 109 .process_ready = nxt_controller_router_ready_handler, 110 .data = nxt_port_data_handler, 111 .remove_pid = nxt_port_remove_pid_handler, 112 .rpc_ready = nxt_port_rpc_handler, 113 .rpc_error = nxt_port_rpc_handler, 114}; 115 116 117nxt_int_t 118nxt_controller_start(nxt_task_t *task, void *data) 119{ 120 nxt_mp_t *mp; 121 nxt_int_t ret; 122 nxt_str_t *json; 123 nxt_runtime_t *rt; 124 nxt_conf_value_t *conf; 125 nxt_event_engine_t *engine; 126 nxt_conf_validation_t vldt; 127 128 rt = task->thread->runtime; 129 130 engine = task->thread->engine; 131 132 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 133 if (nxt_slow_path(engine->mem_pool == NULL)) { 134 return NXT_ERROR; 135 } 136 137 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool, 138 nxt_controller_request_fields, 139 nxt_nitems(nxt_controller_request_fields)); 140 141 if (nxt_slow_path(ret != NXT_OK)) { 142 return NXT_ERROR; 143 } 144 145 nxt_queue_init(&nxt_controller_waiting_requests); 146 147 json = data; 148 149 if (json->length == 0) { 150 return NXT_OK; 151 } 152 153 mp = nxt_mp_create(1024, 128, 256, 32); 154 if (nxt_slow_path(mp == NULL)) { 155 return NXT_ERROR; 156 } 157 158 conf = nxt_conf_json_parse_str(mp, json); 159 nxt_free(json->start); 160 161 if (nxt_slow_path(conf == NULL)) { 162 nxt_alert(task, "failed to restore previous configuration: " 163 "file is corrupted or not enough memory"); 164 165 nxt_mp_destroy(mp); 166 return NXT_OK; 167 } 168 169 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 170 171 vldt.pool = nxt_mp_create(1024, 128, 256, 32); 172 if (nxt_slow_path(vldt.pool == NULL)) { 173 return NXT_ERROR; 174 } 175 176 vldt.conf = conf; 177 178 ret = nxt_conf_validate(&vldt); 179 180 if (nxt_slow_path(ret != NXT_OK)) { 181 182 if (ret == NXT_DECLINED) { 183 nxt_alert(task, "the previous configuration is invalid: %V", 184 &vldt.error); 185 186 nxt_mp_destroy(vldt.pool); 187 nxt_mp_destroy(mp); 188 189 return NXT_OK; 190 } 191 192 /* ret == NXT_ERROR */ 193 194 return NXT_ERROR; 195 } 196 197 nxt_mp_destroy(vldt.pool); 198 199 nxt_controller_conf.root = conf; 200 nxt_controller_conf.pool = mp; 201 202 return NXT_OK; 203} 204 205 206static void 207nxt_controller_process_new_port_handler(nxt_task_t *task, 208 nxt_port_recv_msg_t *msg) 209{ 210 nxt_port_new_port_handler(task, msg); 211 212 if (msg->u.new_port->type != NXT_PROCESS_ROUTER 213 || !nxt_controller_router_ready) 214 { 215 return; 216 } 217 218 nxt_controller_send_current_conf(task); 219} 220 221 222static void 223nxt_controller_send_current_conf(nxt_task_t *task) 224{ 225 nxt_int_t rc; 226 nxt_runtime_t *rt; 227 nxt_conf_value_t *conf; 228 229 conf = nxt_controller_conf.root; 230 231 if (conf != NULL) { 232 rc = nxt_controller_conf_send(task, conf, 233 nxt_controller_conf_init_handler, NULL); 234 235 if (nxt_fast_path(rc == NXT_OK)) { 236 return; 237 } 238 239 nxt_mp_destroy(nxt_controller_conf.pool); 240 241 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 242 nxt_abort(); 243 } 244 } 245 246 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 247 nxt_abort(); 248 } 249 250 rt = task->thread->runtime; 251 252 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { 253 nxt_abort(); 254 } 255 256 nxt_controller_listening = 1; 257} 258 259 260static void 261nxt_controller_router_ready_handler(nxt_task_t *task, 262 nxt_port_recv_msg_t *msg) 263{ 264 nxt_port_t *router_port; 265 nxt_runtime_t *rt; 266 267 rt = task->thread->runtime; 268 269 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 270 271 nxt_controller_router_ready = 1; 272 273 if (router_port != NULL) { 274 nxt_controller_send_current_conf(task); 275 } 276} 277 278 279static nxt_int_t 280nxt_controller_conf_default(void) 281{ 282 nxt_mp_t *mp; 283 nxt_conf_value_t *conf; 284 285 static const nxt_str_t json 286 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 287 288 mp = nxt_mp_create(1024, 128, 256, 32); 289 290 if (nxt_slow_path(mp == NULL)) { 291 return NXT_ERROR; 292 } 293 294 conf = nxt_conf_json_parse_str(mp, &json); 295 296 if (nxt_slow_path(conf == NULL)) { 297 return NXT_ERROR; 298 } 299 300 nxt_controller_conf.root = conf; 301 nxt_controller_conf.pool = mp; 302 303 return NXT_OK; 304} 305 306 307static void 308nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 309 void *data) 310{ 311 nxt_runtime_t *rt; 312 313 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { 314 nxt_alert(task, "failed to apply previous configuration"); 315 316 nxt_mp_destroy(nxt_controller_conf.pool); 317 318 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 319 nxt_abort(); 320 } 321 } 322 323 if (nxt_controller_listening == 0) { 324 rt = task->thread->runtime; 325 326 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) 327 == NULL)) 328 { 329 nxt_abort(); 330 } 331 332 nxt_controller_listening = 1; 333 } 334} 335 336 337static nxt_int_t 338nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, 339 nxt_port_rpc_handler_t handler, void *data) 340{ 341 size_t size; 342 uint32_t stream; 343 nxt_int_t rc; 344 nxt_buf_t *b; 345 nxt_port_t *router_port, *controller_port; 346 nxt_runtime_t *rt; 347 348 rt = task->thread->runtime; 349 350 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 351 352 if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) { 353 return NXT_DECLINED; 354 } 355 356 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 357 358 size = nxt_conf_json_length(conf, NULL); 359 360 b = nxt_port_mmap_get_buf(task, router_port, size); 361 if (nxt_slow_path(b == NULL)) { 362 return NXT_ERROR; 363 } 364 365 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 366 367 stream = nxt_port_rpc_register_handler(task, controller_port, 368 handler, handler, 369 router_port->pid, data); 370 371 if (nxt_slow_path(stream == 0)) { 372 return NXT_ERROR; 373 } 374 375 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 376 stream, controller_port->id, b); 377 378 if (nxt_slow_path(rc != NXT_OK)) { 379 nxt_port_rpc_cancel(task, controller_port, stream); 380 return NXT_ERROR; 381 } 382 383 return NXT_OK; 384} 385 386 387nxt_int_t 388nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 389{ 390 nxt_sockaddr_t *sa; 391 nxt_listen_socket_t *ls; 392 393 sa = rt->controller_listen; 394 395 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 396 if (ls == NULL) { 397 return NXT_ERROR; 398 } 399 400 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 401 sa->socklen, sa->length); 402 if (ls->sockaddr == NULL) { 403 return NXT_ERROR; 404 } 405 406 ls->sockaddr->type = sa->type; 407 nxt_sockaddr_text(ls->sockaddr); 408 409 nxt_listen_socket_remote_size(ls); 410 411 ls->socket = -1; 412 ls->backlog = NXT_LISTEN_BACKLOG; 413 ls->read_after_accept = 1; 414 ls->flags = NXT_NONBLOCK; 415 416#if 0 417 /* STUB */ 418 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 419 if (wq == NULL) { 420 return NXT_ERROR; 421 } 422 nxt_work_queue_name(wq, "listen"); 423 /**/ 424 425 ls->work_queue = wq; 426#endif 427 ls->handler = nxt_controller_conn_init; 428 429 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 430 return NXT_ERROR; 431 } 432 433 rt->controller_socket = ls; 434 435 return NXT_OK; 436} 437 438 439static void 440nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 441{ 442 nxt_buf_t *b; 443 nxt_conn_t *c; 444 nxt_event_engine_t *engine; 445 nxt_controller_request_t *r; 446 447 c = obj; 448 449 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 450 451 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 452 if (nxt_slow_path(r == NULL)) { 453 nxt_controller_conn_free(task, c, NULL); 454 return; 455 } 456 457 r->conn = c; 458 459 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 460 != NXT_OK)) 461 { 462 nxt_controller_conn_free(task, c, NULL); 463 return; 464 } 465 466 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 467 if (nxt_slow_path(b == NULL)) { 468 nxt_controller_conn_free(task, c, NULL); 469 return; 470 } 471 472 c->read = b; 473 c->socket.data = r; 474 c->socket.read_ready = 1; 475 c->read_state = &nxt_controller_conn_read_state; 476 477 engine = task->thread->engine; 478 c->read_work_queue = &engine->read_work_queue; 479 c->write_work_queue = &engine->write_work_queue; 480 481 nxt_conn_read(engine, c); 482} 483 484 485static const nxt_event_conn_state_t nxt_controller_conn_read_state 486 nxt_aligned(64) = 487{ 488 .ready_handler = nxt_controller_conn_read, 489 .close_handler = nxt_controller_conn_close, 490 .error_handler = nxt_controller_conn_read_error, 491 492 .timer_handler = nxt_controller_conn_read_timeout, 493 .timer_value = nxt_controller_conn_timeout_value, 494 .timer_data = 60 * 1000, 495}; 496 497 498static void 499nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 500{ 501 size_t preread; 502 nxt_buf_t *b; 503 nxt_int_t rc; 504 nxt_conn_t *c; 505 nxt_controller_request_t *r; 506 507 c = obj; 508 r = data; 509 510 nxt_debug(task, "controller conn read"); 511 512 nxt_queue_remove(&c->link); 513 nxt_queue_self(&c->link); 514 515 b = c->read; 516 517 rc = nxt_http_parse_request(&r->parser, &b->mem); 518 519 if (nxt_slow_path(rc != NXT_DONE)) { 520 521 if (rc == NXT_AGAIN) { 522 if (nxt_buf_mem_free_size(&b->mem) == 0) { 523 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 524 nxt_controller_conn_close(task, c, r); 525 return; 526 } 527 528 nxt_conn_read(task->thread->engine, c); 529 return; 530 } 531 532 /* rc == NXT_ERROR */ 533 534 nxt_log(task, NXT_LOG_ERR, "parsing error"); 535 536 nxt_controller_conn_close(task, c, r); 537 return; 538 } 539 540 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash, 541 r); 542 543 if (nxt_slow_path(rc != NXT_OK)) { 544 nxt_controller_conn_close(task, c, r); 545 return; 546 } 547 548 preread = nxt_buf_mem_used_size(&b->mem); 549 550 nxt_debug(task, "controller request header parsing complete, " 551 "body length: %uz, preread: %uz", 552 r->length, preread); 553 554 if (preread >= r->length) { 555 nxt_controller_process_request(task, r); 556 return; 557 } 558 559 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 560 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 561 if (nxt_slow_path(b == NULL)) { 562 nxt_controller_conn_free(task, c, NULL); 563 return; 564 } 565 566 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 567 568 c->read = b; 569 } 570 571 c->read_state = &nxt_controller_conn_body_read_state; 572 573 nxt_conn_read(task->thread->engine, c); 574} 575 576 577static nxt_msec_t 578nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 579{ 580 return (nxt_msec_t) data; 581} 582 583 584static void 585nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 586{ 587 nxt_conn_t *c; 588 589 c = obj; 590 591 nxt_debug(task, "controller conn read error"); 592 593 nxt_controller_conn_close(task, c, data); 594} 595 596 597static void 598nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 599{ 600 nxt_timer_t *timer; 601 nxt_conn_t *c; 602 603 timer = obj; 604 605 c = nxt_read_timer_conn(timer); 606 c->socket.timedout = 1; 607 c->socket.closed = 1; 608 609 nxt_debug(task, "controller conn read timeout"); 610 611 nxt_controller_conn_close(task, c, data); 612} 613 614 615static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 616 nxt_aligned(64) = 617{ 618 .ready_handler = nxt_controller_conn_body_read, 619 .close_handler = nxt_controller_conn_close, 620 .error_handler = nxt_controller_conn_read_error, 621 622 .timer_handler = nxt_controller_conn_read_timeout, 623 .timer_value = nxt_controller_conn_timeout_value, 624 .timer_data = 60 * 1000, 625 .timer_autoreset = 1, 626}; 627 628 629static void 630nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 631{ 632 size_t read; 633 nxt_buf_t *b; 634 nxt_conn_t *c; 635 nxt_controller_request_t *r; 636 637 c = obj; 638 r = data; 639 b = c->read; 640 641 read = nxt_buf_mem_used_size(&b->mem); 642 643 nxt_debug(task, "controller conn body read: %uz of %uz", 644 read, r->length); 645 646 if (read >= r->length) { 647 nxt_controller_process_request(task, r); 648 return; 649 } 650 651 nxt_conn_read(task->thread->engine, c); 652} 653 654 655static const nxt_event_conn_state_t nxt_controller_conn_write_state 656 nxt_aligned(64) = 657{ 658 .ready_handler = nxt_controller_conn_write, 659 .error_handler = nxt_controller_conn_write_error, 660 661 .timer_handler = nxt_controller_conn_write_timeout, 662 .timer_value = nxt_controller_conn_timeout_value, 663 .timer_data = 60 * 1000, 664 .timer_autoreset = 1, 665}; 666 667 668static void 669nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 670{ 671 nxt_buf_t *b; 672 nxt_conn_t *c; 673 674 c = obj; 675 676 nxt_debug(task, "controller conn write"); 677 678 b = c->write; 679 680 if (b->mem.pos != b->mem.free) { 681 nxt_conn_write(task->thread->engine, c); 682 return; 683 } 684 685 nxt_debug(task, "controller conn write complete"); 686 687 nxt_controller_conn_close(task, c, data); 688} 689 690 691static void 692nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 693{ 694 nxt_conn_t *c; 695 696 c = obj; 697 698 nxt_debug(task, "controller conn write error"); 699 700 nxt_controller_conn_close(task, c, data); 701} 702 703 704static void 705nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 706{ 707 nxt_conn_t *c; 708 nxt_timer_t *timer; 709 710 timer = obj; 711 712 c = nxt_write_timer_conn(timer); 713 c->socket.timedout = 1; 714 c->socket.closed = 1; 715 716 nxt_debug(task, "controller conn write timeout"); 717 718 nxt_controller_conn_close(task, c, data); 719} 720 721 722static const nxt_event_conn_state_t nxt_controller_conn_close_state 723 nxt_aligned(64) = 724{ 725 .ready_handler = nxt_controller_conn_free, 726}; 727 728 729static void 730nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 731{ 732 nxt_conn_t *c; 733 734 c = obj; 735 736 nxt_debug(task, "controller conn close"); 737 738 nxt_queue_remove(&c->link); 739 740 c->write_state = &nxt_controller_conn_close_state; 741 742 nxt_conn_close(task->thread->engine, c); 743} 744 745 746static void 747nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 748{ 749 nxt_conn_t *c; 750 751 c = obj; 752 753 nxt_debug(task, "controller conn free"); 754 755 nxt_sockaddr_cache_free(task->thread->engine, c); 756 757 nxt_conn_free(task, c); 758} 759 760 761static nxt_int_t 762nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 763 uintptr_t data) 764{ 765 off_t length; 766 nxt_controller_request_t *r; 767 768 r = ctx; 769 770 length = nxt_off_t_parse(field->value, field->value_length); 771 772 if (nxt_fast_path(length > 0)) { 773 774 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 775 nxt_log_error(NXT_LOG_ERR, &r->conn->log, 776 "Content-Length is too big"); 777 return NXT_ERROR; 778 } 779 780 r->length = length; 781 return NXT_OK; 782 } 783 784 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid"); 785 786 return NXT_ERROR; 787} 788 789 790static void 791nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 792{ 793 nxt_mp_t *mp; 794 nxt_int_t rc; 795 nxt_str_t path; 796 nxt_conn_t *c; 797 nxt_buf_mem_t *mbuf; 798 nxt_conf_op_t *ops; 799 nxt_conf_value_t *value; 800 nxt_conf_validation_t vldt; 801 nxt_conf_json_error_t error; 802 nxt_controller_response_t resp; 803 804 static const nxt_str_t empty_obj = nxt_string("{}"); 805 806 c = req->conn; 807 path = req->parser.path; 808 809 if (nxt_str_start(&path, "/config", 7)) { 810 811 if (path.length == 7) { 812 path.length = 1; 813 814 } else if (path.start[7] == '/') { 815 path.length -= 7; 816 path.start += 7; 817 } 818 } 819 820 if (path.length > 1 && path.start[path.length - 1] == '/') { 821 path.length--; 822 } 823 824 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 825 826 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 827 828 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 829 830 if (value == NULL) { 831 goto not_found; 832 } 833 834 resp.status = 200; 835 resp.conf = value; 836 837 nxt_controller_response(task, req, &resp); 838 return; 839 } 840 841 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 842 843 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 844 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 845 return; 846 } 847 848 mp = nxt_mp_create(1024, 128, 256, 32); 849 850 if (nxt_slow_path(mp == NULL)) { 851 goto alloc_fail; 852 } 853 854 mbuf = &c->read->mem; 855 856 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 857 858 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 859 860 if (value == NULL) { 861 nxt_mp_destroy(mp); 862 863 if (error.pos == NULL) { 864 goto alloc_fail; 865 } 866 867 resp.status = 400; 868 resp.title = (u_char *) "Invalid JSON."; 869 resp.detail.length = nxt_strlen(error.detail); 870 resp.detail.start = error.detail; 871 resp.offset = error.pos - mbuf->pos; 872 873 nxt_conf_json_position(mbuf->pos, error.pos, 874 &resp.line, &resp.column); 875 876 nxt_controller_response(task, req, &resp); 877 return; 878 } 879 880 if (path.length != 1) { 881 rc = nxt_conf_op_compile(c->mem_pool, &ops, 882 nxt_controller_conf.root, 883 &path, value); 884 885 if (rc != NXT_OK) { 886 nxt_mp_destroy(mp); 887 888 if (rc == NXT_DECLINED) { 889 goto not_found; 890 } 891 892 goto alloc_fail; 893 } 894 895 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 896 897 if (nxt_slow_path(value == NULL)) { 898 nxt_mp_destroy(mp); 899 goto alloc_fail; 900 } 901 } 902 903 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 904 905 vldt.conf = value; 906 vldt.pool = c->mem_pool; 907 908 rc = nxt_conf_validate(&vldt); 909 910 if (nxt_slow_path(rc != NXT_OK)) { 911 nxt_mp_destroy(mp); 912 913 if (rc == NXT_DECLINED) { 914 resp.detail = vldt.error; 915 goto invalid_conf; 916 } 917 918 /* rc == NXT_ERROR */ 919 goto alloc_fail; 920 } 921 922 rc = nxt_controller_conf_send(task, value, 923 nxt_controller_conf_handler, req); 924 925 if (nxt_slow_path(rc != NXT_OK)) { 926 nxt_mp_destroy(mp); 927 928 if (rc == NXT_DECLINED) { 929 goto no_router; 930 } 931 932 /* rc == NXT_ERROR */ 933 goto alloc_fail; 934 } 935 936 req->conf.root = value; 937 req->conf.pool = mp; 938 939 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 940 941 return; 942 } 943 944 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 945 946 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 947 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 948 return; 949 } 950 951 if (path.length == 1) { 952 mp = nxt_mp_create(1024, 128, 256, 32); 953 954 if (nxt_slow_path(mp == NULL)) { 955 goto alloc_fail; 956 } 957 958 value = nxt_conf_json_parse_str(mp, &empty_obj); 959 960 } else { 961 rc = nxt_conf_op_compile(c->mem_pool, &ops, 962 nxt_controller_conf.root, 963 &path, NULL); 964 965 if (rc != NXT_OK) { 966 if (rc == NXT_DECLINED) { 967 goto not_found; 968 } 969 970 goto alloc_fail; 971 } 972 973 mp = nxt_mp_create(1024, 128, 256, 32); 974 975 if (nxt_slow_path(mp == NULL)) { 976 goto alloc_fail; 977 } 978 979 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 980 } 981 982 if (nxt_slow_path(value == NULL)) { 983 nxt_mp_destroy(mp); 984 goto alloc_fail; 985 } 986 987 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 988 989 vldt.conf = value; 990 vldt.pool = c->mem_pool; 991 992 rc = nxt_conf_validate(&vldt); 993 994 if (nxt_slow_path(rc != NXT_OK)) { 995 nxt_mp_destroy(mp); 996 997 if (rc == NXT_DECLINED) { 998 resp.detail = vldt.error; 999 goto invalid_conf; 1000 } 1001 1002 /* rc == NXT_ERROR */ 1003 goto alloc_fail; 1004 } 1005 1006 rc = nxt_controller_conf_send(task, value, 1007 nxt_controller_conf_handler, req); 1008 1009 if (nxt_slow_path(rc != NXT_OK)) { 1010 nxt_mp_destroy(mp); 1011 1012 if (rc == NXT_DECLINED) { 1013 goto no_router; 1014 } 1015 1016 /* rc == NXT_ERROR */ 1017 goto alloc_fail; 1018 } 1019 1020 req->conf.root = value; 1021 req->conf.pool = mp; 1022 1023 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 1024 1025 return; 1026 } 1027 1028 resp.status = 405; 1029 resp.title = (u_char *) "Invalid method."; 1030 resp.offset = -1; 1031 1032 nxt_controller_response(task, req, &resp); 1033 return; 1034 1035not_found: 1036 1037 resp.status = 404; 1038 resp.title = (u_char *) "Value doesn't exist."; 1039 resp.offset = -1; 1040 1041 nxt_controller_response(task, req, &resp); 1042 return; 1043 1044invalid_conf: 1045 1046 resp.status = 400; 1047 resp.title = (u_char *) "Invalid configuration."; 1048 resp.offset = -1; 1049 1050 nxt_controller_response(task, req, &resp); 1051 return; 1052 1053alloc_fail: 1054 1055 resp.status = 500; 1056 resp.title = (u_char *) "Memory allocation failed."; 1057 resp.offset = -1; 1058 1059 nxt_controller_response(task, req, &resp); 1060 return; 1061 1062no_router: 1063 1064 resp.status = 500; 1065 resp.title = (u_char *) "Router process isn't available."; 1066 resp.offset = -1; 1067 1068 nxt_controller_response(task, req, &resp); 1069 return; 1070} 1071 1072 1073static void 1074nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1075 void *data) 1076{ 1077 nxt_queue_t queue; 1078 nxt_controller_request_t *req; 1079 nxt_controller_response_t resp; 1080 1081 req = data; 1082 1083 nxt_debug(task, "controller conf ready: %*s", 1084 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 1085 1086 nxt_queue_remove(&req->link); 1087 1088 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1089 1090 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1091 nxt_mp_destroy(nxt_controller_conf.pool); 1092 1093 nxt_controller_conf = req->conf; 1094 1095 nxt_controller_conf_store(task, req->conf.root); 1096 1097 resp.status = 200; 1098 resp.title = (u_char *) "Reconfiguration done."; 1099 1100 } else { 1101 nxt_mp_destroy(req->conf.pool); 1102 1103 resp.status = 500; 1104 resp.title = (u_char *) "Failed to apply new configuration."; 1105 resp.offset = -1; 1106 } 1107 1108 nxt_controller_response(task, req, &resp); 1109 1110 nxt_queue_init(&queue); 1111 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 1112 1113 nxt_queue_init(&nxt_controller_waiting_requests); 1114 1115 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 1116 nxt_controller_process_request(task, req); 1117 } nxt_queue_loop; 1118} 1119 1120 1121static void 1122nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) 1123{ 1124 size_t size; 1125 nxt_buf_t *b; 1126 nxt_port_t *main_port; 1127 nxt_runtime_t *rt; 1128 1129 rt = task->thread->runtime; 1130 1131 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1132 1133 size = nxt_conf_json_length(conf, NULL); 1134 1135 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 1136 1137 if (nxt_fast_path(b != NULL)) { 1138 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 1139 1140 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, 1141 -1, 0, -1, b); 1142 } 1143} 1144 1145 1146static void 1147nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 1148 nxt_controller_response_t *resp) 1149{ 1150 size_t size; 1151 nxt_str_t status_line, str; 1152 nxt_buf_t *b, *body; 1153 nxt_conn_t *c; 1154 nxt_uint_t n; 1155 nxt_conf_value_t *value, *location; 1156 nxt_conf_json_pretty_t pretty; 1157 1158 static nxt_str_t success_str = nxt_string("success"); 1159 static nxt_str_t error_str = nxt_string("error"); 1160 static nxt_str_t detail_str = nxt_string("detail"); 1161 static nxt_str_t location_str = nxt_string("location"); 1162 static nxt_str_t offset_str = nxt_string("offset"); 1163 static nxt_str_t line_str = nxt_string("line"); 1164 static nxt_str_t column_str = nxt_string("column"); 1165 1166 static nxt_time_string_t date_cache = { 1167 (nxt_atomic_uint_t) -1, 1168 nxt_controller_date, 1169 "%s, %02d %s %4d %02d:%02d:%02d GMT",
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_main_process.h> 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 nxt_str_t detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38} nxt_controller_response_t; 39 40 41static void nxt_controller_process_new_port_handler(nxt_task_t *task, 42 nxt_port_recv_msg_t *msg); 43static void nxt_controller_send_current_conf(nxt_task_t *task); 44static void nxt_controller_router_ready_handler(nxt_task_t *task, 45 nxt_port_recv_msg_t *msg); 46static nxt_int_t nxt_controller_conf_default(void); 47static void nxt_controller_conf_init_handler(nxt_task_t *task, 48 nxt_port_recv_msg_t *msg, void *data); 49static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, 50 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); 51 52static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 53static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 54static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 55 uintptr_t data); 56static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 57 void *data); 58static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 59 void *data); 60static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 61 void *data); 62static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 63static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 64 void *data); 65static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 66 void *data); 67static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 68static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 69 70static nxt_int_t nxt_controller_request_content_length(void *ctx, 71 nxt_http_field_t *field, uintptr_t data); 72 73static void nxt_controller_process_request(nxt_task_t *task, 74 nxt_controller_request_t *req); 75static void nxt_controller_conf_handler(nxt_task_t *task, 76 nxt_port_recv_msg_t *msg, void *data); 77static void nxt_controller_conf_store(nxt_task_t *task, 78 nxt_conf_value_t *conf); 79static void nxt_controller_response(nxt_task_t *task, 80 nxt_controller_request_t *req, nxt_controller_response_t *resp); 81static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 82 struct tm *tm, size_t size, const char *format); 83 84 85static nxt_http_field_proc_t nxt_controller_request_fields[] = { 86 { nxt_string("Content-Length"), 87 &nxt_controller_request_content_length, 0 }, 88}; 89 90static nxt_lvlhsh_t nxt_controller_fields_hash; 91 92static nxt_uint_t nxt_controller_listening; 93static nxt_uint_t nxt_controller_router_ready; 94static nxt_controller_conf_t nxt_controller_conf; 95static nxt_queue_t nxt_controller_waiting_requests; 96 97 98static const nxt_event_conn_state_t nxt_controller_conn_read_state; 99static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 100static const nxt_event_conn_state_t nxt_controller_conn_write_state; 101static const nxt_event_conn_state_t nxt_controller_conn_close_state; 102 103 104nxt_port_handlers_t nxt_controller_process_port_handlers = { 105 .quit = nxt_worker_process_quit_handler, 106 .new_port = nxt_controller_process_new_port_handler, 107 .change_file = nxt_port_change_log_file_handler, 108 .mmap = nxt_port_mmap_handler, 109 .process_ready = nxt_controller_router_ready_handler, 110 .data = nxt_port_data_handler, 111 .remove_pid = nxt_port_remove_pid_handler, 112 .rpc_ready = nxt_port_rpc_handler, 113 .rpc_error = nxt_port_rpc_handler, 114}; 115 116 117nxt_int_t 118nxt_controller_start(nxt_task_t *task, void *data) 119{ 120 nxt_mp_t *mp; 121 nxt_int_t ret; 122 nxt_str_t *json; 123 nxt_runtime_t *rt; 124 nxt_conf_value_t *conf; 125 nxt_event_engine_t *engine; 126 nxt_conf_validation_t vldt; 127 128 rt = task->thread->runtime; 129 130 engine = task->thread->engine; 131 132 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 133 if (nxt_slow_path(engine->mem_pool == NULL)) { 134 return NXT_ERROR; 135 } 136 137 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool, 138 nxt_controller_request_fields, 139 nxt_nitems(nxt_controller_request_fields)); 140 141 if (nxt_slow_path(ret != NXT_OK)) { 142 return NXT_ERROR; 143 } 144 145 nxt_queue_init(&nxt_controller_waiting_requests); 146 147 json = data; 148 149 if (json->length == 0) { 150 return NXT_OK; 151 } 152 153 mp = nxt_mp_create(1024, 128, 256, 32); 154 if (nxt_slow_path(mp == NULL)) { 155 return NXT_ERROR; 156 } 157 158 conf = nxt_conf_json_parse_str(mp, json); 159 nxt_free(json->start); 160 161 if (nxt_slow_path(conf == NULL)) { 162 nxt_alert(task, "failed to restore previous configuration: " 163 "file is corrupted or not enough memory"); 164 165 nxt_mp_destroy(mp); 166 return NXT_OK; 167 } 168 169 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 170 171 vldt.pool = nxt_mp_create(1024, 128, 256, 32); 172 if (nxt_slow_path(vldt.pool == NULL)) { 173 return NXT_ERROR; 174 } 175 176 vldt.conf = conf; 177 178 ret = nxt_conf_validate(&vldt); 179 180 if (nxt_slow_path(ret != NXT_OK)) { 181 182 if (ret == NXT_DECLINED) { 183 nxt_alert(task, "the previous configuration is invalid: %V", 184 &vldt.error); 185 186 nxt_mp_destroy(vldt.pool); 187 nxt_mp_destroy(mp); 188 189 return NXT_OK; 190 } 191 192 /* ret == NXT_ERROR */ 193 194 return NXT_ERROR; 195 } 196 197 nxt_mp_destroy(vldt.pool); 198 199 nxt_controller_conf.root = conf; 200 nxt_controller_conf.pool = mp; 201 202 return NXT_OK; 203} 204 205 206static void 207nxt_controller_process_new_port_handler(nxt_task_t *task, 208 nxt_port_recv_msg_t *msg) 209{ 210 nxt_port_new_port_handler(task, msg); 211 212 if (msg->u.new_port->type != NXT_PROCESS_ROUTER 213 || !nxt_controller_router_ready) 214 { 215 return; 216 } 217 218 nxt_controller_send_current_conf(task); 219} 220 221 222static void 223nxt_controller_send_current_conf(nxt_task_t *task) 224{ 225 nxt_int_t rc; 226 nxt_runtime_t *rt; 227 nxt_conf_value_t *conf; 228 229 conf = nxt_controller_conf.root; 230 231 if (conf != NULL) { 232 rc = nxt_controller_conf_send(task, conf, 233 nxt_controller_conf_init_handler, NULL); 234 235 if (nxt_fast_path(rc == NXT_OK)) { 236 return; 237 } 238 239 nxt_mp_destroy(nxt_controller_conf.pool); 240 241 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 242 nxt_abort(); 243 } 244 } 245 246 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 247 nxt_abort(); 248 } 249 250 rt = task->thread->runtime; 251 252 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { 253 nxt_abort(); 254 } 255 256 nxt_controller_listening = 1; 257} 258 259 260static void 261nxt_controller_router_ready_handler(nxt_task_t *task, 262 nxt_port_recv_msg_t *msg) 263{ 264 nxt_port_t *router_port; 265 nxt_runtime_t *rt; 266 267 rt = task->thread->runtime; 268 269 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 270 271 nxt_controller_router_ready = 1; 272 273 if (router_port != NULL) { 274 nxt_controller_send_current_conf(task); 275 } 276} 277 278 279static nxt_int_t 280nxt_controller_conf_default(void) 281{ 282 nxt_mp_t *mp; 283 nxt_conf_value_t *conf; 284 285 static const nxt_str_t json 286 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 287 288 mp = nxt_mp_create(1024, 128, 256, 32); 289 290 if (nxt_slow_path(mp == NULL)) { 291 return NXT_ERROR; 292 } 293 294 conf = nxt_conf_json_parse_str(mp, &json); 295 296 if (nxt_slow_path(conf == NULL)) { 297 return NXT_ERROR; 298 } 299 300 nxt_controller_conf.root = conf; 301 nxt_controller_conf.pool = mp; 302 303 return NXT_OK; 304} 305 306 307static void 308nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 309 void *data) 310{ 311 nxt_runtime_t *rt; 312 313 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { 314 nxt_alert(task, "failed to apply previous configuration"); 315 316 nxt_mp_destroy(nxt_controller_conf.pool); 317 318 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 319 nxt_abort(); 320 } 321 } 322 323 if (nxt_controller_listening == 0) { 324 rt = task->thread->runtime; 325 326 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) 327 == NULL)) 328 { 329 nxt_abort(); 330 } 331 332 nxt_controller_listening = 1; 333 } 334} 335 336 337static nxt_int_t 338nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, 339 nxt_port_rpc_handler_t handler, void *data) 340{ 341 size_t size; 342 uint32_t stream; 343 nxt_int_t rc; 344 nxt_buf_t *b; 345 nxt_port_t *router_port, *controller_port; 346 nxt_runtime_t *rt; 347 348 rt = task->thread->runtime; 349 350 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 351 352 if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) { 353 return NXT_DECLINED; 354 } 355 356 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 357 358 size = nxt_conf_json_length(conf, NULL); 359 360 b = nxt_port_mmap_get_buf(task, router_port, size); 361 if (nxt_slow_path(b == NULL)) { 362 return NXT_ERROR; 363 } 364 365 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 366 367 stream = nxt_port_rpc_register_handler(task, controller_port, 368 handler, handler, 369 router_port->pid, data); 370 371 if (nxt_slow_path(stream == 0)) { 372 return NXT_ERROR; 373 } 374 375 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 376 stream, controller_port->id, b); 377 378 if (nxt_slow_path(rc != NXT_OK)) { 379 nxt_port_rpc_cancel(task, controller_port, stream); 380 return NXT_ERROR; 381 } 382 383 return NXT_OK; 384} 385 386 387nxt_int_t 388nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 389{ 390 nxt_sockaddr_t *sa; 391 nxt_listen_socket_t *ls; 392 393 sa = rt->controller_listen; 394 395 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 396 if (ls == NULL) { 397 return NXT_ERROR; 398 } 399 400 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 401 sa->socklen, sa->length); 402 if (ls->sockaddr == NULL) { 403 return NXT_ERROR; 404 } 405 406 ls->sockaddr->type = sa->type; 407 nxt_sockaddr_text(ls->sockaddr); 408 409 nxt_listen_socket_remote_size(ls); 410 411 ls->socket = -1; 412 ls->backlog = NXT_LISTEN_BACKLOG; 413 ls->read_after_accept = 1; 414 ls->flags = NXT_NONBLOCK; 415 416#if 0 417 /* STUB */ 418 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 419 if (wq == NULL) { 420 return NXT_ERROR; 421 } 422 nxt_work_queue_name(wq, "listen"); 423 /**/ 424 425 ls->work_queue = wq; 426#endif 427 ls->handler = nxt_controller_conn_init; 428 429 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 430 return NXT_ERROR; 431 } 432 433 rt->controller_socket = ls; 434 435 return NXT_OK; 436} 437 438 439static void 440nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 441{ 442 nxt_buf_t *b; 443 nxt_conn_t *c; 444 nxt_event_engine_t *engine; 445 nxt_controller_request_t *r; 446 447 c = obj; 448 449 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 450 451 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 452 if (nxt_slow_path(r == NULL)) { 453 nxt_controller_conn_free(task, c, NULL); 454 return; 455 } 456 457 r->conn = c; 458 459 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 460 != NXT_OK)) 461 { 462 nxt_controller_conn_free(task, c, NULL); 463 return; 464 } 465 466 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 467 if (nxt_slow_path(b == NULL)) { 468 nxt_controller_conn_free(task, c, NULL); 469 return; 470 } 471 472 c->read = b; 473 c->socket.data = r; 474 c->socket.read_ready = 1; 475 c->read_state = &nxt_controller_conn_read_state; 476 477 engine = task->thread->engine; 478 c->read_work_queue = &engine->read_work_queue; 479 c->write_work_queue = &engine->write_work_queue; 480 481 nxt_conn_read(engine, c); 482} 483 484 485static const nxt_event_conn_state_t nxt_controller_conn_read_state 486 nxt_aligned(64) = 487{ 488 .ready_handler = nxt_controller_conn_read, 489 .close_handler = nxt_controller_conn_close, 490 .error_handler = nxt_controller_conn_read_error, 491 492 .timer_handler = nxt_controller_conn_read_timeout, 493 .timer_value = nxt_controller_conn_timeout_value, 494 .timer_data = 60 * 1000, 495}; 496 497 498static void 499nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 500{ 501 size_t preread; 502 nxt_buf_t *b; 503 nxt_int_t rc; 504 nxt_conn_t *c; 505 nxt_controller_request_t *r; 506 507 c = obj; 508 r = data; 509 510 nxt_debug(task, "controller conn read"); 511 512 nxt_queue_remove(&c->link); 513 nxt_queue_self(&c->link); 514 515 b = c->read; 516 517 rc = nxt_http_parse_request(&r->parser, &b->mem); 518 519 if (nxt_slow_path(rc != NXT_DONE)) { 520 521 if (rc == NXT_AGAIN) { 522 if (nxt_buf_mem_free_size(&b->mem) == 0) { 523 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 524 nxt_controller_conn_close(task, c, r); 525 return; 526 } 527 528 nxt_conn_read(task->thread->engine, c); 529 return; 530 } 531 532 /* rc == NXT_ERROR */ 533 534 nxt_log(task, NXT_LOG_ERR, "parsing error"); 535 536 nxt_controller_conn_close(task, c, r); 537 return; 538 } 539 540 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash, 541 r); 542 543 if (nxt_slow_path(rc != NXT_OK)) { 544 nxt_controller_conn_close(task, c, r); 545 return; 546 } 547 548 preread = nxt_buf_mem_used_size(&b->mem); 549 550 nxt_debug(task, "controller request header parsing complete, " 551 "body length: %uz, preread: %uz", 552 r->length, preread); 553 554 if (preread >= r->length) { 555 nxt_controller_process_request(task, r); 556 return; 557 } 558 559 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 560 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 561 if (nxt_slow_path(b == NULL)) { 562 nxt_controller_conn_free(task, c, NULL); 563 return; 564 } 565 566 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 567 568 c->read = b; 569 } 570 571 c->read_state = &nxt_controller_conn_body_read_state; 572 573 nxt_conn_read(task->thread->engine, c); 574} 575 576 577static nxt_msec_t 578nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 579{ 580 return (nxt_msec_t) data; 581} 582 583 584static void 585nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 586{ 587 nxt_conn_t *c; 588 589 c = obj; 590 591 nxt_debug(task, "controller conn read error"); 592 593 nxt_controller_conn_close(task, c, data); 594} 595 596 597static void 598nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 599{ 600 nxt_timer_t *timer; 601 nxt_conn_t *c; 602 603 timer = obj; 604 605 c = nxt_read_timer_conn(timer); 606 c->socket.timedout = 1; 607 c->socket.closed = 1; 608 609 nxt_debug(task, "controller conn read timeout"); 610 611 nxt_controller_conn_close(task, c, data); 612} 613 614 615static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 616 nxt_aligned(64) = 617{ 618 .ready_handler = nxt_controller_conn_body_read, 619 .close_handler = nxt_controller_conn_close, 620 .error_handler = nxt_controller_conn_read_error, 621 622 .timer_handler = nxt_controller_conn_read_timeout, 623 .timer_value = nxt_controller_conn_timeout_value, 624 .timer_data = 60 * 1000, 625 .timer_autoreset = 1, 626}; 627 628 629static void 630nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 631{ 632 size_t read; 633 nxt_buf_t *b; 634 nxt_conn_t *c; 635 nxt_controller_request_t *r; 636 637 c = obj; 638 r = data; 639 b = c->read; 640 641 read = nxt_buf_mem_used_size(&b->mem); 642 643 nxt_debug(task, "controller conn body read: %uz of %uz", 644 read, r->length); 645 646 if (read >= r->length) { 647 nxt_controller_process_request(task, r); 648 return; 649 } 650 651 nxt_conn_read(task->thread->engine, c); 652} 653 654 655static const nxt_event_conn_state_t nxt_controller_conn_write_state 656 nxt_aligned(64) = 657{ 658 .ready_handler = nxt_controller_conn_write, 659 .error_handler = nxt_controller_conn_write_error, 660 661 .timer_handler = nxt_controller_conn_write_timeout, 662 .timer_value = nxt_controller_conn_timeout_value, 663 .timer_data = 60 * 1000, 664 .timer_autoreset = 1, 665}; 666 667 668static void 669nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 670{ 671 nxt_buf_t *b; 672 nxt_conn_t *c; 673 674 c = obj; 675 676 nxt_debug(task, "controller conn write"); 677 678 b = c->write; 679 680 if (b->mem.pos != b->mem.free) { 681 nxt_conn_write(task->thread->engine, c); 682 return; 683 } 684 685 nxt_debug(task, "controller conn write complete"); 686 687 nxt_controller_conn_close(task, c, data); 688} 689 690 691static void 692nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 693{ 694 nxt_conn_t *c; 695 696 c = obj; 697 698 nxt_debug(task, "controller conn write error"); 699 700 nxt_controller_conn_close(task, c, data); 701} 702 703 704static void 705nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 706{ 707 nxt_conn_t *c; 708 nxt_timer_t *timer; 709 710 timer = obj; 711 712 c = nxt_write_timer_conn(timer); 713 c->socket.timedout = 1; 714 c->socket.closed = 1; 715 716 nxt_debug(task, "controller conn write timeout"); 717 718 nxt_controller_conn_close(task, c, data); 719} 720 721 722static const nxt_event_conn_state_t nxt_controller_conn_close_state 723 nxt_aligned(64) = 724{ 725 .ready_handler = nxt_controller_conn_free, 726}; 727 728 729static void 730nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 731{ 732 nxt_conn_t *c; 733 734 c = obj; 735 736 nxt_debug(task, "controller conn close"); 737 738 nxt_queue_remove(&c->link); 739 740 c->write_state = &nxt_controller_conn_close_state; 741 742 nxt_conn_close(task->thread->engine, c); 743} 744 745 746static void 747nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 748{ 749 nxt_conn_t *c; 750 751 c = obj; 752 753 nxt_debug(task, "controller conn free"); 754 755 nxt_sockaddr_cache_free(task->thread->engine, c); 756 757 nxt_conn_free(task, c); 758} 759 760 761static nxt_int_t 762nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 763 uintptr_t data) 764{ 765 off_t length; 766 nxt_controller_request_t *r; 767 768 r = ctx; 769 770 length = nxt_off_t_parse(field->value, field->value_length); 771 772 if (nxt_fast_path(length > 0)) { 773 774 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 775 nxt_log_error(NXT_LOG_ERR, &r->conn->log, 776 "Content-Length is too big"); 777 return NXT_ERROR; 778 } 779 780 r->length = length; 781 return NXT_OK; 782 } 783 784 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid"); 785 786 return NXT_ERROR; 787} 788 789 790static void 791nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 792{ 793 nxt_mp_t *mp; 794 nxt_int_t rc; 795 nxt_str_t path; 796 nxt_conn_t *c; 797 nxt_buf_mem_t *mbuf; 798 nxt_conf_op_t *ops; 799 nxt_conf_value_t *value; 800 nxt_conf_validation_t vldt; 801 nxt_conf_json_error_t error; 802 nxt_controller_response_t resp; 803 804 static const nxt_str_t empty_obj = nxt_string("{}"); 805 806 c = req->conn; 807 path = req->parser.path; 808 809 if (nxt_str_start(&path, "/config", 7)) { 810 811 if (path.length == 7) { 812 path.length = 1; 813 814 } else if (path.start[7] == '/') { 815 path.length -= 7; 816 path.start += 7; 817 } 818 } 819 820 if (path.length > 1 && path.start[path.length - 1] == '/') { 821 path.length--; 822 } 823 824 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 825 826 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 827 828 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 829 830 if (value == NULL) { 831 goto not_found; 832 } 833 834 resp.status = 200; 835 resp.conf = value; 836 837 nxt_controller_response(task, req, &resp); 838 return; 839 } 840 841 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 842 843 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 844 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 845 return; 846 } 847 848 mp = nxt_mp_create(1024, 128, 256, 32); 849 850 if (nxt_slow_path(mp == NULL)) { 851 goto alloc_fail; 852 } 853 854 mbuf = &c->read->mem; 855 856 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 857 858 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 859 860 if (value == NULL) { 861 nxt_mp_destroy(mp); 862 863 if (error.pos == NULL) { 864 goto alloc_fail; 865 } 866 867 resp.status = 400; 868 resp.title = (u_char *) "Invalid JSON."; 869 resp.detail.length = nxt_strlen(error.detail); 870 resp.detail.start = error.detail; 871 resp.offset = error.pos - mbuf->pos; 872 873 nxt_conf_json_position(mbuf->pos, error.pos, 874 &resp.line, &resp.column); 875 876 nxt_controller_response(task, req, &resp); 877 return; 878 } 879 880 if (path.length != 1) { 881 rc = nxt_conf_op_compile(c->mem_pool, &ops, 882 nxt_controller_conf.root, 883 &path, value); 884 885 if (rc != NXT_OK) { 886 nxt_mp_destroy(mp); 887 888 if (rc == NXT_DECLINED) { 889 goto not_found; 890 } 891 892 goto alloc_fail; 893 } 894 895 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 896 897 if (nxt_slow_path(value == NULL)) { 898 nxt_mp_destroy(mp); 899 goto alloc_fail; 900 } 901 } 902 903 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 904 905 vldt.conf = value; 906 vldt.pool = c->mem_pool; 907 908 rc = nxt_conf_validate(&vldt); 909 910 if (nxt_slow_path(rc != NXT_OK)) { 911 nxt_mp_destroy(mp); 912 913 if (rc == NXT_DECLINED) { 914 resp.detail = vldt.error; 915 goto invalid_conf; 916 } 917 918 /* rc == NXT_ERROR */ 919 goto alloc_fail; 920 } 921 922 rc = nxt_controller_conf_send(task, value, 923 nxt_controller_conf_handler, req); 924 925 if (nxt_slow_path(rc != NXT_OK)) { 926 nxt_mp_destroy(mp); 927 928 if (rc == NXT_DECLINED) { 929 goto no_router; 930 } 931 932 /* rc == NXT_ERROR */ 933 goto alloc_fail; 934 } 935 936 req->conf.root = value; 937 req->conf.pool = mp; 938 939 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 940 941 return; 942 } 943 944 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 945 946 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 947 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 948 return; 949 } 950 951 if (path.length == 1) { 952 mp = nxt_mp_create(1024, 128, 256, 32); 953 954 if (nxt_slow_path(mp == NULL)) { 955 goto alloc_fail; 956 } 957 958 value = nxt_conf_json_parse_str(mp, &empty_obj); 959 960 } else { 961 rc = nxt_conf_op_compile(c->mem_pool, &ops, 962 nxt_controller_conf.root, 963 &path, NULL); 964 965 if (rc != NXT_OK) { 966 if (rc == NXT_DECLINED) { 967 goto not_found; 968 } 969 970 goto alloc_fail; 971 } 972 973 mp = nxt_mp_create(1024, 128, 256, 32); 974 975 if (nxt_slow_path(mp == NULL)) { 976 goto alloc_fail; 977 } 978 979 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 980 } 981 982 if (nxt_slow_path(value == NULL)) { 983 nxt_mp_destroy(mp); 984 goto alloc_fail; 985 } 986 987 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 988 989 vldt.conf = value; 990 vldt.pool = c->mem_pool; 991 992 rc = nxt_conf_validate(&vldt); 993 994 if (nxt_slow_path(rc != NXT_OK)) { 995 nxt_mp_destroy(mp); 996 997 if (rc == NXT_DECLINED) { 998 resp.detail = vldt.error; 999 goto invalid_conf; 1000 } 1001 1002 /* rc == NXT_ERROR */ 1003 goto alloc_fail; 1004 } 1005 1006 rc = nxt_controller_conf_send(task, value, 1007 nxt_controller_conf_handler, req); 1008 1009 if (nxt_slow_path(rc != NXT_OK)) { 1010 nxt_mp_destroy(mp); 1011 1012 if (rc == NXT_DECLINED) { 1013 goto no_router; 1014 } 1015 1016 /* rc == NXT_ERROR */ 1017 goto alloc_fail; 1018 } 1019 1020 req->conf.root = value; 1021 req->conf.pool = mp; 1022 1023 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 1024 1025 return; 1026 } 1027 1028 resp.status = 405; 1029 resp.title = (u_char *) "Invalid method."; 1030 resp.offset = -1; 1031 1032 nxt_controller_response(task, req, &resp); 1033 return; 1034 1035not_found: 1036 1037 resp.status = 404; 1038 resp.title = (u_char *) "Value doesn't exist."; 1039 resp.offset = -1; 1040 1041 nxt_controller_response(task, req, &resp); 1042 return; 1043 1044invalid_conf: 1045 1046 resp.status = 400; 1047 resp.title = (u_char *) "Invalid configuration."; 1048 resp.offset = -1; 1049 1050 nxt_controller_response(task, req, &resp); 1051 return; 1052 1053alloc_fail: 1054 1055 resp.status = 500; 1056 resp.title = (u_char *) "Memory allocation failed."; 1057 resp.offset = -1; 1058 1059 nxt_controller_response(task, req, &resp); 1060 return; 1061 1062no_router: 1063 1064 resp.status = 500; 1065 resp.title = (u_char *) "Router process isn't available."; 1066 resp.offset = -1; 1067 1068 nxt_controller_response(task, req, &resp); 1069 return; 1070} 1071 1072 1073static void 1074nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1075 void *data) 1076{ 1077 nxt_queue_t queue; 1078 nxt_controller_request_t *req; 1079 nxt_controller_response_t resp; 1080 1081 req = data; 1082 1083 nxt_debug(task, "controller conf ready: %*s", 1084 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 1085 1086 nxt_queue_remove(&req->link); 1087 1088 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1089 1090 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1091 nxt_mp_destroy(nxt_controller_conf.pool); 1092 1093 nxt_controller_conf = req->conf; 1094 1095 nxt_controller_conf_store(task, req->conf.root); 1096 1097 resp.status = 200; 1098 resp.title = (u_char *) "Reconfiguration done."; 1099 1100 } else { 1101 nxt_mp_destroy(req->conf.pool); 1102 1103 resp.status = 500; 1104 resp.title = (u_char *) "Failed to apply new configuration."; 1105 resp.offset = -1; 1106 } 1107 1108 nxt_controller_response(task, req, &resp); 1109 1110 nxt_queue_init(&queue); 1111 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 1112 1113 nxt_queue_init(&nxt_controller_waiting_requests); 1114 1115 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 1116 nxt_controller_process_request(task, req); 1117 } nxt_queue_loop; 1118} 1119 1120 1121static void 1122nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) 1123{ 1124 size_t size; 1125 nxt_buf_t *b; 1126 nxt_port_t *main_port; 1127 nxt_runtime_t *rt; 1128 1129 rt = task->thread->runtime; 1130 1131 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1132 1133 size = nxt_conf_json_length(conf, NULL); 1134 1135 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 1136 1137 if (nxt_fast_path(b != NULL)) { 1138 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 1139 1140 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, 1141 -1, 0, -1, b); 1142 } 1143} 1144 1145 1146static void 1147nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 1148 nxt_controller_response_t *resp) 1149{ 1150 size_t size; 1151 nxt_str_t status_line, str; 1152 nxt_buf_t *b, *body; 1153 nxt_conn_t *c; 1154 nxt_uint_t n; 1155 nxt_conf_value_t *value, *location; 1156 nxt_conf_json_pretty_t pretty; 1157 1158 static nxt_str_t success_str = nxt_string("success"); 1159 static nxt_str_t error_str = nxt_string("error"); 1160 static nxt_str_t detail_str = nxt_string("detail"); 1161 static nxt_str_t location_str = nxt_string("location"); 1162 static nxt_str_t offset_str = nxt_string("offset"); 1163 static nxt_str_t line_str = nxt_string("line"); 1164 static nxt_str_t column_str = nxt_string("column"); 1165 1166 static nxt_time_string_t date_cache = { 1167 (nxt_atomic_uint_t) -1, 1168 nxt_controller_date, 1169 "%s, %02d %s %4d %02d:%02d:%02d GMT",
|