1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_main.h> 9 #include <nxt_runtime.h> 10 #include <nxt_main_process.h> 11 #include <nxt_conf.h> 12 13 14 typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17 } nxt_controller_conf_t; 18 19 20 typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26 } nxt_controller_request_t; 27 28 29 typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 nxt_str_t detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38 } nxt_controller_response_t; 39 40 41 static void nxt_controller_process_new_port_handler(nxt_task_t *task, 42 nxt_port_recv_msg_t *msg); 43 static nxt_int_t nxt_controller_conf_default(void); 44 static void nxt_controller_conf_init_handler(nxt_task_t *task, 45 nxt_port_recv_msg_t *msg, void *data); 46 static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, 47 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); 48 49 static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 50 static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 51 static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 52 uintptr_t data); 53 static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 54 void *data); 55 static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 56 void *data); 57 static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 58 void *data); 59 static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 60 static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 61 void *data); 62 static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 63 void *data); 64 static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 65 static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 66 67 static nxt_int_t nxt_controller_request_content_length(void *ctx, 68 nxt_http_field_t *field, uintptr_t data); 69 70 static void nxt_controller_process_request(nxt_task_t *task, 71 nxt_controller_request_t *req); 72 static void nxt_controller_conf_handler(nxt_task_t *task, 73 nxt_port_recv_msg_t *msg, void *data); 74 static void nxt_controller_conf_store(nxt_task_t *task, 75 nxt_conf_value_t *conf); 76 static void nxt_controller_response(nxt_task_t *task, 77 nxt_controller_request_t *req, nxt_controller_response_t *resp); 78 static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 79 struct tm *tm, size_t size, const char *format); 80 81 82 static nxt_http_field_proc_t nxt_controller_request_fields[] = { 83 { nxt_string("Content-Length"), 84 &nxt_controller_request_content_length, 0 }, 85 }; 86 87 static nxt_lvlhsh_t nxt_controller_fields_hash; 88 89 static nxt_uint_t nxt_controller_listening; 90 static nxt_controller_conf_t nxt_controller_conf; 91 static nxt_queue_t nxt_controller_waiting_requests; 92 93 94 static const nxt_event_conn_state_t nxt_controller_conn_read_state; 95 static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 96 static const nxt_event_conn_state_t nxt_controller_conn_write_state; 97 static const nxt_event_conn_state_t nxt_controller_conn_close_state; 98 99 100 nxt_port_handlers_t nxt_controller_process_port_handlers = { 101 .quit = nxt_worker_process_quit_handler, 102 .new_port = nxt_controller_process_new_port_handler, 103 .change_file = nxt_port_change_log_file_handler, 104 .mmap = nxt_port_mmap_handler, 105 .data = nxt_port_data_handler, 106 .remove_pid = nxt_port_remove_pid_handler, 107 .rpc_ready = nxt_port_rpc_handler, 108 .rpc_error = nxt_port_rpc_handler, 109 }; 110 111 112 nxt_int_t 113 nxt_controller_start(nxt_task_t *task, void *data) 114 { 115 nxt_mp_t *mp; 116 nxt_int_t ret; 117 nxt_str_t *json; 118 nxt_runtime_t *rt; 119 nxt_conf_value_t *conf; 120 nxt_event_engine_t *engine; 121 nxt_conf_validation_t vldt; 122 123 rt = task->thread->runtime; 124 125 engine = task->thread->engine; 126 127 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 128 if (nxt_slow_path(engine->mem_pool == NULL)) { 129 return NXT_ERROR; 130 } 131 132 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool, 133 nxt_controller_request_fields, 134 nxt_nitems(nxt_controller_request_fields)); 135 136 if (nxt_slow_path(ret != NXT_OK)) { 137 return NXT_ERROR; 138 } 139 140 nxt_queue_init(&nxt_controller_waiting_requests); 141 142 json = data; 143 144 if (json->length == 0) { 145 return NXT_OK; 146 } 147 148 mp = nxt_mp_create(1024, 128, 256, 32); 149 if (nxt_slow_path(mp == NULL)) { 150 return NXT_ERROR; 151 } 152 153 conf = nxt_conf_json_parse_str(mp, json); 154 nxt_free(json->start); 155 156 if (nxt_slow_path(conf == NULL)) { 157 nxt_alert(task, "failed to restore previous configuration: " 158 "file is corrupted or not enough memory"); 159 160 nxt_mp_destroy(mp); 161 return NXT_OK; 162 } 163 164 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 165 166 vldt.pool = nxt_mp_create(1024, 128, 256, 32); 167 if (nxt_slow_path(vldt.pool == NULL)) { 168 return NXT_ERROR; 169 } 170 171 vldt.conf = conf; 172 173 ret = nxt_conf_validate(&vldt); 174 175 if (nxt_slow_path(ret != NXT_OK)) { 176 177 if (ret == NXT_DECLINED) { 178 nxt_alert(task, "the previous configuration is invalid: %V", 179 &vldt.error); 180 181 nxt_mp_destroy(vldt.pool); 182 nxt_mp_destroy(mp); 183 184 return NXT_OK; 185 } 186 187 /* ret == NXT_ERROR */ 188 189 return NXT_ERROR; 190 } 191 192 nxt_mp_destroy(vldt.pool); 193 194 nxt_controller_conf.root = conf; 195 nxt_controller_conf.pool = mp; 196 197 return NXT_OK; 198 } 199 200 201 static void 202 nxt_controller_process_new_port_handler(nxt_task_t *task, 203 nxt_port_recv_msg_t *msg) 204 { 205 nxt_int_t rc; 206 nxt_runtime_t *rt; 207 nxt_conf_value_t *conf; 208 209 nxt_port_new_port_handler(task, msg); 210 211 if (msg->u.new_port->type != NXT_PROCESS_ROUTER) { 212 return; 213 } 214 215 conf = nxt_controller_conf.root; 216 217 if (conf != NULL) { 218 rc = nxt_controller_conf_send(task, conf, 219 nxt_controller_conf_init_handler, NULL); 220 221 if (nxt_fast_path(rc == NXT_OK)) { 222 return; 223 } 224 225 nxt_mp_destroy(nxt_controller_conf.pool); 226 227 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 228 nxt_abort(); 229 } 230 } 231 232 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 233 nxt_abort(); 234 } 235 236 rt = task->thread->runtime; 237 238 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { 239 nxt_abort(); 240 } 241 242 nxt_controller_listening = 1; 243 } 244 245 246 static nxt_int_t 247 nxt_controller_conf_default(void) 248 { 249 nxt_mp_t *mp; 250 nxt_conf_value_t *conf; 251 252 static const nxt_str_t json 253 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 254 255 mp = nxt_mp_create(1024, 128, 256, 32); 256 257 if (nxt_slow_path(mp == NULL)) { 258 return NXT_ERROR; 259 } 260 261 conf = nxt_conf_json_parse_str(mp, &json); 262 263 if (nxt_slow_path(conf == NULL)) { 264 return NXT_ERROR; 265 } 266 267 nxt_controller_conf.root = conf; 268 nxt_controller_conf.pool = mp; 269 270 return NXT_OK; 271 } 272 273 274 static void 275 nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 276 void *data) 277 { 278 nxt_runtime_t *rt; 279 280 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { 281 nxt_alert(task, "failed to apply previous configuration"); 282 283 nxt_mp_destroy(nxt_controller_conf.pool); 284 285 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 286 nxt_abort(); 287 } 288 } 289 290 if (nxt_controller_listening == 0) { 291 rt = task->thread->runtime; 292 293 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) 294 == NULL)) 295 { 296 nxt_abort(); 297 } 298 299 nxt_controller_listening = 1; 300 } 301 } 302 303 304 static nxt_int_t 305 nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, 306 nxt_port_rpc_handler_t handler, void *data) 307 { 308 size_t size; 309 uint32_t stream; 310 nxt_int_t rc; 311 nxt_buf_t *b; 312 nxt_port_t *router_port, *controller_port; 313 nxt_runtime_t *rt; 314 315 rt = task->thread->runtime; 316 317 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 318 319 if (nxt_slow_path(router_port == NULL)) { 320 return NXT_DECLINED; 321 } 322 323 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 324 325 size = nxt_conf_json_length(conf, NULL); 326 327 b = nxt_port_mmap_get_buf(task, router_port, size); 328 if (nxt_slow_path(b == NULL)) { 329 return NXT_ERROR; 330 } 331 332 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 333 334 stream = nxt_port_rpc_register_handler(task, controller_port, 335 handler, handler, 336 router_port->pid, data); 337 338 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 339 stream, controller_port->id, b); 340 341 if (nxt_slow_path(rc != NXT_OK)) { 342 nxt_port_rpc_cancel(task, controller_port, stream); 343 return NXT_ERROR; 344 } 345 346 return NXT_OK; 347 } 348 349 350 nxt_int_t 351 nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 352 { 353 nxt_sockaddr_t *sa; 354 nxt_listen_socket_t *ls; 355 356 sa = rt->controller_listen; 357 358 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 359 if (ls == NULL) { 360 return NXT_ERROR; 361 } 362 363 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 364 sa->socklen, sa->length); 365 if (ls->sockaddr == NULL) { 366 return NXT_ERROR; 367 } 368 369 ls->sockaddr->type = sa->type; 370 nxt_sockaddr_text(ls->sockaddr); 371 372 nxt_listen_socket_remote_size(ls); 373 374 ls->socket = -1; 375 ls->backlog = NXT_LISTEN_BACKLOG; 376 ls->read_after_accept = 1; 377 ls->flags = NXT_NONBLOCK; 378 379 #if 0 380 /* STUB */ 381 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 382 if (wq == NULL) { 383 return NXT_ERROR; 384 } 385 nxt_work_queue_name(wq, "listen"); 386 /**/ 387 388 ls->work_queue = wq; 389 #endif 390 ls->handler = nxt_controller_conn_init; 391 392 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 393 return NXT_ERROR; 394 } 395 396 rt->controller_socket = ls; 397 398 return NXT_OK; 399 } 400 401 402 static void 403 nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 404 { 405 nxt_buf_t *b; 406 nxt_conn_t *c; 407 nxt_event_engine_t *engine; 408 nxt_controller_request_t *r; 409 410 c = obj; 411 412 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 413 414 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 415 if (nxt_slow_path(r == NULL)) { 416 nxt_controller_conn_free(task, c, NULL); 417 return; 418 } 419 420 r->conn = c; 421 422 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 423 != NXT_OK)) 424 { 425 nxt_controller_conn_free(task, c, NULL); 426 return; 427 } 428 429 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 430 if (nxt_slow_path(b == NULL)) { 431 nxt_controller_conn_free(task, c, NULL); 432 return; 433 } 434 435 c->read = b; 436 c->socket.data = r; 437 c->socket.read_ready = 1; 438 c->read_state = &nxt_controller_conn_read_state; 439 440 engine = task->thread->engine; 441 c->read_work_queue = &engine->read_work_queue; 442 c->write_work_queue = &engine->write_work_queue; 443 444 nxt_conn_read(engine, c); 445 } 446 447 448 static const nxt_event_conn_state_t nxt_controller_conn_read_state 449 nxt_aligned(64) = 450 { 451 .ready_handler = nxt_controller_conn_read, 452 .close_handler = nxt_controller_conn_close, 453 .error_handler = nxt_controller_conn_read_error, 454 455 .timer_handler = nxt_controller_conn_read_timeout, 456 .timer_value = nxt_controller_conn_timeout_value, 457 .timer_data = 60 * 1000, 458 }; 459 460 461 static void 462 nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 463 { 464 size_t preread; 465 nxt_buf_t *b; 466 nxt_int_t rc; 467 nxt_conn_t *c; 468 nxt_controller_request_t *r; 469 470 c = obj; 471 r = data; 472 473 nxt_debug(task, "controller conn read"); 474 475 nxt_queue_remove(&c->link); 476 nxt_queue_self(&c->link); 477 478 b = c->read; 479 480 rc = nxt_http_parse_request(&r->parser, &b->mem); 481 482 if (nxt_slow_path(rc != NXT_DONE)) { 483 484 if (rc == NXT_AGAIN) { 485 if (nxt_buf_mem_free_size(&b->mem) == 0) { 486 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 487 nxt_controller_conn_close(task, c, r); 488 return; 489 } 490 491 nxt_conn_read(task->thread->engine, c); 492 return; 493 } 494 495 /* rc == NXT_ERROR */ 496 497 nxt_log(task, NXT_LOG_ERR, "parsing error"); 498 499 nxt_controller_conn_close(task, c, r); 500 return; 501 } 502 503 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash, 504 r); 505 506 if (nxt_slow_path(rc != NXT_OK)) { 507 nxt_controller_conn_close(task, c, r); 508 return; 509 } 510 511 preread = nxt_buf_mem_used_size(&b->mem); 512 513 nxt_debug(task, "controller request header parsing complete, " 514 "body length: %uz, preread: %uz", 515 r->length, preread); 516 517 if (preread >= r->length) { 518 nxt_controller_process_request(task, r); 519 return; 520 } 521 522 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 523 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 524 if (nxt_slow_path(b == NULL)) { 525 nxt_controller_conn_free(task, c, NULL); 526 return; 527 } 528 529 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 530 531 c->read = b; 532 } 533 534 c->read_state = &nxt_controller_conn_body_read_state; 535 536 nxt_conn_read(task->thread->engine, c); 537 } 538 539 540 static nxt_msec_t 541 nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 542 { 543 return (nxt_msec_t) data; 544 } 545 546 547 static void 548 nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 549 { 550 nxt_conn_t *c; 551 552 c = obj; 553 554 nxt_debug(task, "controller conn read error"); 555 556 nxt_controller_conn_close(task, c, data); 557 } 558 559 560 static void 561 nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 562 { 563 nxt_timer_t *timer; 564 nxt_conn_t *c; 565 566 timer = obj; 567 568 c = nxt_read_timer_conn(timer); 569 c->socket.timedout = 1; 570 c->socket.closed = 1; 571 572 nxt_debug(task, "controller conn read timeout"); 573 574 nxt_controller_conn_close(task, c, data); 575 } 576 577 578 static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 579 nxt_aligned(64) = 580 { 581 .ready_handler = nxt_controller_conn_body_read, 582 .close_handler = nxt_controller_conn_close, 583 .error_handler = nxt_controller_conn_read_error, 584 585 .timer_handler = nxt_controller_conn_read_timeout, 586 .timer_value = nxt_controller_conn_timeout_value, 587 .timer_data = 60 * 1000, 588 .timer_autoreset = 1, 589 }; 590 591 592 static void 593 nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 594 { 595 size_t read; 596 nxt_buf_t *b; 597 nxt_conn_t *c; 598 nxt_controller_request_t *r; 599 600 c = obj; 601 r = data; 602 b = c->read; 603 604 read = nxt_buf_mem_used_size(&b->mem); 605 606 nxt_debug(task, "controller conn body read: %uz of %uz", 607 read, r->length); 608 609 if (read >= r->length) { 610 nxt_controller_process_request(task, r); 611 return; 612 } 613 614 nxt_conn_read(task->thread->engine, c); 615 } 616 617 618 static const nxt_event_conn_state_t nxt_controller_conn_write_state 619 nxt_aligned(64) = 620 { 621 .ready_handler = nxt_controller_conn_write, 622 .error_handler = nxt_controller_conn_write_error, 623 624 .timer_handler = nxt_controller_conn_write_timeout, 625 .timer_value = nxt_controller_conn_timeout_value, 626 .timer_data = 60 * 1000, 627 .timer_autoreset = 1, 628 }; 629 630 631 static void 632 nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 633 { 634 nxt_buf_t *b; 635 nxt_conn_t *c; 636 637 c = obj; 638 639 nxt_debug(task, "controller conn write"); 640 641 b = c->write; 642 643 if (b->mem.pos != b->mem.free) { 644 nxt_conn_write(task->thread->engine, c); 645 return; 646 } 647 648 nxt_debug(task, "controller conn write complete"); 649 650 nxt_controller_conn_close(task, c, data); 651 } 652 653 654 static void 655 nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 656 { 657 nxt_conn_t *c; 658 659 c = obj; 660 661 nxt_debug(task, "controller conn write error"); 662 663 nxt_controller_conn_close(task, c, data); 664 } 665 666 667 static void 668 nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 669 { 670 nxt_conn_t *c; 671 nxt_timer_t *timer; 672 673 timer = obj; 674 675 c = nxt_write_timer_conn(timer); 676 c->socket.timedout = 1; 677 c->socket.closed = 1; 678 679 nxt_debug(task, "controller conn write timeout"); 680 681 nxt_controller_conn_close(task, c, data); 682 } 683 684 685 static const nxt_event_conn_state_t nxt_controller_conn_close_state 686 nxt_aligned(64) = 687 { 688 .ready_handler = nxt_controller_conn_free, 689 }; 690 691 692 static void 693 nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 694 { 695 nxt_conn_t *c; 696 697 c = obj; 698 699 nxt_debug(task, "controller conn close"); 700 701 nxt_queue_remove(&c->link); 702 703 c->write_state = &nxt_controller_conn_close_state; 704 705 nxt_conn_close(task->thread->engine, c); 706 } 707 708 709 static void 710 nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 711 { 712 nxt_conn_t *c; 713 714 c = obj; 715 716 nxt_debug(task, "controller conn free"); 717 718 nxt_sockaddr_cache_free(task->thread->engine, c); 719 720 nxt_conn_free(task, c); 721 } 722 723 724 static nxt_int_t 725 nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 726 uintptr_t data) 727 { 728 off_t length; 729 nxt_controller_request_t *r; 730 731 r = ctx; 732 733 length = nxt_off_t_parse(field->value, field->value_length); 734 735 if (nxt_fast_path(length > 0)) { 736 737 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 738 nxt_log_error(NXT_LOG_ERR, &r->conn->log, 739 "Content-Length is too big"); 740 return NXT_ERROR; 741 } 742 743 r->length = length; 744 return NXT_OK; 745 } 746 747 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid"); 748 749 return NXT_ERROR; 750 } 751 752 753 static void 754 nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 755 { 756 nxt_mp_t *mp; 757 nxt_int_t rc; 758 nxt_str_t path; 759 nxt_conn_t *c; 760 nxt_buf_mem_t *mbuf; 761 nxt_conf_op_t *ops; 762 nxt_conf_value_t *value; 763 nxt_conf_validation_t vldt; 764 nxt_conf_json_error_t error; 765 nxt_controller_response_t resp; 766 767 static const nxt_str_t empty_obj = nxt_string("{}"); 768 769 c = req->conn; 770 path = req->parser.path; 771 772 if (nxt_str_start(&path, "/config", 7)) { 773 774 if (path.length == 7) { 775 path.length = 1; 776 777 } else if (path.start[7] == '/') { 778 path.length -= 7; 779 path.start += 7; 780 } 781 } 782 783 if (path.length > 1 && path.start[path.length - 1] == '/') { 784 path.length--; 785 } 786 787 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 788 789 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 790 791 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 792 793 if (value == NULL) { 794 goto not_found; 795 } 796 797 resp.status = 200; 798 resp.conf = value; 799 800 nxt_controller_response(task, req, &resp); 801 return; 802 } 803 804 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 805 806 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 807 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 808 return; 809 } 810 811 mp = nxt_mp_create(1024, 128, 256, 32); 812 813 if (nxt_slow_path(mp == NULL)) { 814 goto alloc_fail; 815 } 816 817 mbuf = &c->read->mem; 818 819 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 820 821 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 822 823 if (value == NULL) { 824 nxt_mp_destroy(mp); 825 826 if (error.pos == NULL) { 827 goto alloc_fail; 828 } 829 830 resp.status = 400; 831 resp.title = (u_char *) "Invalid JSON."; 832 resp.detail.length = nxt_strlen(error.detail); 833 resp.detail.start = error.detail; 834 resp.offset = error.pos - mbuf->pos; 835 836 nxt_conf_json_position(mbuf->pos, error.pos, 837 &resp.line, &resp.column); 838 839 nxt_controller_response(task, req, &resp); 840 return; 841 } 842 843 if (path.length != 1) { 844 rc = nxt_conf_op_compile(c->mem_pool, &ops, 845 nxt_controller_conf.root, 846 &path, value); 847 848 if (rc != NXT_OK) { 849 nxt_mp_destroy(mp); 850 851 if (rc == NXT_DECLINED) { 852 goto not_found; 853 } 854 855 goto alloc_fail; 856 } 857 858 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 859 860 if (nxt_slow_path(value == NULL)) { 861 nxt_mp_destroy(mp); 862 goto alloc_fail; 863 } 864 } 865 866 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 867 868 vldt.conf = value; 869 vldt.pool = c->mem_pool; 870 871 rc = nxt_conf_validate(&vldt); 872 873 if (nxt_slow_path(rc != NXT_OK)) { 874 nxt_mp_destroy(mp); 875 876 if (rc == NXT_DECLINED) { 877 resp.detail = vldt.error; 878 goto invalid_conf; 879 } 880 881 /* rc == NXT_ERROR */ 882 goto alloc_fail; 883 } 884 885 rc = nxt_controller_conf_send(task, value, 886 nxt_controller_conf_handler, req); 887 888 if (nxt_slow_path(rc != NXT_OK)) { 889 nxt_mp_destroy(mp); 890 891 if (rc == NXT_DECLINED) { 892 goto no_router; 893 } 894 895 /* rc == NXT_ERROR */ 896 goto alloc_fail; 897 } 898 899 req->conf.root = value; 900 req->conf.pool = mp; 901 902 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 903 904 return; 905 } 906 907 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 908 909 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 910 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 911 return; 912 } 913 914 if (path.length == 1) { 915 mp = nxt_mp_create(1024, 128, 256, 32); 916 917 if (nxt_slow_path(mp == NULL)) { 918 goto alloc_fail; 919 } 920 921 value = nxt_conf_json_parse_str(mp, &empty_obj); 922 923 } else { 924 rc = nxt_conf_op_compile(c->mem_pool, &ops, 925 nxt_controller_conf.root, 926 &path, NULL); 927 928 if (rc != NXT_OK) { 929 if (rc == NXT_DECLINED) { 930 goto not_found; 931 } 932 933 goto alloc_fail; 934 } 935 936 mp = nxt_mp_create(1024, 128, 256, 32); 937 938 if (nxt_slow_path(mp == NULL)) { 939 goto alloc_fail; 940 } 941 942 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 943 } 944 945 if (nxt_slow_path(value == NULL)) { 946 nxt_mp_destroy(mp); 947 goto alloc_fail; 948 } 949 950 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 951 952 vldt.conf = value; 953 vldt.pool = c->mem_pool; 954 955 rc = nxt_conf_validate(&vldt); 956 957 if (nxt_slow_path(rc != NXT_OK)) { 958 nxt_mp_destroy(mp); 959 960 if (rc == NXT_DECLINED) { 961 resp.detail = vldt.error; 962 goto invalid_conf; 963 } 964 965 /* rc == NXT_ERROR */ 966 goto alloc_fail; 967 } 968 969 rc = nxt_controller_conf_send(task, value, 970 nxt_controller_conf_handler, req); 971 972 if (nxt_slow_path(rc != NXT_OK)) { 973 nxt_mp_destroy(mp); 974 975 if (rc == NXT_DECLINED) { 976 goto no_router; 977 } 978 979 /* rc == NXT_ERROR */ 980 goto alloc_fail; 981 } 982 983 req->conf.root = value; 984 req->conf.pool = mp; 985 986 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 987 988 return; 989 } 990 991 resp.status = 405; 992 resp.title = (u_char *) "Invalid method."; 993 resp.offset = -1; 994 995 nxt_controller_response(task, req, &resp); 996 return; 997 998 not_found: 999 1000 resp.status = 404; 1001 resp.title = (u_char *) "Value doesn't exist."; 1002 resp.offset = -1; 1003 1004 nxt_controller_response(task, req, &resp); 1005 return; 1006 1007 invalid_conf: 1008 1009 resp.status = 400; 1010 resp.title = (u_char *) "Invalid configuration."; 1011 resp.offset = -1; 1012 1013 nxt_controller_response(task, req, &resp); 1014 return; 1015 1016 alloc_fail: 1017 1018 resp.status = 500; 1019 resp.title = (u_char *) "Memory allocation failed."; 1020 resp.offset = -1; 1021 1022 nxt_controller_response(task, req, &resp); 1023 return; 1024 1025 no_router: 1026 1027 resp.status = 500; 1028 resp.title = (u_char *) "Router process isn't available."; 1029 resp.offset = -1; 1030 1031 nxt_controller_response(task, req, &resp); 1032 return; 1033 } 1034 1035 1036 static void 1037 nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1038 void *data) 1039 { 1040 nxt_queue_t queue; 1041 nxt_controller_request_t *req; 1042 nxt_controller_response_t resp; 1043 1044 req = data; 1045 1046 nxt_debug(task, "controller conf ready: %*s", 1047 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 1048 1049 nxt_queue_remove(&req->link); 1050 1051 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1052 1053 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1054 nxt_mp_destroy(nxt_controller_conf.pool); 1055 1056 nxt_controller_conf = req->conf; 1057 1058 nxt_controller_conf_store(task, req->conf.root); 1059 1060 resp.status = 200; 1061 resp.title = (u_char *) "Reconfiguration done."; 1062 1063 } else { 1064 nxt_mp_destroy(req->conf.pool); 1065 1066 resp.status = 500; 1067 resp.title = (u_char *) "Failed to apply new configuration."; 1068 resp.offset = -1; 1069 } 1070 1071 nxt_controller_response(task, req, &resp); 1072 1073 nxt_queue_init(&queue); 1074 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 1075 1076 nxt_queue_init(&nxt_controller_waiting_requests); 1077 1078 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 1079 nxt_controller_process_request(task, req); 1080 } nxt_queue_loop; 1081 } 1082 1083 1084 static void 1085 nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) 1086 { 1087 size_t size; 1088 nxt_buf_t *b; 1089 nxt_port_t *main_port; 1090 nxt_runtime_t *rt; 1091 1092 rt = task->thread->runtime; 1093 1094 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1095 1096 size = nxt_conf_json_length(conf, NULL); 1097 1098 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 1099 1100 if (nxt_fast_path(b != NULL)) { 1101 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 1102 1103 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, 1104 -1, 0, -1, b); 1105 } 1106 } 1107 1108 1109 static void 1110 nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 1111 nxt_controller_response_t *resp) 1112 { 1113 size_t size; 1114 nxt_str_t status_line, str; 1115 nxt_buf_t *b, *body; 1116 nxt_conn_t *c; 1117 nxt_uint_t n; 1118 nxt_conf_value_t *value, *location; 1119 nxt_conf_json_pretty_t pretty; 1120 1121 static nxt_str_t success_str = nxt_string("success"); 1122 static nxt_str_t error_str = nxt_string("error"); 1123 static nxt_str_t detail_str = nxt_string("detail"); 1124 static nxt_str_t location_str = nxt_string("location"); 1125 static nxt_str_t offset_str = nxt_string("offset"); 1126 static nxt_str_t line_str = nxt_string("line"); 1127 static nxt_str_t column_str = nxt_string("column"); 1128 1129 static nxt_time_string_t date_cache = { 1130 (nxt_atomic_uint_t) -1, 1131 nxt_controller_date, 1132 "%s, %02d %s %4d %02d:%02d:%02d GMT", 1133 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1, 1134 NXT_THREAD_TIME_GMT, 1135 NXT_THREAD_TIME_SEC, 1136 }; 1137 1138 switch (resp->status) { 1139 1140 case 200: 1141 nxt_str_set(&status_line, "200 OK"); 1142 break; 1143 1144 case 400: 1145 nxt_str_set(&status_line, "400 Bad Request"); 1146 break; 1147 1148 case 404: 1149 nxt_str_set(&status_line, "404 Not Found"); 1150 break; 1151 1152 case 405: 1153 nxt_str_set(&status_line, "405 Method Not Allowed"); 1154 break; 1155 1156 default: 1157 nxt_str_set(&status_line, "500 Internal Server Error"); 1158 break; 1159 } 1160 1161 c = req->conn; 1162 value = resp->conf; 1163 1164 if (value == NULL) { 1165 n = 1 1166 + (resp->detail.length != 0) 1167 + (resp->status >= 400 && resp->offset != -1); 1168 1169 value = nxt_conf_create_object(c->mem_pool, n); 1170 1171 if (nxt_slow_path(value == NULL)) { 1172 nxt_controller_conn_close(task, c, req); 1173 return; 1174 } 1175 1176 str.length = nxt_strlen(resp->title); 1177 str.start = resp->title; 1178 1179 if (resp->status < 400) { 1180 nxt_conf_set_member_string(value, &success_str, &str, 0); 1181 1182 } else { 1183 nxt_conf_set_member_string(value, &error_str, &str, 0); 1184 } 1185 1186 n = 0; 1187 1188 if (resp->detail.length != 0) { 1189 n++; 1190 1191 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n); 1192 } 1193 1194 if (resp->status >= 400 && resp->offset != -1) { 1195 n++; 1196 1197 location = nxt_conf_create_object(c->mem_pool, 1198 resp->line != 0 ? 3 : 1); 1199 1200 nxt_conf_set_member(value, &location_str, location, n); 1201 1202 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 1203 1204 if (resp->line != 0) { 1205 nxt_conf_set_member_integer(location, &line_str, 1206 resp->line, 1); 1207 1208 nxt_conf_set_member_integer(location, &column_str, 1209 resp->column, 2); 1210 } 1211 } 1212 } 1213 1214 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1215 1216 size = nxt_conf_json_length(value, &pretty) + 2; 1217 1218 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1219 if (nxt_slow_path(body == NULL)) { 1220 nxt_controller_conn_close(task, c, req); 1221 return; 1222 } 1223 1224 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1225 1226 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty); 1227 1228 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); 1229 1230 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length 1231 + sizeof("Server: Unit/" NXT_VERSION "\r\n") - 1 1232 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 1233 + sizeof("Content-Type: application/json\r\n") - 1 1234 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN 1235 + sizeof("Connection: close\r\n") - 1 1236 + sizeof("\r\n") - 1; 1237 1238 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1239 if (nxt_slow_path(b == NULL)) { 1240 nxt_controller_conn_close(task, c, req); 1241 return; 1242 } 1243 1244 b->next = body; 1245 1246 nxt_str_set(&str, "HTTP/1.1 "); 1247 1248 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1249 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 1250 status_line.length); 1251 1252 nxt_str_set(&str, "\r\n" 1253 "Server: Unit/" NXT_VERSION "\r\n" 1254 "Date: "); 1255 1256 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1257 1258 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 1259 b->mem.free); 1260 1261 nxt_str_set(&str, "\r\n" 1262 "Content-Type: application/json\r\n" 1263 "Content-Length: "); 1264 1265 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1266 1267 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 1268 nxt_buf_mem_used_size(&body->mem)); 1269 1270 nxt_str_set(&str, "\r\n" 1271 "Connection: close\r\n" 1272 "\r\n"); 1273 1274 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1275 1276 c->write = b; 1277 c->write_state = &nxt_controller_conn_write_state; 1278 1279 nxt_conn_write(task->thread->engine, c); 1280 } 1281 1282 1283 static u_char * 1284 nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 1285 size_t size, const char *format) 1286 { 1287 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 1288 "Sat" }; 1289 1290 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1291 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1292 1293 return nxt_sprintf(buf, buf + size, format, 1294 week[tm->tm_wday], tm->tm_mday, 1295 month[tm->tm_mon], tm->tm_year + 1900, 1296 tm->tm_hour, tm->tm_min, tm->tm_sec); 1297 } 1298