11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 u_char *detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38} nxt_controller_response_t; 39 40 41static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 42static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 43static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 44 uintptr_t data); 45static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 46 void *data); 47static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 48 void *data); 49static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 50 void *data); 51static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 52static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 53 void *data); 54static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 55 void *data); 56static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 57static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 58 59static nxt_int_t nxt_controller_request_content_length(void *ctx, 60 nxt_http_field_t *field, nxt_log_t *log); 61 62static void nxt_controller_process_request(nxt_task_t *task, 63 nxt_controller_request_t *req); 64static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, 65 nxt_controller_request_t *req); 66static void nxt_controller_conf_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg, void *data); 68static void nxt_controller_response(nxt_task_t *task, 69 nxt_controller_request_t *req, nxt_controller_response_t *resp); 70static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 71 struct tm *tm, size_t size, const char *format); 72 73 74static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = { 75 { nxt_string("Content-Length"), 76 &nxt_controller_request_content_length, 0 }, 77 78 { nxt_null_string, NULL, 0 } 79}; 80 81static nxt_http_fields_hash_t *nxt_controller_fields_hash; 82 83static nxt_controller_conf_t nxt_controller_conf; 84static nxt_queue_t nxt_controller_waiting_requests; 85 86 87static const nxt_event_conn_state_t nxt_controller_conn_read_state; 88static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 89static const nxt_event_conn_state_t nxt_controller_conn_write_state; 90static const nxt_event_conn_state_t nxt_controller_conn_close_state; 91 92 93nxt_int_t 94nxt_controller_start(nxt_task_t *task, void *data) 95{ 96 nxt_mp_t *mp; 97 nxt_runtime_t *rt; 98 nxt_conf_value_t *conf; 99 nxt_http_fields_hash_t *hash; 100 101 static const nxt_str_t json 102 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 103 104 rt = task->thread->runtime; 105 106 hash = nxt_http_fields_hash_create(nxt_controller_request_fields, 107 rt->mem_pool); 108 if (nxt_slow_path(hash == NULL)) { 109 return NXT_ERROR; 110 } 111 112 nxt_controller_fields_hash = hash; 113 114 if (nxt_listen_event(task, rt->controller_socket) == NULL) { 115 return NXT_ERROR; 116 } 117 118 mp = nxt_mp_create(1024, 128, 256, 32); 119 120 if (nxt_slow_path(mp == NULL)) { 121 return NXT_ERROR; 122 } 123 124 conf = nxt_conf_json_parse_str(mp, &json); 125 126 if (conf == NULL) { 127 return NXT_ERROR; 128 } 129 130 nxt_controller_conf.root = conf; 131 nxt_controller_conf.pool = mp; 132 133 nxt_queue_init(&nxt_controller_waiting_requests); 134 135 return NXT_OK; 136} 137 138 139nxt_int_t 140nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 141{ 142 nxt_sockaddr_t *sa; 143 nxt_listen_socket_t *ls; 144 145 sa = rt->controller_listen; 146 147 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 148 if (ls == NULL) { 149 return NXT_ERROR; 150 } 151 152 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 153 sa->socklen, sa->length); 154 if (ls->sockaddr == NULL) { 155 return NXT_ERROR; 156 } 157 158 ls->sockaddr->type = sa->type; 159 ls->socklen = sa->socklen; 160 ls->address_length = sa->length; 161 162 nxt_sockaddr_text(ls->sockaddr); 163 164 ls->socket = -1; 165 ls->backlog = NXT_LISTEN_BACKLOG; 166 ls->read_after_accept = 1; 167 ls->flags = NXT_NONBLOCK; 168 169#if 0 170 /* STUB */ 171 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 172 if (wq == NULL) { 173 return NXT_ERROR; 174 } 175 nxt_work_queue_name(wq, "listen"); 176 /**/ 177 178 ls->work_queue = wq; 179#endif 180 ls->handler = nxt_controller_conn_init; 181 182 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 183 return NXT_ERROR; 184 } 185 186 rt->controller_socket = ls; 187 188 return NXT_OK; 189} 190 191 192static void 193nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 194{ 195 nxt_buf_t *b; 196 nxt_conn_t *c; 197 nxt_event_engine_t *engine; 198 nxt_controller_request_t *r; 199 200 c = obj; 201 202 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 203 204 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 205 if (nxt_slow_path(r == NULL)) { 206 nxt_controller_conn_free(task, c, NULL); 207 return; 208 } 209 210 r->conn = c; 211 212 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 213 != NXT_OK)) 214 { 215 nxt_controller_conn_free(task, c, NULL); 216 return; 217 } 218 219 r->parser.fields_hash = nxt_controller_fields_hash; 220 221 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 222 if (nxt_slow_path(b == NULL)) { 223 nxt_controller_conn_free(task, c, NULL); 224 return; 225 } 226 227 c->read = b; 228 c->socket.data = r; 229 c->socket.read_ready = 1; 230 c->read_state = &nxt_controller_conn_read_state; 231 232 engine = task->thread->engine; 233 c->read_work_queue = &engine->read_work_queue; 234 c->write_work_queue = &engine->write_work_queue; 235 236 nxt_conn_read(engine, c); 237} 238 239 240static const nxt_event_conn_state_t nxt_controller_conn_read_state 241 nxt_aligned(64) = 242{ 243 .ready_handler = nxt_controller_conn_read, 244 .close_handler = nxt_controller_conn_close, 245 .error_handler = nxt_controller_conn_read_error, 246 247 .timer_handler = nxt_controller_conn_read_timeout, 248 .timer_value = nxt_controller_conn_timeout_value, 249 .timer_data = 60 * 1000, 250}; 251 252 253static void 254nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 255{ 256 size_t preread; 257 nxt_buf_t *b; 258 nxt_int_t rc; 259 nxt_conn_t *c; 260 nxt_controller_request_t *r; 261 262 c = obj; 263 r = data; 264 265 nxt_debug(task, "controller conn read"); 266 267 nxt_queue_remove(&c->link); 268 nxt_queue_self(&c->link); 269 270 b = c->read; 271 272 rc = nxt_http_parse_request(&r->parser, &b->mem); 273 274 if (nxt_slow_path(rc != NXT_DONE)) { 275 276 if (rc == NXT_AGAIN) { 277 if (nxt_buf_mem_free_size(&b->mem) == 0) { 278 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 279 nxt_controller_conn_close(task, c, r); 280 return; 281 } 282 283 nxt_conn_read(task->thread->engine, c); 284 return; 285 } 286 287 /* rc == NXT_ERROR */ 288 289 nxt_log(task, NXT_LOG_ERR, "parsing error"); 290 291 nxt_controller_conn_close(task, c, r); 292 return; 293 } 294 295 rc = nxt_http_fields_process(r->parser.fields, r, task->log); 296 297 if (nxt_slow_path(rc != NXT_OK)) { 298 nxt_controller_conn_close(task, c, r); 299 return; 300 } 301 302 preread = nxt_buf_mem_used_size(&b->mem); 303 304 nxt_debug(task, "controller request header parsing complete, " 305 "body length: %uz, preread: %uz", 306 r->length, preread); 307 308 if (preread >= r->length) { 309 nxt_controller_process_request(task, r); 310 return; 311 } 312 313 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 314 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 315 if (nxt_slow_path(b == NULL)) { 316 nxt_controller_conn_free(task, c, NULL); 317 return; 318 } 319 320 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 321 322 c->read = b; 323 } 324 325 c->read_state = &nxt_controller_conn_body_read_state; 326 327 nxt_conn_read(task->thread->engine, c); 328} 329 330 331static nxt_msec_t 332nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 333{ 334 return (nxt_msec_t) data; 335} 336 337 338static void 339nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 340{ 341 nxt_conn_t *c; 342 343 c = obj; 344 345 nxt_debug(task, "controller conn read error"); 346 347 nxt_controller_conn_close(task, c, data); 348} 349 350 351static void 352nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 353{ 354 nxt_timer_t *timer; 355 nxt_conn_t *c; 356 357 timer = obj; 358 359 c = nxt_read_timer_conn(timer); 360 c->socket.timedout = 1; 361 c->socket.closed = 1; 362 363 nxt_debug(task, "controller conn read timeout"); 364 365 nxt_controller_conn_close(task, c, data); 366} 367 368 369static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 370 nxt_aligned(64) = 371{ 372 .ready_handler = nxt_controller_conn_body_read, 373 .close_handler = nxt_controller_conn_close, 374 .error_handler = nxt_controller_conn_read_error, 375 376 .timer_handler = nxt_controller_conn_read_timeout, 377 .timer_value = nxt_controller_conn_timeout_value, 378 .timer_data = 60 * 1000, 379 .timer_autoreset = 1, 380}; 381 382 383static void 384nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 385{ 386 size_t read; 387 nxt_buf_t *b; 388 nxt_conn_t *c; 389 nxt_controller_request_t *r; 390 391 c = obj; 392 r = data; 393 b = c->read; 394 395 read = nxt_buf_mem_used_size(&b->mem); 396 397 nxt_debug(task, "controller conn body read: %uz of %uz", 398 read, r->length); 399 400 if (read >= r->length) { 401 nxt_controller_process_request(task, r); 402 return; 403 } 404 405 nxt_conn_read(task->thread->engine, c); 406} 407 408 409static const nxt_event_conn_state_t nxt_controller_conn_write_state 410 nxt_aligned(64) = 411{ 412 .ready_handler = nxt_controller_conn_write, 413 .error_handler = nxt_controller_conn_write_error, 414 415 .timer_handler = nxt_controller_conn_write_timeout, 416 .timer_value = nxt_controller_conn_timeout_value, 417 .timer_data = 60 * 1000, 418 .timer_autoreset = 1, 419}; 420 421 422static void 423nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 424{ 425 nxt_buf_t *b; 426 nxt_conn_t *c; 427 428 c = obj; 429 430 nxt_debug(task, "controller conn write"); 431 432 b = c->write; 433 434 if (b->mem.pos != b->mem.free) { 435 nxt_conn_write(task->thread->engine, c); 436 return; 437 } 438 439 nxt_debug(task, "controller conn write complete"); 440 441 nxt_controller_conn_close(task, c, data); 442} 443 444 445static void 446nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 447{ 448 nxt_conn_t *c; 449 450 c = obj; 451 452 nxt_debug(task, "controller conn write error"); 453 454 nxt_controller_conn_close(task, c, data); 455} 456 457 458static void 459nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 460{ 461 nxt_conn_t *c; 462 nxt_timer_t *timer; 463 464 timer = obj; 465 466 c = nxt_write_timer_conn(timer); 467 c->socket.timedout = 1; 468 c->socket.closed = 1; 469 470 nxt_debug(task, "controller conn write timeout"); 471 472 nxt_controller_conn_close(task, c, data); 473} 474 475 476static const nxt_event_conn_state_t nxt_controller_conn_close_state 477 nxt_aligned(64) = 478{ 479 .ready_handler = nxt_controller_conn_free, 480}; 481 482 483static void 484nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 485{ 486 nxt_conn_t *c; 487 488 c = obj; 489 490 nxt_debug(task, "controller conn close"); 491 492 nxt_queue_remove(&c->link); 493 494 c->write_state = &nxt_controller_conn_close_state; 495 496 nxt_conn_close(task->thread->engine, c); 497} 498 499 500static void 501nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 502{ 503 nxt_conn_t *c; 504 505 c = obj; 506 507 nxt_debug(task, "controller conn free"); 508 509 nxt_mp_destroy(c->mem_pool); 510 511 //nxt_free(c); 512} 513 514 515static nxt_int_t 516nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 517 nxt_log_t *log) 518{ 519 off_t length; 520 nxt_controller_request_t *r; 521 522 r = ctx; 523 524 length = nxt_off_t_parse(field->value.start, field->value.length); 525 526 if (nxt_fast_path(length > 0)) { 527 528 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 529 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big"); 530 return NXT_ERROR; 531 } 532 533 r->length = length; 534 return NXT_OK; 535 } 536 537 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid"); 538 539 return NXT_ERROR; 540} 541 542 543static void 544nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 545{ 546 nxt_mp_t *mp; 547 nxt_int_t rc; 548 nxt_str_t path; 549 nxt_conn_t *c; 550 nxt_buf_mem_t *mbuf; 551 nxt_conf_op_t *ops; 552 nxt_conf_value_t *value; 553 nxt_conf_json_error_t error; 554 nxt_controller_response_t resp; 555 556 static const nxt_str_t empty_obj = nxt_string("{}"); 557 558 c = req->conn; 559 path = req->parser.path; 560 561 if (path.length > 1 && path.start[path.length - 1] == '/') { 562 path.length--; 563 } 564 565 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 566 567 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 568 569 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 570 571 if (value == NULL) { 572 goto not_found; 573 } 574 575 resp.status = 200; 576 resp.conf = value; 577 578 nxt_controller_response(task, req, &resp); 579 return; 580 } 581 582 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 583 584 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 585 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 586 return; 587 } 588 589 mp = nxt_mp_create(1024, 128, 256, 32); 590 591 if (nxt_slow_path(mp == NULL)) { 592 goto alloc_fail; 593 } 594 595 mbuf = &c->read->mem; 596 597 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 598 599 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 600 601 if (value == NULL) { 602 nxt_mp_destroy(mp); 603 604 if (error.pos == NULL) { 605 goto alloc_fail; 606 } 607 608 resp.status = 400; 609 resp.title = (u_char *) "Invalid JSON."; 610 resp.detail = error.detail; 611 resp.offset = error.pos - mbuf->pos; 612 613 nxt_conf_json_position(mbuf->pos, error.pos, 614 &resp.line, &resp.column); 615 616 nxt_controller_response(task, req, &resp); 617 return; 618 } 619 620 if (path.length != 1) { 621 rc = nxt_conf_op_compile(c->mem_pool, &ops, 622 nxt_controller_conf.root, 623 &path, value); 624 625 if (rc != NXT_OK) { 626 if (rc == NXT_DECLINED) { 627 goto not_found; 628 } 629 630 goto alloc_fail; 631 } 632 633 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 634 635 if (nxt_slow_path(value == NULL)) { 636 nxt_mp_destroy(mp); 637 goto alloc_fail; 638 } 639 } 640 641 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 642 nxt_mp_destroy(mp); 643 goto invalid_conf; 644 } 645 646 req->conf.root = value; 647 req->conf.pool = mp; 648 649 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 650 nxt_mp_destroy(mp); 651 goto alloc_fail; 652 } 653 654 return; 655 } 656 657 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 658 659 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 660 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 661 return; 662 } 663 664 if (path.length == 1) { 665 mp = nxt_mp_create(1024, 128, 256, 32); 666 667 if (nxt_slow_path(mp == NULL)) { 668 goto alloc_fail; 669 } 670 671 value = nxt_conf_json_parse_str(mp, &empty_obj); 672 673 } else { 674 rc = nxt_conf_op_compile(c->mem_pool, &ops, 675 nxt_controller_conf.root, 676 &path, NULL); 677 678 if (rc != NXT_OK) { 679 if (rc == NXT_DECLINED) { 680 goto not_found; 681 } 682 683 goto alloc_fail; 684 } 685 686 mp = nxt_mp_create(1024, 128, 256, 32); 687 688 if (nxt_slow_path(mp == NULL)) { 689 goto alloc_fail; 690 } 691 692 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 693 } 694 695 if (nxt_slow_path(value == NULL)) { 696 nxt_mp_destroy(mp); 697 goto alloc_fail; 698 } 699 700 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 701 nxt_mp_destroy(mp); 702 goto invalid_conf; 703 } 704 705 req->conf.root = value; 706 req->conf.pool = mp; 707 708 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 709 nxt_mp_destroy(mp); 710 goto alloc_fail; 711 } 712 713 return; 714 } 715 716 resp.status = 405; 717 resp.title = (u_char *) "Invalid method."; 718 resp.offset = -1; 719 720 nxt_controller_response(task, req, &resp); 721 return; 722 723alloc_fail: 724 725 resp.status = 500; 726 resp.title = (u_char *) "Memory allocation failed."; 727 resp.offset = -1; 728 729 nxt_controller_response(task, req, &resp); 730 return; 731 732not_found: 733 734 resp.status = 404; 735 resp.title = (u_char *) "Value doesn't exist."; 736 resp.offset = -1; 737 738 nxt_controller_response(task, req, &resp); 739 return; 740 741invalid_conf: 742 743 resp.status = 400; 744 resp.title = (u_char *) "Invalid configuration."; 745 resp.offset = -1; 746 747 nxt_controller_response(task, req, &resp); 748 return; 749} 750 751 752static nxt_int_t 753nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) 754{ 755 size_t size; 756 uint32_t stream; 757 nxt_int_t rc; 758 nxt_buf_t *b; 759 nxt_port_t *router_port, *controller_port; 760 nxt_runtime_t *rt; 761 762 rt = task->thread->runtime; 763 764 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 765 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 766 767 size = nxt_conf_json_length(req->conf.root, NULL); 768 769 b = nxt_port_mmap_get_buf(task, router_port, size); 770 771 b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL); 772 773 stream = nxt_port_rpc_register_handler(task, controller_port, 774 nxt_controller_conf_handler, 775 nxt_controller_conf_handler, 776 router_port->pid, req); 777 778 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 779 stream, controller_port->id, b); 780 781 if (nxt_slow_path(rc != NXT_OK)) { 782 nxt_port_rpc_cancel(task, controller_port, stream); 783 return NXT_ERROR; 784 } 785 786 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 787 788 return NXT_OK; 789} 790 791 792static void 793nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 794 void *data) 795{ 796 nxt_queue_t queue; 797 nxt_controller_request_t *req; 798 nxt_controller_response_t resp; 799 800 req = data; 801 802 nxt_debug(task, "controller conf ready: %*s", 803 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 804 805 nxt_queue_remove(&req->link); 806 807 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 808 809 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 810 nxt_mp_destroy(nxt_controller_conf.pool); 811 812 nxt_controller_conf = req->conf; 813 814 resp.status = 200; 815 resp.title = (u_char *) "Reconfiguration done."; 816 817 } else { 818 nxt_mp_destroy(req->conf.pool); 819 820 resp.status = 500; 821 resp.title = (u_char *) "Failed to apply new configuration."; 822 resp.offset = -1; 823 } 824 825 nxt_controller_response(task, req, &resp); 826 827 nxt_queue_init(&queue); 828 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 829 830 nxt_queue_init(&nxt_controller_waiting_requests); 831 832 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 833 nxt_controller_process_request(task, req); 834 } nxt_queue_loop; 835} 836 837 838static void 839nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 840 nxt_controller_response_t *resp) 841{ 842 size_t size; 843 nxt_str_t status_line, str; 844 nxt_buf_t *b, *body; 845 nxt_conn_t *c; 846 nxt_uint_t n; 847 nxt_conf_value_t *value, *location; 848 nxt_conf_json_pretty_t pretty; 849 850 static nxt_str_t success_str = nxt_string("success"); 851 static nxt_str_t error_str = nxt_string("error"); 852 static nxt_str_t detail_str = nxt_string("detail"); 853 static nxt_str_t location_str = nxt_string("location"); 854 static nxt_str_t offset_str = nxt_string("offset"); 855 static nxt_str_t line_str = nxt_string("line"); 856 static nxt_str_t column_str = nxt_string("column"); 857 858 static nxt_time_string_t date_cache = { 859 (nxt_atomic_uint_t) -1, 860 nxt_controller_date, 861 "%s, %02d %s %4d %02d:%02d:%02d GMT", 862 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1, 863 NXT_THREAD_TIME_GMT, 864 NXT_THREAD_TIME_SEC, 865 }; 866 867 switch (resp->status) { 868 869 case 200: 870 nxt_str_set(&status_line, "200 OK"); 871 break; 872 873 case 400: 874 nxt_str_set(&status_line, "400 Bad Request"); 875 break; 876 877 case 404: 878 nxt_str_set(&status_line, "404 Not Found"); 879 break; 880 881 case 405: 882 nxt_str_set(&status_line, "405 Method Not Allowed"); 883 break; 884 885 default: 886 nxt_str_set(&status_line, "500 Internal Server Error"); 887 break; 888 } 889 890 c = req->conn; 891 value = resp->conf; 892 893 if (value == NULL) { 894 n = 1 895 + (resp->detail != NULL) 896 + (resp->status >= 400 && resp->offset != -1); 897 898 value = nxt_conf_create_object(c->mem_pool, n); 899 900 if (nxt_slow_path(value == NULL)) { 901 nxt_controller_conn_close(task, c, req); 902 return; 903 } 904 905 str.length = nxt_strlen(resp->title); 906 str.start = resp->title; 907 908 if (resp->status < 400) { 909 nxt_conf_set_member_string(value, &success_str, &str, 0); 910 911 } else { 912 nxt_conf_set_member_string(value, &error_str, &str, 0); 913 } 914 915 n = 0; 916 917 if (resp->detail != NULL) { 918 str.length = nxt_strlen(resp->detail); 919 str.start = resp->detail; 920 921 n++; 922 923 nxt_conf_set_member_string(value, &detail_str, &str, n); 924 } 925 926 if (resp->status >= 400 && resp->offset != -1) { 927 n++; 928 929 location = nxt_conf_create_object(c->mem_pool, 930 resp->line != 0 ? 3 : 1); 931 932 nxt_conf_set_member(value, &location_str, location, n); 933 934 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 935 936 if (resp->line != 0) { 937 nxt_conf_set_member_integer(location, &line_str, 938 resp->line, 1); 939 940 nxt_conf_set_member_integer(location, &column_str, 941 resp->column, 2); 942 } 943 } 944 } 945 946 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 947 948 size = nxt_conf_json_length(value, &pretty) + 2; 949 950 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 951 if (nxt_slow_path(body == NULL)) { 952 nxt_controller_conn_close(task, c, req); 953 return; 954 } 955 956 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 957 958 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty); 959 960 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); 961 962 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length 963 + sizeof("Server: nginext/0.1\r\n") - 1 964 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 965 + sizeof("Content-Type: application/json\r\n") - 1 966 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN 967 + sizeof("Connection: close\r\n") - 1 968 + sizeof("\r\n") - 1; 969 970 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 971 if (nxt_slow_path(b == NULL)) { 972 nxt_controller_conn_close(task, c, req); 973 return; 974 } 975 976 b->next = body; 977 978 nxt_str_set(&str, "HTTP/1.1 "); 979 980 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 981 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 982 status_line.length); 983 984 nxt_str_set(&str, "\r\n" 985 "Server: nginext/0.1\r\n" 986 "Date: "); 987 988 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 989 990 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 991 b->mem.free); 992 993 nxt_str_set(&str, "\r\n" 994 "Content-Type: application/json\r\n" 995 "Content-Length: "); 996 997 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 998 999 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 1000 nxt_buf_mem_used_size(&body->mem)); 1001 1002 nxt_str_set(&str, "\r\n" 1003 "Connection: close\r\n" 1004 "\r\n"); 1005 1006 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1007 1008 c->write = b; 1009 c->write_state = &nxt_controller_conn_write_state; 1010 1011 nxt_conn_write(task->thread->engine, c); 1012} 1013 1014 1015static u_char * 1016nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 1017 size_t size, const char *format) 1018{ 1019 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 1020 "Sat" }; 1021 1022 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1023 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1024 1025 return nxt_sprintf(buf, buf + size, format, 1026 week[tm->tm_wday], tm->tm_mday, 1027 month[tm->tm_mon], tm->tm_year + 1900, 1028 tm->tm_hour, tm->tm_min, tm->tm_sec); 1029}
| 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 u_char *detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38} nxt_controller_response_t; 39 40 41static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 42static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 43static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 44 uintptr_t data); 45static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 46 void *data); 47static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 48 void *data); 49static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 50 void *data); 51static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 52static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 53 void *data); 54static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 55 void *data); 56static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 57static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 58 59static nxt_int_t nxt_controller_request_content_length(void *ctx, 60 nxt_http_field_t *field, nxt_log_t *log); 61 62static void nxt_controller_process_request(nxt_task_t *task, 63 nxt_controller_request_t *req); 64static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, 65 nxt_controller_request_t *req); 66static void nxt_controller_conf_handler(nxt_task_t *task, 67 nxt_port_recv_msg_t *msg, void *data); 68static void nxt_controller_response(nxt_task_t *task, 69 nxt_controller_request_t *req, nxt_controller_response_t *resp); 70static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 71 struct tm *tm, size_t size, const char *format); 72 73 74static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = { 75 { nxt_string("Content-Length"), 76 &nxt_controller_request_content_length, 0 }, 77 78 { nxt_null_string, NULL, 0 } 79}; 80 81static nxt_http_fields_hash_t *nxt_controller_fields_hash; 82 83static nxt_controller_conf_t nxt_controller_conf; 84static nxt_queue_t nxt_controller_waiting_requests; 85 86 87static const nxt_event_conn_state_t nxt_controller_conn_read_state; 88static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 89static const nxt_event_conn_state_t nxt_controller_conn_write_state; 90static const nxt_event_conn_state_t nxt_controller_conn_close_state; 91 92 93nxt_int_t 94nxt_controller_start(nxt_task_t *task, void *data) 95{ 96 nxt_mp_t *mp; 97 nxt_runtime_t *rt; 98 nxt_conf_value_t *conf; 99 nxt_http_fields_hash_t *hash; 100 101 static const nxt_str_t json 102 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 103 104 rt = task->thread->runtime; 105 106 hash = nxt_http_fields_hash_create(nxt_controller_request_fields, 107 rt->mem_pool); 108 if (nxt_slow_path(hash == NULL)) { 109 return NXT_ERROR; 110 } 111 112 nxt_controller_fields_hash = hash; 113 114 if (nxt_listen_event(task, rt->controller_socket) == NULL) { 115 return NXT_ERROR; 116 } 117 118 mp = nxt_mp_create(1024, 128, 256, 32); 119 120 if (nxt_slow_path(mp == NULL)) { 121 return NXT_ERROR; 122 } 123 124 conf = nxt_conf_json_parse_str(mp, &json); 125 126 if (conf == NULL) { 127 return NXT_ERROR; 128 } 129 130 nxt_controller_conf.root = conf; 131 nxt_controller_conf.pool = mp; 132 133 nxt_queue_init(&nxt_controller_waiting_requests); 134 135 return NXT_OK; 136} 137 138 139nxt_int_t 140nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 141{ 142 nxt_sockaddr_t *sa; 143 nxt_listen_socket_t *ls; 144 145 sa = rt->controller_listen; 146 147 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 148 if (ls == NULL) { 149 return NXT_ERROR; 150 } 151 152 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 153 sa->socklen, sa->length); 154 if (ls->sockaddr == NULL) { 155 return NXT_ERROR; 156 } 157 158 ls->sockaddr->type = sa->type; 159 ls->socklen = sa->socklen; 160 ls->address_length = sa->length; 161 162 nxt_sockaddr_text(ls->sockaddr); 163 164 ls->socket = -1; 165 ls->backlog = NXT_LISTEN_BACKLOG; 166 ls->read_after_accept = 1; 167 ls->flags = NXT_NONBLOCK; 168 169#if 0 170 /* STUB */ 171 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 172 if (wq == NULL) { 173 return NXT_ERROR; 174 } 175 nxt_work_queue_name(wq, "listen"); 176 /**/ 177 178 ls->work_queue = wq; 179#endif 180 ls->handler = nxt_controller_conn_init; 181 182 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 183 return NXT_ERROR; 184 } 185 186 rt->controller_socket = ls; 187 188 return NXT_OK; 189} 190 191 192static void 193nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 194{ 195 nxt_buf_t *b; 196 nxt_conn_t *c; 197 nxt_event_engine_t *engine; 198 nxt_controller_request_t *r; 199 200 c = obj; 201 202 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 203 204 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 205 if (nxt_slow_path(r == NULL)) { 206 nxt_controller_conn_free(task, c, NULL); 207 return; 208 } 209 210 r->conn = c; 211 212 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 213 != NXT_OK)) 214 { 215 nxt_controller_conn_free(task, c, NULL); 216 return; 217 } 218 219 r->parser.fields_hash = nxt_controller_fields_hash; 220 221 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 222 if (nxt_slow_path(b == NULL)) { 223 nxt_controller_conn_free(task, c, NULL); 224 return; 225 } 226 227 c->read = b; 228 c->socket.data = r; 229 c->socket.read_ready = 1; 230 c->read_state = &nxt_controller_conn_read_state; 231 232 engine = task->thread->engine; 233 c->read_work_queue = &engine->read_work_queue; 234 c->write_work_queue = &engine->write_work_queue; 235 236 nxt_conn_read(engine, c); 237} 238 239 240static const nxt_event_conn_state_t nxt_controller_conn_read_state 241 nxt_aligned(64) = 242{ 243 .ready_handler = nxt_controller_conn_read, 244 .close_handler = nxt_controller_conn_close, 245 .error_handler = nxt_controller_conn_read_error, 246 247 .timer_handler = nxt_controller_conn_read_timeout, 248 .timer_value = nxt_controller_conn_timeout_value, 249 .timer_data = 60 * 1000, 250}; 251 252 253static void 254nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 255{ 256 size_t preread; 257 nxt_buf_t *b; 258 nxt_int_t rc; 259 nxt_conn_t *c; 260 nxt_controller_request_t *r; 261 262 c = obj; 263 r = data; 264 265 nxt_debug(task, "controller conn read"); 266 267 nxt_queue_remove(&c->link); 268 nxt_queue_self(&c->link); 269 270 b = c->read; 271 272 rc = nxt_http_parse_request(&r->parser, &b->mem); 273 274 if (nxt_slow_path(rc != NXT_DONE)) { 275 276 if (rc == NXT_AGAIN) { 277 if (nxt_buf_mem_free_size(&b->mem) == 0) { 278 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 279 nxt_controller_conn_close(task, c, r); 280 return; 281 } 282 283 nxt_conn_read(task->thread->engine, c); 284 return; 285 } 286 287 /* rc == NXT_ERROR */ 288 289 nxt_log(task, NXT_LOG_ERR, "parsing error"); 290 291 nxt_controller_conn_close(task, c, r); 292 return; 293 } 294 295 rc = nxt_http_fields_process(r->parser.fields, r, task->log); 296 297 if (nxt_slow_path(rc != NXT_OK)) { 298 nxt_controller_conn_close(task, c, r); 299 return; 300 } 301 302 preread = nxt_buf_mem_used_size(&b->mem); 303 304 nxt_debug(task, "controller request header parsing complete, " 305 "body length: %uz, preread: %uz", 306 r->length, preread); 307 308 if (preread >= r->length) { 309 nxt_controller_process_request(task, r); 310 return; 311 } 312 313 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 314 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 315 if (nxt_slow_path(b == NULL)) { 316 nxt_controller_conn_free(task, c, NULL); 317 return; 318 } 319 320 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 321 322 c->read = b; 323 } 324 325 c->read_state = &nxt_controller_conn_body_read_state; 326 327 nxt_conn_read(task->thread->engine, c); 328} 329 330 331static nxt_msec_t 332nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 333{ 334 return (nxt_msec_t) data; 335} 336 337 338static void 339nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 340{ 341 nxt_conn_t *c; 342 343 c = obj; 344 345 nxt_debug(task, "controller conn read error"); 346 347 nxt_controller_conn_close(task, c, data); 348} 349 350 351static void 352nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 353{ 354 nxt_timer_t *timer; 355 nxt_conn_t *c; 356 357 timer = obj; 358 359 c = nxt_read_timer_conn(timer); 360 c->socket.timedout = 1; 361 c->socket.closed = 1; 362 363 nxt_debug(task, "controller conn read timeout"); 364 365 nxt_controller_conn_close(task, c, data); 366} 367 368 369static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 370 nxt_aligned(64) = 371{ 372 .ready_handler = nxt_controller_conn_body_read, 373 .close_handler = nxt_controller_conn_close, 374 .error_handler = nxt_controller_conn_read_error, 375 376 .timer_handler = nxt_controller_conn_read_timeout, 377 .timer_value = nxt_controller_conn_timeout_value, 378 .timer_data = 60 * 1000, 379 .timer_autoreset = 1, 380}; 381 382 383static void 384nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 385{ 386 size_t read; 387 nxt_buf_t *b; 388 nxt_conn_t *c; 389 nxt_controller_request_t *r; 390 391 c = obj; 392 r = data; 393 b = c->read; 394 395 read = nxt_buf_mem_used_size(&b->mem); 396 397 nxt_debug(task, "controller conn body read: %uz of %uz", 398 read, r->length); 399 400 if (read >= r->length) { 401 nxt_controller_process_request(task, r); 402 return; 403 } 404 405 nxt_conn_read(task->thread->engine, c); 406} 407 408 409static const nxt_event_conn_state_t nxt_controller_conn_write_state 410 nxt_aligned(64) = 411{ 412 .ready_handler = nxt_controller_conn_write, 413 .error_handler = nxt_controller_conn_write_error, 414 415 .timer_handler = nxt_controller_conn_write_timeout, 416 .timer_value = nxt_controller_conn_timeout_value, 417 .timer_data = 60 * 1000, 418 .timer_autoreset = 1, 419}; 420 421 422static void 423nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 424{ 425 nxt_buf_t *b; 426 nxt_conn_t *c; 427 428 c = obj; 429 430 nxt_debug(task, "controller conn write"); 431 432 b = c->write; 433 434 if (b->mem.pos != b->mem.free) { 435 nxt_conn_write(task->thread->engine, c); 436 return; 437 } 438 439 nxt_debug(task, "controller conn write complete"); 440 441 nxt_controller_conn_close(task, c, data); 442} 443 444 445static void 446nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 447{ 448 nxt_conn_t *c; 449 450 c = obj; 451 452 nxt_debug(task, "controller conn write error"); 453 454 nxt_controller_conn_close(task, c, data); 455} 456 457 458static void 459nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 460{ 461 nxt_conn_t *c; 462 nxt_timer_t *timer; 463 464 timer = obj; 465 466 c = nxt_write_timer_conn(timer); 467 c->socket.timedout = 1; 468 c->socket.closed = 1; 469 470 nxt_debug(task, "controller conn write timeout"); 471 472 nxt_controller_conn_close(task, c, data); 473} 474 475 476static const nxt_event_conn_state_t nxt_controller_conn_close_state 477 nxt_aligned(64) = 478{ 479 .ready_handler = nxt_controller_conn_free, 480}; 481 482 483static void 484nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 485{ 486 nxt_conn_t *c; 487 488 c = obj; 489 490 nxt_debug(task, "controller conn close"); 491 492 nxt_queue_remove(&c->link); 493 494 c->write_state = &nxt_controller_conn_close_state; 495 496 nxt_conn_close(task->thread->engine, c); 497} 498 499 500static void 501nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 502{ 503 nxt_conn_t *c; 504 505 c = obj; 506 507 nxt_debug(task, "controller conn free"); 508 509 nxt_mp_destroy(c->mem_pool); 510 511 //nxt_free(c); 512} 513 514 515static nxt_int_t 516nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 517 nxt_log_t *log) 518{ 519 off_t length; 520 nxt_controller_request_t *r; 521 522 r = ctx; 523 524 length = nxt_off_t_parse(field->value.start, field->value.length); 525 526 if (nxt_fast_path(length > 0)) { 527 528 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 529 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big"); 530 return NXT_ERROR; 531 } 532 533 r->length = length; 534 return NXT_OK; 535 } 536 537 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid"); 538 539 return NXT_ERROR; 540} 541 542 543static void 544nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 545{ 546 nxt_mp_t *mp; 547 nxt_int_t rc; 548 nxt_str_t path; 549 nxt_conn_t *c; 550 nxt_buf_mem_t *mbuf; 551 nxt_conf_op_t *ops; 552 nxt_conf_value_t *value; 553 nxt_conf_json_error_t error; 554 nxt_controller_response_t resp; 555 556 static const nxt_str_t empty_obj = nxt_string("{}"); 557 558 c = req->conn; 559 path = req->parser.path; 560 561 if (path.length > 1 && path.start[path.length - 1] == '/') { 562 path.length--; 563 } 564 565 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 566 567 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 568 569 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 570 571 if (value == NULL) { 572 goto not_found; 573 } 574 575 resp.status = 200; 576 resp.conf = value; 577 578 nxt_controller_response(task, req, &resp); 579 return; 580 } 581 582 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 583 584 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 585 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 586 return; 587 } 588 589 mp = nxt_mp_create(1024, 128, 256, 32); 590 591 if (nxt_slow_path(mp == NULL)) { 592 goto alloc_fail; 593 } 594 595 mbuf = &c->read->mem; 596 597 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 598 599 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 600 601 if (value == NULL) { 602 nxt_mp_destroy(mp); 603 604 if (error.pos == NULL) { 605 goto alloc_fail; 606 } 607 608 resp.status = 400; 609 resp.title = (u_char *) "Invalid JSON."; 610 resp.detail = error.detail; 611 resp.offset = error.pos - mbuf->pos; 612 613 nxt_conf_json_position(mbuf->pos, error.pos, 614 &resp.line, &resp.column); 615 616 nxt_controller_response(task, req, &resp); 617 return; 618 } 619 620 if (path.length != 1) { 621 rc = nxt_conf_op_compile(c->mem_pool, &ops, 622 nxt_controller_conf.root, 623 &path, value); 624 625 if (rc != NXT_OK) { 626 if (rc == NXT_DECLINED) { 627 goto not_found; 628 } 629 630 goto alloc_fail; 631 } 632 633 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 634 635 if (nxt_slow_path(value == NULL)) { 636 nxt_mp_destroy(mp); 637 goto alloc_fail; 638 } 639 } 640 641 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 642 nxt_mp_destroy(mp); 643 goto invalid_conf; 644 } 645 646 req->conf.root = value; 647 req->conf.pool = mp; 648 649 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 650 nxt_mp_destroy(mp); 651 goto alloc_fail; 652 } 653 654 return; 655 } 656 657 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 658 659 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 660 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 661 return; 662 } 663 664 if (path.length == 1) { 665 mp = nxt_mp_create(1024, 128, 256, 32); 666 667 if (nxt_slow_path(mp == NULL)) { 668 goto alloc_fail; 669 } 670 671 value = nxt_conf_json_parse_str(mp, &empty_obj); 672 673 } else { 674 rc = nxt_conf_op_compile(c->mem_pool, &ops, 675 nxt_controller_conf.root, 676 &path, NULL); 677 678 if (rc != NXT_OK) { 679 if (rc == NXT_DECLINED) { 680 goto not_found; 681 } 682 683 goto alloc_fail; 684 } 685 686 mp = nxt_mp_create(1024, 128, 256, 32); 687 688 if (nxt_slow_path(mp == NULL)) { 689 goto alloc_fail; 690 } 691 692 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 693 } 694 695 if (nxt_slow_path(value == NULL)) { 696 nxt_mp_destroy(mp); 697 goto alloc_fail; 698 } 699 700 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 701 nxt_mp_destroy(mp); 702 goto invalid_conf; 703 } 704 705 req->conf.root = value; 706 req->conf.pool = mp; 707 708 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 709 nxt_mp_destroy(mp); 710 goto alloc_fail; 711 } 712 713 return; 714 } 715 716 resp.status = 405; 717 resp.title = (u_char *) "Invalid method."; 718 resp.offset = -1; 719 720 nxt_controller_response(task, req, &resp); 721 return; 722 723alloc_fail: 724 725 resp.status = 500; 726 resp.title = (u_char *) "Memory allocation failed."; 727 resp.offset = -1; 728 729 nxt_controller_response(task, req, &resp); 730 return; 731 732not_found: 733 734 resp.status = 404; 735 resp.title = (u_char *) "Value doesn't exist."; 736 resp.offset = -1; 737 738 nxt_controller_response(task, req, &resp); 739 return; 740 741invalid_conf: 742 743 resp.status = 400; 744 resp.title = (u_char *) "Invalid configuration."; 745 resp.offset = -1; 746 747 nxt_controller_response(task, req, &resp); 748 return; 749} 750 751 752static nxt_int_t 753nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) 754{ 755 size_t size; 756 uint32_t stream; 757 nxt_int_t rc; 758 nxt_buf_t *b; 759 nxt_port_t *router_port, *controller_port; 760 nxt_runtime_t *rt; 761 762 rt = task->thread->runtime; 763 764 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 765 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 766 767 size = nxt_conf_json_length(req->conf.root, NULL); 768 769 b = nxt_port_mmap_get_buf(task, router_port, size); 770 771 b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL); 772 773 stream = nxt_port_rpc_register_handler(task, controller_port, 774 nxt_controller_conf_handler, 775 nxt_controller_conf_handler, 776 router_port->pid, req); 777 778 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 779 stream, controller_port->id, b); 780 781 if (nxt_slow_path(rc != NXT_OK)) { 782 nxt_port_rpc_cancel(task, controller_port, stream); 783 return NXT_ERROR; 784 } 785 786 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 787 788 return NXT_OK; 789} 790 791 792static void 793nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 794 void *data) 795{ 796 nxt_queue_t queue; 797 nxt_controller_request_t *req; 798 nxt_controller_response_t resp; 799 800 req = data; 801 802 nxt_debug(task, "controller conf ready: %*s", 803 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 804 805 nxt_queue_remove(&req->link); 806 807 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 808 809 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 810 nxt_mp_destroy(nxt_controller_conf.pool); 811 812 nxt_controller_conf = req->conf; 813 814 resp.status = 200; 815 resp.title = (u_char *) "Reconfiguration done."; 816 817 } else { 818 nxt_mp_destroy(req->conf.pool); 819 820 resp.status = 500; 821 resp.title = (u_char *) "Failed to apply new configuration."; 822 resp.offset = -1; 823 } 824 825 nxt_controller_response(task, req, &resp); 826 827 nxt_queue_init(&queue); 828 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 829 830 nxt_queue_init(&nxt_controller_waiting_requests); 831 832 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 833 nxt_controller_process_request(task, req); 834 } nxt_queue_loop; 835} 836 837 838static void 839nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 840 nxt_controller_response_t *resp) 841{ 842 size_t size; 843 nxt_str_t status_line, str; 844 nxt_buf_t *b, *body; 845 nxt_conn_t *c; 846 nxt_uint_t n; 847 nxt_conf_value_t *value, *location; 848 nxt_conf_json_pretty_t pretty; 849 850 static nxt_str_t success_str = nxt_string("success"); 851 static nxt_str_t error_str = nxt_string("error"); 852 static nxt_str_t detail_str = nxt_string("detail"); 853 static nxt_str_t location_str = nxt_string("location"); 854 static nxt_str_t offset_str = nxt_string("offset"); 855 static nxt_str_t line_str = nxt_string("line"); 856 static nxt_str_t column_str = nxt_string("column"); 857 858 static nxt_time_string_t date_cache = { 859 (nxt_atomic_uint_t) -1, 860 nxt_controller_date, 861 "%s, %02d %s %4d %02d:%02d:%02d GMT", 862 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1, 863 NXT_THREAD_TIME_GMT, 864 NXT_THREAD_TIME_SEC, 865 }; 866 867 switch (resp->status) { 868 869 case 200: 870 nxt_str_set(&status_line, "200 OK"); 871 break; 872 873 case 400: 874 nxt_str_set(&status_line, "400 Bad Request"); 875 break; 876 877 case 404: 878 nxt_str_set(&status_line, "404 Not Found"); 879 break; 880 881 case 405: 882 nxt_str_set(&status_line, "405 Method Not Allowed"); 883 break; 884 885 default: 886 nxt_str_set(&status_line, "500 Internal Server Error"); 887 break; 888 } 889 890 c = req->conn; 891 value = resp->conf; 892 893 if (value == NULL) { 894 n = 1 895 + (resp->detail != NULL) 896 + (resp->status >= 400 && resp->offset != -1); 897 898 value = nxt_conf_create_object(c->mem_pool, n); 899 900 if (nxt_slow_path(value == NULL)) { 901 nxt_controller_conn_close(task, c, req); 902 return; 903 } 904 905 str.length = nxt_strlen(resp->title); 906 str.start = resp->title; 907 908 if (resp->status < 400) { 909 nxt_conf_set_member_string(value, &success_str, &str, 0); 910 911 } else { 912 nxt_conf_set_member_string(value, &error_str, &str, 0); 913 } 914 915 n = 0; 916 917 if (resp->detail != NULL) { 918 str.length = nxt_strlen(resp->detail); 919 str.start = resp->detail; 920 921 n++; 922 923 nxt_conf_set_member_string(value, &detail_str, &str, n); 924 } 925 926 if (resp->status >= 400 && resp->offset != -1) { 927 n++; 928 929 location = nxt_conf_create_object(c->mem_pool, 930 resp->line != 0 ? 3 : 1); 931 932 nxt_conf_set_member(value, &location_str, location, n); 933 934 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 935 936 if (resp->line != 0) { 937 nxt_conf_set_member_integer(location, &line_str, 938 resp->line, 1); 939 940 nxt_conf_set_member_integer(location, &column_str, 941 resp->column, 2); 942 } 943 } 944 } 945 946 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 947 948 size = nxt_conf_json_length(value, &pretty) + 2; 949 950 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 951 if (nxt_slow_path(body == NULL)) { 952 nxt_controller_conn_close(task, c, req); 953 return; 954 } 955 956 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 957 958 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty); 959 960 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); 961 962 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length 963 + sizeof("Server: nginext/0.1\r\n") - 1 964 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 965 + sizeof("Content-Type: application/json\r\n") - 1 966 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN 967 + sizeof("Connection: close\r\n") - 1 968 + sizeof("\r\n") - 1; 969 970 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 971 if (nxt_slow_path(b == NULL)) { 972 nxt_controller_conn_close(task, c, req); 973 return; 974 } 975 976 b->next = body; 977 978 nxt_str_set(&str, "HTTP/1.1 "); 979 980 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 981 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 982 status_line.length); 983 984 nxt_str_set(&str, "\r\n" 985 "Server: nginext/0.1\r\n" 986 "Date: "); 987 988 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 989 990 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 991 b->mem.free); 992 993 nxt_str_set(&str, "\r\n" 994 "Content-Type: application/json\r\n" 995 "Content-Length: "); 996 997 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 998 999 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 1000 nxt_buf_mem_used_size(&body->mem)); 1001 1002 nxt_str_set(&str, "\r\n" 1003 "Connection: close\r\n" 1004 "\r\n"); 1005 1006 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1007 1008 c->write = b; 1009 c->write_state = &nxt_controller_conn_write_state; 1010 1011 nxt_conn_write(task->thread->engine, c); 1012} 1013 1014 1015static u_char * 1016nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 1017 size_t size, const char *format) 1018{ 1019 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 1020 "Sat" }; 1021 1022 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1023 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1024 1025 return nxt_sprintf(buf, buf + size, format, 1026 week[tm->tm_wday], tm->tm_mday, 1027 month[tm->tm_mon], tm->tm_year + 1900, 1028 tm->tm_hour, tm->tm_min, tm->tm_sec); 1029}
|