1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_master_process.h> 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct {
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_master_process.h> 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct {
|
30 nxt_str_t status_line;
| 30 nxt_uint_t status;
|
31 nxt_conf_value_t *conf;
| 31 nxt_conf_value_t *conf;
|
32 nxt_str_t json;
| 32 33 u_char *title; 34 u_char *detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column;
|
33} nxt_controller_response_t; 34 35 36static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 37static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 38static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 39 uintptr_t data); 40static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 41 void *data); 42static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 43 void *data); 44static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 45 void *data); 46static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 47static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 48 void *data); 49static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 50 void *data); 51static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 52static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 53 54static nxt_int_t nxt_controller_request_content_length(void *ctx, 55 nxt_http_field_t *field, nxt_log_t *log); 56 57static void nxt_controller_process_request(nxt_task_t *task, 58 nxt_controller_request_t *req); 59static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, 60 nxt_controller_request_t *req); 61static void nxt_controller_process_waiting(nxt_task_t *task); 62static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task, 63 nxt_conf_value_t *conf); 64static void nxt_controller_response(nxt_task_t *task, 65 nxt_controller_request_t *req, nxt_controller_response_t *resp);
| 38} nxt_controller_response_t; 39 40 41static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 42static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 43static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 44 uintptr_t data); 45static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 46 void *data); 47static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 48 void *data); 49static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 50 void *data); 51static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 52static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 53 void *data); 54static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 55 void *data); 56static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 57static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 58 59static nxt_int_t nxt_controller_request_content_length(void *ctx, 60 nxt_http_field_t *field, nxt_log_t *log); 61 62static void nxt_controller_process_request(nxt_task_t *task, 63 nxt_controller_request_t *req); 64static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, 65 nxt_controller_request_t *req); 66static void nxt_controller_process_waiting(nxt_task_t *task); 67static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task, 68 nxt_conf_value_t *conf); 69static void nxt_controller_response(nxt_task_t *task, 70 nxt_controller_request_t *req, nxt_controller_response_t *resp);
|
66static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp, 67 nxt_mp_t *pool);
| 71static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 72 struct tm *tm, size_t size, const char *format);
|
68 69 70static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = { 71 { nxt_string("Content-Length"), 72 &nxt_controller_request_content_length, 0 }, 73 74 { nxt_null_string, NULL, 0 } 75}; 76 77static nxt_http_fields_hash_t *nxt_controller_fields_hash; 78 79 80static nxt_controller_conf_t nxt_controller_conf; 81static nxt_queue_t nxt_controller_waiting_requests; 82static nxt_controller_request_t *nxt_controller_current_request; 83 84 85static const nxt_event_conn_state_t nxt_controller_conn_read_state; 86static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 87static const nxt_event_conn_state_t nxt_controller_conn_write_state; 88static const nxt_event_conn_state_t nxt_controller_conn_close_state; 89 90 91nxt_int_t 92nxt_controller_start(nxt_task_t *task, void *data) 93{ 94 nxt_mp_t *mp; 95 nxt_runtime_t *rt; 96 nxt_conf_value_t *conf; 97 nxt_http_fields_hash_t *hash; 98 99 static const nxt_str_t json 100 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 101 102 rt = task->thread->runtime; 103 104 hash = nxt_http_fields_hash_create(nxt_controller_request_fields, 105 rt->mem_pool); 106 if (nxt_slow_path(hash == NULL)) { 107 return NXT_ERROR; 108 } 109 110 nxt_controller_fields_hash = hash; 111 112 if (nxt_listen_event(task, rt->controller_socket) == NULL) { 113 return NXT_ERROR; 114 } 115 116 mp = nxt_mp_create(1024, 128, 256, 32); 117 118 if (nxt_slow_path(mp == NULL)) { 119 return NXT_ERROR; 120 } 121 122 conf = nxt_conf_json_parse_str(mp, &json); 123 124 if (conf == NULL) { 125 return NXT_ERROR; 126 } 127 128 nxt_controller_conf.root = conf; 129 nxt_controller_conf.pool = mp; 130 131 nxt_queue_init(&nxt_controller_waiting_requests); 132 133 return NXT_OK; 134} 135 136 137nxt_int_t 138nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 139{ 140 nxt_sockaddr_t *sa; 141 nxt_listen_socket_t *ls; 142 143 sa = rt->controller_listen; 144 145 if (rt->controller_listen == NULL) { 146 sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), 147 NXT_INET_ADDR_STR_LEN); 148 if (sa == NULL) { 149 return NXT_ERROR; 150 } 151 152 sa->type = SOCK_STREAM; 153 sa->u.sockaddr_in.sin_family = AF_INET; 154 sa->u.sockaddr_in.sin_port = htons(8443); 155 156 nxt_sockaddr_text(sa); 157 158 rt->controller_listen = sa; 159 } 160 161 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 162 if (ls == NULL) { 163 return NXT_ERROR; 164 } 165 166 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 167 sa->socklen, sa->length); 168 if (ls->sockaddr == NULL) { 169 return NXT_ERROR; 170 } 171 172 ls->sockaddr->type = sa->type; 173 ls->socklen = sa->socklen; 174 ls->address_length = sa->length; 175 176 nxt_sockaddr_text(ls->sockaddr); 177 178 ls->socket = -1; 179 ls->backlog = NXT_LISTEN_BACKLOG; 180 ls->read_after_accept = 1; 181 ls->flags = NXT_NONBLOCK; 182 183#if 0 184 /* STUB */ 185 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 186 if (wq == NULL) { 187 return NXT_ERROR; 188 } 189 nxt_work_queue_name(wq, "listen"); 190 /**/ 191 192 ls->work_queue = wq; 193#endif 194 ls->handler = nxt_controller_conn_init; 195 196 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 197 return NXT_ERROR; 198 } 199 200 rt->controller_socket = ls; 201 202 return NXT_OK; 203} 204 205 206static void 207nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 208{ 209 nxt_buf_t *b; 210 nxt_conn_t *c; 211 nxt_event_engine_t *engine; 212 nxt_controller_request_t *r; 213 214 c = obj; 215 216 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 217 218 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 219 if (nxt_slow_path(r == NULL)) { 220 nxt_controller_conn_free(task, c, NULL); 221 return; 222 } 223 224 r->conn = c; 225 226 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 227 != NXT_OK)) 228 { 229 nxt_controller_conn_free(task, c, NULL); 230 return; 231 } 232 233 r->parser.fields_hash = nxt_controller_fields_hash; 234 235 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 236 if (nxt_slow_path(b == NULL)) { 237 nxt_controller_conn_free(task, c, NULL); 238 return; 239 } 240 241 c->read = b; 242 c->socket.data = r; 243 c->socket.read_ready = 1; 244 c->read_state = &nxt_controller_conn_read_state; 245 246 engine = task->thread->engine; 247 c->read_work_queue = &engine->read_work_queue; 248 c->write_work_queue = &engine->write_work_queue; 249 250 nxt_conn_read(engine, c); 251} 252 253 254static const nxt_event_conn_state_t nxt_controller_conn_read_state 255 nxt_aligned(64) = 256{ 257 .ready_handler = nxt_controller_conn_read, 258 .close_handler = nxt_controller_conn_close, 259 .error_handler = nxt_controller_conn_read_error, 260 261 .timer_handler = nxt_controller_conn_read_timeout, 262 .timer_value = nxt_controller_conn_timeout_value, 263 .timer_data = 60 * 1000, 264}; 265 266 267static void 268nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 269{ 270 size_t preread; 271 nxt_buf_t *b; 272 nxt_int_t rc; 273 nxt_conn_t *c; 274 nxt_controller_request_t *r; 275 276 c = obj; 277 r = data; 278 279 nxt_debug(task, "controller conn read"); 280 281 nxt_queue_remove(&c->link); 282 nxt_queue_self(&c->link); 283 284 b = c->read; 285 286 rc = nxt_http_parse_request(&r->parser, &b->mem); 287 288 if (nxt_slow_path(rc != NXT_DONE)) { 289 290 if (rc == NXT_AGAIN) { 291 if (nxt_buf_mem_free_size(&b->mem) == 0) { 292 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 293 nxt_controller_conn_close(task, c, r); 294 return; 295 } 296 297 nxt_conn_read(task->thread->engine, c); 298 return; 299 } 300 301 /* rc == NXT_ERROR */ 302 303 nxt_log(task, NXT_LOG_ERR, "parsing error"); 304 305 nxt_controller_conn_close(task, c, r); 306 return; 307 } 308 309 rc = nxt_http_fields_process(r->parser.fields, r, task->log); 310 311 if (nxt_slow_path(rc != NXT_OK)) { 312 nxt_controller_conn_close(task, c, r); 313 return; 314 } 315 316 preread = nxt_buf_mem_used_size(&b->mem); 317 318 nxt_debug(task, "controller request header parsing complete, " 319 "body length: %uz, preread: %uz", 320 r->length, preread); 321 322 if (preread >= r->length) { 323 nxt_controller_process_request(task, r); 324 return; 325 } 326 327 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 328 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 329 if (nxt_slow_path(b == NULL)) { 330 nxt_controller_conn_free(task, c, NULL); 331 return; 332 } 333 334 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 335 336 c->read = b; 337 } 338 339 c->read_state = &nxt_controller_conn_body_read_state; 340 341 nxt_conn_read(task->thread->engine, c); 342} 343 344 345static nxt_msec_t 346nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 347{ 348 return (nxt_msec_t) data; 349} 350 351 352static void 353nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 354{ 355 nxt_conn_t *c; 356 357 c = obj; 358 359 nxt_debug(task, "controller conn read error"); 360 361 nxt_controller_conn_close(task, c, data); 362} 363 364 365static void 366nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 367{ 368 nxt_timer_t *timer; 369 nxt_conn_t *c; 370 371 timer = obj; 372 373 c = nxt_read_timer_conn(timer); 374 c->socket.timedout = 1; 375 c->socket.closed = 1; 376 377 nxt_debug(task, "controller conn read timeout"); 378 379 nxt_controller_conn_close(task, c, data); 380} 381 382 383static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 384 nxt_aligned(64) = 385{ 386 .ready_handler = nxt_controller_conn_body_read, 387 .close_handler = nxt_controller_conn_close, 388 .error_handler = nxt_controller_conn_read_error, 389 390 .timer_handler = nxt_controller_conn_read_timeout, 391 .timer_value = nxt_controller_conn_timeout_value, 392 .timer_data = 60 * 1000, 393 .timer_autoreset = 1, 394}; 395 396 397static void 398nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 399{ 400 size_t read; 401 nxt_buf_t *b; 402 nxt_conn_t *c; 403 nxt_controller_request_t *r; 404 405 c = obj; 406 r = data; 407 b = c->read; 408 409 read = nxt_buf_mem_used_size(&b->mem); 410 411 nxt_debug(task, "controller conn body read: %uz of %uz", 412 read, r->length); 413 414 if (read >= r->length) { 415 nxt_controller_process_request(task, r); 416 return; 417 } 418 419 nxt_conn_read(task->thread->engine, c); 420} 421 422 423static const nxt_event_conn_state_t nxt_controller_conn_write_state 424 nxt_aligned(64) = 425{ 426 .ready_handler = nxt_controller_conn_write, 427 .error_handler = nxt_controller_conn_write_error, 428 429 .timer_handler = nxt_controller_conn_write_timeout, 430 .timer_value = nxt_controller_conn_timeout_value, 431 .timer_data = 60 * 1000, 432 .timer_autoreset = 1, 433}; 434 435 436static void 437nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 438{ 439 nxt_buf_t *b; 440 nxt_conn_t *c; 441 442 c = obj; 443 444 nxt_debug(task, "controller conn write"); 445 446 b = c->write; 447 448 if (b->mem.pos != b->mem.free) { 449 nxt_conn_write(task->thread->engine, c); 450 return; 451 } 452 453 nxt_debug(task, "controller conn write complete"); 454 455 nxt_controller_conn_close(task, c, data); 456} 457 458 459static void 460nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 461{ 462 nxt_conn_t *c; 463 464 c = obj; 465 466 nxt_debug(task, "controller conn write error"); 467 468 nxt_controller_conn_close(task, c, data); 469} 470 471 472static void 473nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 474{ 475 nxt_conn_t *c; 476 nxt_timer_t *timer; 477 478 timer = obj; 479 480 c = nxt_write_timer_conn(timer); 481 c->socket.timedout = 1; 482 c->socket.closed = 1; 483 484 nxt_debug(task, "controller conn write timeout"); 485 486 nxt_controller_conn_close(task, c, data); 487} 488 489 490static const nxt_event_conn_state_t nxt_controller_conn_close_state 491 nxt_aligned(64) = 492{ 493 .ready_handler = nxt_controller_conn_free, 494}; 495 496 497static void 498nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 499{ 500 nxt_conn_t *c; 501 502 c = obj; 503 504 nxt_debug(task, "controller conn close"); 505 506 nxt_queue_remove(&c->link); 507 508 c->write_state = &nxt_controller_conn_close_state; 509 510 nxt_conn_close(task->thread->engine, c); 511} 512 513 514static void 515nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 516{ 517 nxt_conn_t *c; 518 519 c = obj; 520 521 nxt_debug(task, "controller conn free"); 522 523 nxt_mp_destroy(c->mem_pool); 524 525 //nxt_free(c); 526} 527 528 529static nxt_int_t 530nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 531 nxt_log_t *log) 532{ 533 off_t length; 534 nxt_controller_request_t *r; 535 536 r = ctx; 537 538 length = nxt_off_t_parse(field->value.start, field->value.length); 539 540 if (nxt_fast_path(length > 0)) { 541 542 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 543 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big"); 544 return NXT_ERROR; 545 } 546 547 r->length = length; 548 return NXT_OK; 549 } 550 551 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid"); 552 553 return NXT_ERROR; 554} 555 556 557static void 558nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 559{ 560 nxt_mp_t *mp; 561 nxt_int_t rc; 562 nxt_str_t path; 563 nxt_conn_t *c;
| 73 74 75static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = { 76 { nxt_string("Content-Length"), 77 &nxt_controller_request_content_length, 0 }, 78 79 { nxt_null_string, NULL, 0 } 80}; 81 82static nxt_http_fields_hash_t *nxt_controller_fields_hash; 83 84 85static nxt_controller_conf_t nxt_controller_conf; 86static nxt_queue_t nxt_controller_waiting_requests; 87static nxt_controller_request_t *nxt_controller_current_request; 88 89 90static const nxt_event_conn_state_t nxt_controller_conn_read_state; 91static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 92static const nxt_event_conn_state_t nxt_controller_conn_write_state; 93static const nxt_event_conn_state_t nxt_controller_conn_close_state; 94 95 96nxt_int_t 97nxt_controller_start(nxt_task_t *task, void *data) 98{ 99 nxt_mp_t *mp; 100 nxt_runtime_t *rt; 101 nxt_conf_value_t *conf; 102 nxt_http_fields_hash_t *hash; 103 104 static const nxt_str_t json 105 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 106 107 rt = task->thread->runtime; 108 109 hash = nxt_http_fields_hash_create(nxt_controller_request_fields, 110 rt->mem_pool); 111 if (nxt_slow_path(hash == NULL)) { 112 return NXT_ERROR; 113 } 114 115 nxt_controller_fields_hash = hash; 116 117 if (nxt_listen_event(task, rt->controller_socket) == NULL) { 118 return NXT_ERROR; 119 } 120 121 mp = nxt_mp_create(1024, 128, 256, 32); 122 123 if (nxt_slow_path(mp == NULL)) { 124 return NXT_ERROR; 125 } 126 127 conf = nxt_conf_json_parse_str(mp, &json); 128 129 if (conf == NULL) { 130 return NXT_ERROR; 131 } 132 133 nxt_controller_conf.root = conf; 134 nxt_controller_conf.pool = mp; 135 136 nxt_queue_init(&nxt_controller_waiting_requests); 137 138 return NXT_OK; 139} 140 141 142nxt_int_t 143nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 144{ 145 nxt_sockaddr_t *sa; 146 nxt_listen_socket_t *ls; 147 148 sa = rt->controller_listen; 149 150 if (rt->controller_listen == NULL) { 151 sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), 152 NXT_INET_ADDR_STR_LEN); 153 if (sa == NULL) { 154 return NXT_ERROR; 155 } 156 157 sa->type = SOCK_STREAM; 158 sa->u.sockaddr_in.sin_family = AF_INET; 159 sa->u.sockaddr_in.sin_port = htons(8443); 160 161 nxt_sockaddr_text(sa); 162 163 rt->controller_listen = sa; 164 } 165 166 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 167 if (ls == NULL) { 168 return NXT_ERROR; 169 } 170 171 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 172 sa->socklen, sa->length); 173 if (ls->sockaddr == NULL) { 174 return NXT_ERROR; 175 } 176 177 ls->sockaddr->type = sa->type; 178 ls->socklen = sa->socklen; 179 ls->address_length = sa->length; 180 181 nxt_sockaddr_text(ls->sockaddr); 182 183 ls->socket = -1; 184 ls->backlog = NXT_LISTEN_BACKLOG; 185 ls->read_after_accept = 1; 186 ls->flags = NXT_NONBLOCK; 187 188#if 0 189 /* STUB */ 190 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 191 if (wq == NULL) { 192 return NXT_ERROR; 193 } 194 nxt_work_queue_name(wq, "listen"); 195 /**/ 196 197 ls->work_queue = wq; 198#endif 199 ls->handler = nxt_controller_conn_init; 200 201 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 202 return NXT_ERROR; 203 } 204 205 rt->controller_socket = ls; 206 207 return NXT_OK; 208} 209 210 211static void 212nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 213{ 214 nxt_buf_t *b; 215 nxt_conn_t *c; 216 nxt_event_engine_t *engine; 217 nxt_controller_request_t *r; 218 219 c = obj; 220 221 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 222 223 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 224 if (nxt_slow_path(r == NULL)) { 225 nxt_controller_conn_free(task, c, NULL); 226 return; 227 } 228 229 r->conn = c; 230 231 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 232 != NXT_OK)) 233 { 234 nxt_controller_conn_free(task, c, NULL); 235 return; 236 } 237 238 r->parser.fields_hash = nxt_controller_fields_hash; 239 240 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 241 if (nxt_slow_path(b == NULL)) { 242 nxt_controller_conn_free(task, c, NULL); 243 return; 244 } 245 246 c->read = b; 247 c->socket.data = r; 248 c->socket.read_ready = 1; 249 c->read_state = &nxt_controller_conn_read_state; 250 251 engine = task->thread->engine; 252 c->read_work_queue = &engine->read_work_queue; 253 c->write_work_queue = &engine->write_work_queue; 254 255 nxt_conn_read(engine, c); 256} 257 258 259static const nxt_event_conn_state_t nxt_controller_conn_read_state 260 nxt_aligned(64) = 261{ 262 .ready_handler = nxt_controller_conn_read, 263 .close_handler = nxt_controller_conn_close, 264 .error_handler = nxt_controller_conn_read_error, 265 266 .timer_handler = nxt_controller_conn_read_timeout, 267 .timer_value = nxt_controller_conn_timeout_value, 268 .timer_data = 60 * 1000, 269}; 270 271 272static void 273nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 274{ 275 size_t preread; 276 nxt_buf_t *b; 277 nxt_int_t rc; 278 nxt_conn_t *c; 279 nxt_controller_request_t *r; 280 281 c = obj; 282 r = data; 283 284 nxt_debug(task, "controller conn read"); 285 286 nxt_queue_remove(&c->link); 287 nxt_queue_self(&c->link); 288 289 b = c->read; 290 291 rc = nxt_http_parse_request(&r->parser, &b->mem); 292 293 if (nxt_slow_path(rc != NXT_DONE)) { 294 295 if (rc == NXT_AGAIN) { 296 if (nxt_buf_mem_free_size(&b->mem) == 0) { 297 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 298 nxt_controller_conn_close(task, c, r); 299 return; 300 } 301 302 nxt_conn_read(task->thread->engine, c); 303 return; 304 } 305 306 /* rc == NXT_ERROR */ 307 308 nxt_log(task, NXT_LOG_ERR, "parsing error"); 309 310 nxt_controller_conn_close(task, c, r); 311 return; 312 } 313 314 rc = nxt_http_fields_process(r->parser.fields, r, task->log); 315 316 if (nxt_slow_path(rc != NXT_OK)) { 317 nxt_controller_conn_close(task, c, r); 318 return; 319 } 320 321 preread = nxt_buf_mem_used_size(&b->mem); 322 323 nxt_debug(task, "controller request header parsing complete, " 324 "body length: %uz, preread: %uz", 325 r->length, preread); 326 327 if (preread >= r->length) { 328 nxt_controller_process_request(task, r); 329 return; 330 } 331 332 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 333 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 334 if (nxt_slow_path(b == NULL)) { 335 nxt_controller_conn_free(task, c, NULL); 336 return; 337 } 338 339 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 340 341 c->read = b; 342 } 343 344 c->read_state = &nxt_controller_conn_body_read_state; 345 346 nxt_conn_read(task->thread->engine, c); 347} 348 349 350static nxt_msec_t 351nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 352{ 353 return (nxt_msec_t) data; 354} 355 356 357static void 358nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 359{ 360 nxt_conn_t *c; 361 362 c = obj; 363 364 nxt_debug(task, "controller conn read error"); 365 366 nxt_controller_conn_close(task, c, data); 367} 368 369 370static void 371nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 372{ 373 nxt_timer_t *timer; 374 nxt_conn_t *c; 375 376 timer = obj; 377 378 c = nxt_read_timer_conn(timer); 379 c->socket.timedout = 1; 380 c->socket.closed = 1; 381 382 nxt_debug(task, "controller conn read timeout"); 383 384 nxt_controller_conn_close(task, c, data); 385} 386 387 388static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 389 nxt_aligned(64) = 390{ 391 .ready_handler = nxt_controller_conn_body_read, 392 .close_handler = nxt_controller_conn_close, 393 .error_handler = nxt_controller_conn_read_error, 394 395 .timer_handler = nxt_controller_conn_read_timeout, 396 .timer_value = nxt_controller_conn_timeout_value, 397 .timer_data = 60 * 1000, 398 .timer_autoreset = 1, 399}; 400 401 402static void 403nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 404{ 405 size_t read; 406 nxt_buf_t *b; 407 nxt_conn_t *c; 408 nxt_controller_request_t *r; 409 410 c = obj; 411 r = data; 412 b = c->read; 413 414 read = nxt_buf_mem_used_size(&b->mem); 415 416 nxt_debug(task, "controller conn body read: %uz of %uz", 417 read, r->length); 418 419 if (read >= r->length) { 420 nxt_controller_process_request(task, r); 421 return; 422 } 423 424 nxt_conn_read(task->thread->engine, c); 425} 426 427 428static const nxt_event_conn_state_t nxt_controller_conn_write_state 429 nxt_aligned(64) = 430{ 431 .ready_handler = nxt_controller_conn_write, 432 .error_handler = nxt_controller_conn_write_error, 433 434 .timer_handler = nxt_controller_conn_write_timeout, 435 .timer_value = nxt_controller_conn_timeout_value, 436 .timer_data = 60 * 1000, 437 .timer_autoreset = 1, 438}; 439 440 441static void 442nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 443{ 444 nxt_buf_t *b; 445 nxt_conn_t *c; 446 447 c = obj; 448 449 nxt_debug(task, "controller conn write"); 450 451 b = c->write; 452 453 if (b->mem.pos != b->mem.free) { 454 nxt_conn_write(task->thread->engine, c); 455 return; 456 } 457 458 nxt_debug(task, "controller conn write complete"); 459 460 nxt_controller_conn_close(task, c, data); 461} 462 463 464static void 465nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 466{ 467 nxt_conn_t *c; 468 469 c = obj; 470 471 nxt_debug(task, "controller conn write error"); 472 473 nxt_controller_conn_close(task, c, data); 474} 475 476 477static void 478nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 479{ 480 nxt_conn_t *c; 481 nxt_timer_t *timer; 482 483 timer = obj; 484 485 c = nxt_write_timer_conn(timer); 486 c->socket.timedout = 1; 487 c->socket.closed = 1; 488 489 nxt_debug(task, "controller conn write timeout"); 490 491 nxt_controller_conn_close(task, c, data); 492} 493 494 495static const nxt_event_conn_state_t nxt_controller_conn_close_state 496 nxt_aligned(64) = 497{ 498 .ready_handler = nxt_controller_conn_free, 499}; 500 501 502static void 503nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 504{ 505 nxt_conn_t *c; 506 507 c = obj; 508 509 nxt_debug(task, "controller conn close"); 510 511 nxt_queue_remove(&c->link); 512 513 c->write_state = &nxt_controller_conn_close_state; 514 515 nxt_conn_close(task->thread->engine, c); 516} 517 518 519static void 520nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 521{ 522 nxt_conn_t *c; 523 524 c = obj; 525 526 nxt_debug(task, "controller conn free"); 527 528 nxt_mp_destroy(c->mem_pool); 529 530 //nxt_free(c); 531} 532 533 534static nxt_int_t 535nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, 536 nxt_log_t *log) 537{ 538 off_t length; 539 nxt_controller_request_t *r; 540 541 r = ctx; 542 543 length = nxt_off_t_parse(field->value.start, field->value.length); 544 545 if (nxt_fast_path(length > 0)) { 546 547 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) { 548 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big"); 549 return NXT_ERROR; 550 } 551 552 r->length = length; 553 return NXT_OK; 554 } 555 556 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid"); 557 558 return NXT_ERROR; 559} 560 561 562static void 563nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 564{ 565 nxt_mp_t *mp; 566 nxt_int_t rc; 567 nxt_str_t path; 568 nxt_conn_t *c;
|
564 nxt_uint_t status;
| |
565 nxt_buf_mem_t *mbuf; 566 nxt_conf_op_t *ops; 567 nxt_conf_value_t *value;
| 569 nxt_buf_mem_t *mbuf; 570 nxt_conf_op_t *ops; 571 nxt_conf_value_t *value;
|
| 572 nxt_conf_json_error_t error;
|
568 nxt_controller_response_t resp; 569 570 static const nxt_str_t empty_obj = nxt_string("{}"); 571 572 c = req->conn; 573 path = req->parser.path; 574 575 if (path.length > 1 && path.start[path.length - 1] == '/') { 576 path.length--; 577 } 578 579 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 580 581 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 582 583 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 584 585 if (value == NULL) {
| 573 nxt_controller_response_t resp; 574 575 static const nxt_str_t empty_obj = nxt_string("{}"); 576 577 c = req->conn; 578 path = req->parser.path; 579 580 if (path.length > 1 && path.start[path.length - 1] == '/') { 581 path.length--; 582 } 583 584 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 585 586 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 587 588 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 589 590 if (value == NULL) {
|
586 status = 404; 587 goto done;
| 591 goto not_found;
|
588 } 589
| 592 } 593
|
| 594 resp.status = 200;
|
590 resp.conf = value; 591
| 595 resp.conf = value; 596
|
592 status = 200; 593 goto done;
| 597 nxt_controller_response(task, req, &resp); 598 return;
|
594 } 595 596 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 597 598 mp = nxt_mp_create(1024, 128, 256, 32); 599 600 if (nxt_slow_path(mp == NULL)) {
| 599 } 600 601 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 602 603 mp = nxt_mp_create(1024, 128, 256, 32); 604 605 if (nxt_slow_path(mp == NULL)) {
|
601 status = 500; 602 goto done;
| 606 goto alloc_fail;
|
603 } 604 605 mbuf = &c->read->mem; 606
| 607 } 608 609 mbuf = &c->read->mem; 610
|
607 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free);
| 611 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
|
608
| 612
|
| 613 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 614
|
609 if (value == NULL) { 610 nxt_mp_destroy(mp);
| 615 if (value == NULL) { 616 nxt_mp_destroy(mp);
|
611 status = 400; 612 nxt_str_set(&resp.json, "{ \"error\": \"Invalid JSON.\" }"); 613 goto done;
| 617 618 if (error.pos == NULL) { 619 goto alloc_fail; 620 } 621 622 resp.status = 400; 623 resp.title = (u_char *) "Invalid JSON."; 624 resp.detail = error.detail; 625 resp.offset = error.pos - mbuf->pos; 626 627 nxt_conf_json_position(mbuf->pos, error.pos, 628 &resp.line, &resp.column); 629 630 nxt_controller_response(task, req, &resp); 631 return;
|
614 } 615 616 if (path.length != 1) { 617 rc = nxt_conf_op_compile(c->mem_pool, &ops, 618 nxt_controller_conf.root, 619 &path, value); 620 621 if (rc != NXT_OK) { 622 if (rc == NXT_DECLINED) {
| 632 } 633 634 if (path.length != 1) { 635 rc = nxt_conf_op_compile(c->mem_pool, &ops, 636 nxt_controller_conf.root, 637 &path, value); 638 639 if (rc != NXT_OK) { 640 if (rc == NXT_DECLINED) {
|
623 status = 404; 624 goto done;
| 641 goto not_found;
|
625 } 626
| 642 } 643
|
627 status = 500; 628 goto done;
| 644 goto alloc_fail;
|
629 } 630 631 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 632 633 if (nxt_slow_path(value == NULL)) { 634 nxt_mp_destroy(mp);
| 645 } 646 647 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 648 649 if (nxt_slow_path(value == NULL)) { 650 nxt_mp_destroy(mp);
|
635 status = 500; 636 goto done;
| 651 goto alloc_fail;
|
637 } 638 } 639 640 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 641 nxt_mp_destroy(mp);
| 652 } 653 } 654 655 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 656 nxt_mp_destroy(mp);
|
642 status = 400; 643 nxt_str_set(&resp.json, 644 "{ \"error\": \"Invalid configuration.\" }"); 645 goto done;
| 657 goto invalid_conf;
|
646 } 647 648 req->conf.root = value; 649 req->conf.pool = mp; 650 651 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 652 nxt_mp_destroy(mp);
| 658 } 659 660 req->conf.root = value; 661 req->conf.pool = mp; 662 663 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 664 nxt_mp_destroy(mp);
|
653 status = 500; 654 goto done;
| 665 goto alloc_fail;
|
655 } 656 657 return; 658 } 659 660 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 661 662 if (path.length == 1) { 663 mp = nxt_mp_create(1024, 128, 256, 32); 664 665 if (nxt_slow_path(mp == NULL)) {
| 666 } 667 668 return; 669 } 670 671 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 672 673 if (path.length == 1) { 674 mp = nxt_mp_create(1024, 128, 256, 32); 675 676 if (nxt_slow_path(mp == NULL)) {
|
666 status = 500; 667 goto done;
| 677 goto alloc_fail;
|
668 } 669 670 value = nxt_conf_json_parse_str(mp, &empty_obj); 671 672 } else { 673 rc = nxt_conf_op_compile(c->mem_pool, &ops, 674 nxt_controller_conf.root, 675 &path, NULL); 676 677 if (rc != NXT_OK) { 678 if (rc == NXT_DECLINED) {
| 678 } 679 680 value = nxt_conf_json_parse_str(mp, &empty_obj); 681 682 } else { 683 rc = nxt_conf_op_compile(c->mem_pool, &ops, 684 nxt_controller_conf.root, 685 &path, NULL); 686 687 if (rc != NXT_OK) { 688 if (rc == NXT_DECLINED) {
|
679 status = 404; 680 goto done;
| 689 goto not_found;
|
681 } 682
| 690 } 691
|
683 status = 500; 684 goto done;
| 692 goto alloc_fail;
|
685 } 686 687 mp = nxt_mp_create(1024, 128, 256, 32); 688 689 if (nxt_slow_path(mp == NULL)) {
| 693 } 694 695 mp = nxt_mp_create(1024, 128, 256, 32); 696 697 if (nxt_slow_path(mp == NULL)) {
|
690 status = 500; 691 goto done;
| 698 goto alloc_fail;
|
692 } 693 694 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 695 } 696 697 if (nxt_slow_path(value == NULL)) { 698 nxt_mp_destroy(mp);
| 699 } 700 701 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 702 } 703 704 if (nxt_slow_path(value == NULL)) { 705 nxt_mp_destroy(mp);
|
699 status = 500; 700 goto done;
| 706 goto alloc_fail;
|
701 } 702 703 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 704 nxt_mp_destroy(mp);
| 707 } 708 709 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) { 710 nxt_mp_destroy(mp);
|
705 status = 400; 706 nxt_str_set(&resp.json, 707 "{ \"error\": \"Invalid configuration.\" }"); 708 goto done;
| 711 goto invalid_conf;
|
709 } 710 711 req->conf.root = value; 712 req->conf.pool = mp; 713 714 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 715 nxt_mp_destroy(mp);
| 712 } 713 714 req->conf.root = value; 715 req->conf.pool = mp; 716 717 if (nxt_controller_conf_apply(task, req) != NXT_OK) { 718 nxt_mp_destroy(mp);
|
716 status = 500; 717 goto done;
| 719 goto alloc_fail;
|
718 } 719 720 return; 721 } 722
| 720 } 721 722 return; 723 } 724
|
723 status = 405;
| 725 resp.status = 405; 726 resp.title = (u_char *) "Invalid method."; 727 resp.offset = -1;
|
724
| 728
|
725done:
| 729 nxt_controller_response(task, req, &resp); 730 return;
|
726
| 731
|
727 switch (status) {
| 732alloc_fail:
|
728
| 733
|
729 case 200: 730 nxt_str_set(&resp.status_line, "200 OK"); 731 break;
| 734 resp.status = 500; 735 resp.title = (u_char *) "Memory allocation failed."; 736 resp.offset = -1;
|
732
| 737
|
733 case 400: 734 nxt_str_set(&resp.status_line, "400 Bad Request"); 735 break;
| 738 nxt_controller_response(task, req, &resp); 739 return;
|
736
| 740
|
737 case 404: 738 nxt_str_set(&resp.status_line, "404 Not Found"); 739 nxt_str_set(&resp.json, "{ \"error\": \"Value doesn't exist.\" }"); 740 break;
| 741not_found:
|
741
| 742
|
742 case 405: 743 nxt_str_set(&resp.status_line, "405 Method Not Allowed"); 744 nxt_str_set(&resp.json, "{ \"error\": \"Invalid method.\" }"); 745 break;
| 743 resp.status = 404; 744 resp.title = (u_char *) "Value doesn't exist."; 745 resp.offset = -1;
|
746
| 746
|
747 case 500: 748 nxt_str_set(&resp.status_line, "500 Internal Server Error"); 749 nxt_str_set(&resp.json, "{ \"error\": \"Memory allocation failed.\" }"); 750 break; 751 }
| 747 nxt_controller_response(task, req, &resp); 748 return;
|
752
| 749
|
| 750invalid_conf: 751 752 resp.status = 400; 753 resp.title = (u_char *) "Invalid configuration."; 754 resp.offset = -1; 755
|
753 nxt_controller_response(task, req, &resp);
| 756 nxt_controller_response(task, req, &resp);
|
| 757 return;
|
754} 755 756 757static nxt_int_t 758nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) 759{ 760 nxt_int_t rc; 761 762 if (nxt_controller_current_request != NULL) { 763 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 764 return NXT_OK; 765 } 766 767 rc = nxt_controller_conf_pass(task, req->conf.root); 768 769 if (nxt_slow_path(rc != NXT_OK)) { 770 return NXT_ERROR; 771 } 772 773 nxt_controller_current_request = req; 774 775 return NXT_OK; 776} 777 778 779static void 780nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 781 void *data) 782{ 783 nxt_controller_request_t *req; 784 nxt_controller_response_t resp; 785 786 nxt_debug(task, "controller conf ready: %*s", 787 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 788 789 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 790 791 req = nxt_controller_current_request; 792 nxt_controller_current_request = NULL; 793 794 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 795 nxt_mp_destroy(nxt_controller_conf.pool); 796 797 nxt_controller_conf = req->conf; 798
| 758} 759 760 761static nxt_int_t 762nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) 763{ 764 nxt_int_t rc; 765 766 if (nxt_controller_current_request != NULL) { 767 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 768 return NXT_OK; 769 } 770 771 rc = nxt_controller_conf_pass(task, req->conf.root); 772 773 if (nxt_slow_path(rc != NXT_OK)) { 774 return NXT_ERROR; 775 } 776 777 nxt_controller_current_request = req; 778 779 return NXT_OK; 780} 781 782 783static void 784nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 785 void *data) 786{ 787 nxt_controller_request_t *req; 788 nxt_controller_response_t resp; 789 790 nxt_debug(task, "controller conf ready: %*s", 791 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 792 793 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 794 795 req = nxt_controller_current_request; 796 nxt_controller_current_request = NULL; 797 798 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 799 nxt_mp_destroy(nxt_controller_conf.pool); 800 801 nxt_controller_conf = req->conf; 802
|
799 nxt_str_set(&resp.status_line, "200 OK"); 800 nxt_str_set(&resp.json, "{ \"success\": \"Reconfiguration done.\" }");
| 803 resp.status = 200; 804 resp.title = (u_char *) "Reconfiguration done.";
|
801 802 } else { 803 nxt_mp_destroy(req->conf.pool); 804
| 805 806 } else { 807 nxt_mp_destroy(req->conf.pool); 808
|
805 nxt_str_set(&resp.status_line, "500 Internal Server Error"); 806 nxt_str_set(&resp.json, 807 "{ \"error\": \"Failed to apply new configuration.\" }");
| 809 resp.status = 500; 810 resp.title = (u_char *) "Failed to apply new configuration."; 811 resp.offset = -1;
|
808 } 809 810 nxt_controller_response(task, req, &resp); 811 812 nxt_controller_process_waiting(task); 813} 814 815 816static void 817nxt_controller_process_waiting(nxt_task_t *task) 818{ 819 nxt_controller_request_t *req; 820 nxt_controller_response_t resp; 821 822 nxt_queue_each(req, &nxt_controller_waiting_requests, 823 nxt_controller_request_t, link) 824 { 825 nxt_queue_remove(&req->link); 826 827 if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) { 828 return; 829 } 830 831 nxt_mp_destroy(req->conf.pool); 832
| 812 } 813 814 nxt_controller_response(task, req, &resp); 815 816 nxt_controller_process_waiting(task); 817} 818 819 820static void 821nxt_controller_process_waiting(nxt_task_t *task) 822{ 823 nxt_controller_request_t *req; 824 nxt_controller_response_t resp; 825 826 nxt_queue_each(req, &nxt_controller_waiting_requests, 827 nxt_controller_request_t, link) 828 { 829 nxt_queue_remove(&req->link); 830 831 if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) { 832 return; 833 } 834 835 nxt_mp_destroy(req->conf.pool); 836
|
833 nxt_str_set(&resp.status_line, "500 Internal Server Error"); 834 nxt_str_set(&resp.json, 835 "{ \"error\": \"Memory allocation failed.\" }");
| 837 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
|
836
| 838
|
| 839 resp.status = 500; 840 resp.title = (u_char *) "Memory allocation failed."; 841 resp.offset = -1; 842
|
837 nxt_controller_response(task, req, &resp); 838 839 } nxt_queue_loop; 840} 841 842 843static nxt_int_t 844nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) 845{ 846 size_t size; 847 uint32_t stream; 848 nxt_int_t rc; 849 nxt_buf_t *b; 850 nxt_port_t *router_port, *controller_port; 851 nxt_runtime_t *rt; 852 853 rt = task->thread->runtime; 854 855 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 856 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 857 858 size = nxt_conf_json_length(conf, NULL); 859 860 b = nxt_port_mmap_get_buf(task, router_port, size); 861 862 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 863 864 stream = nxt_port_rpc_register_handler(task, controller_port, 865 nxt_controller_conf_handler, 866 nxt_controller_conf_handler, 867 router_port->pid, NULL); 868 869 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 870 stream, controller_port->id, b); 871 872 if (nxt_slow_path(rc != NXT_OK)) { 873 nxt_port_rpc_cancel(task, controller_port, stream); 874 } 875 876 return rc; 877} 878 879
| 843 nxt_controller_response(task, req, &resp); 844 845 } nxt_queue_loop; 846} 847 848 849static nxt_int_t 850nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) 851{ 852 size_t size; 853 uint32_t stream; 854 nxt_int_t rc; 855 nxt_buf_t *b; 856 nxt_port_t *router_port, *controller_port; 857 nxt_runtime_t *rt; 858 859 rt = task->thread->runtime; 860 861 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 862 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 863 864 size = nxt_conf_json_length(conf, NULL); 865 866 b = nxt_port_mmap_get_buf(task, router_port, size); 867 868 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 869 870 stream = nxt_port_rpc_register_handler(task, controller_port, 871 nxt_controller_conf_handler, 872 nxt_controller_conf_handler, 873 router_port->pid, NULL); 874 875 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 876 stream, controller_port->id, b); 877 878 if (nxt_slow_path(rc != NXT_OK)) { 879 nxt_port_rpc_cancel(task, controller_port, stream); 880 } 881 882 return rc; 883} 884 885
|
880
| |
881static void 882nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 883 nxt_controller_response_t *resp) 884{
| 886static void 887nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 888 nxt_controller_response_t *resp) 889{
|
885 size_t size; 886 nxt_buf_t *b; 887 nxt_conn_t *c;
| 890 size_t size; 891 nxt_str_t status_line, str; 892 nxt_buf_t *b, *body; 893 nxt_conn_t *c; 894 nxt_uint_t n; 895 nxt_conf_value_t *value, *location; 896 nxt_conf_json_pretty_t pretty;
|
888
| 897
|
889 c = req->conn;
| 898 static nxt_str_t success_str = nxt_string("success"); 899 static nxt_str_t error_str = nxt_string("error"); 900 static nxt_str_t detail_str = nxt_string("detail"); 901 static nxt_str_t location_str = nxt_string("location"); 902 static nxt_str_t offset_str = nxt_string("offset"); 903 static nxt_str_t line_str = nxt_string("line"); 904 static nxt_str_t column_str = nxt_string("column");
|
890
| 905
|
891 size = sizeof("HTTP/1.0 " "\r\n\r\n") - 1 + resp->status_line.length;
| 906 static nxt_time_string_t date_cache = { 907 (nxt_atomic_uint_t) -1, 908 nxt_controller_date, 909 "%s, %02d %s %4d %02d:%02d:%02d GMT", 910 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1, 911 NXT_THREAD_TIME_GMT, 912 NXT_THREAD_TIME_SEC, 913 };
|
892
| 914
|
893 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 894 if (nxt_slow_path(b == NULL)) { 895 nxt_controller_conn_close(task, c, req); 896 return; 897 }
| 915 switch (resp->status) {
|
898
| 916
|
899 b->mem.free = nxt_cpymem(b->mem.free, "HTTP/1.0 ", sizeof("HTTP/1.0 ") - 1); 900 b->mem.free = nxt_cpymem(b->mem.free, resp->status_line.start, 901 resp->status_line.length);
| 917 case 200: 918 nxt_str_set(&status_line, "200 OK"); 919 break;
|
902
| 920
|
903 b->mem.free = nxt_cpymem(b->mem.free, "\r\n\r\n", sizeof("\r\n\r\n") - 1);
| 921 case 400: 922 nxt_str_set(&status_line, "400 Bad Request"); 923 break;
|
904
| 924
|
905 b->next = nxt_controller_response_body(resp, c->mem_pool);
| 925 case 404: 926 nxt_str_set(&status_line, "404 Not Found"); 927 break;
|
906
| 928
|
907 if (nxt_slow_path(b->next == NULL)) { 908 nxt_controller_conn_close(task, c, req); 909 return;
| 929 case 405: 930 nxt_str_set(&status_line, "405 Method Not Allowed"); 931 break; 932 933 case 500: 934 nxt_str_set(&status_line, "500 Internal Server Error"); 935 break;
|
910 } 911
| 936 } 937
|
912 c->write = b; 913 c->write_state = &nxt_controller_conn_write_state;
| 938 c = req->conn; 939 value = resp->conf;
|
914
| 940
|
915 nxt_conn_write(task->thread->engine, c); 916}
| 941 if (value == NULL) { 942 n = 1 943 + (resp->detail != NULL) 944 + (resp->status >= 400 && resp->offset != -1);
|
917
| 945
|
| 946 value = nxt_conf_create_object(c->mem_pool, n);
|
918
| 947
|
919static nxt_buf_t * 920nxt_controller_response_body(nxt_controller_response_t *resp, nxt_mp_t *pool) 921{ 922 size_t size; 923 nxt_buf_t *b; 924 nxt_conf_value_t *value; 925 nxt_conf_json_pretty_t pretty;
| 948 if (nxt_slow_path(value == NULL)) { 949 nxt_controller_conn_close(task, c, req); 950 return; 951 }
|
926
| 952
|
927 if (resp->conf) { 928 value = resp->conf;
| 953 str.length = nxt_strlen(resp->title); 954 str.start = resp->title;
|
929
| 955
|
930 } else { 931 value = nxt_conf_json_parse_str(pool, &resp->json);
| 956 if (resp->status < 400) { 957 nxt_conf_set_member_string(value, &success_str, &str, 0);
|
932
| 958
|
933 if (nxt_slow_path(value == NULL)) { 934 return NULL;
| 959 } else { 960 nxt_conf_set_member_string(value, &error_str, &str, 0);
|
935 }
| 961 }
|
| 962 963 n = 0; 964 965 if (resp->detail != NULL) { 966 str.length = nxt_strlen(resp->detail); 967 str.start = resp->detail; 968 969 n++; 970 971 nxt_conf_set_member_string(value, &detail_str, &str, n); 972 } 973 974 if (resp->status >= 400 && resp->offset != -1) { 975 n++; 976 977 location = nxt_conf_create_object(c->mem_pool, 978 resp->line != 0 ? 3 : 1); 979 980 nxt_conf_set_member(value, &location_str, location, n); 981 982 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 983 984 if (resp->line != 0) { 985 nxt_conf_set_member_integer(location, &line_str, 986 resp->line, 1); 987 988 nxt_conf_set_member_integer(location, &column_str, 989 resp->column, 2); 990 } 991 }
|
936 } 937 938 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 939 940 size = nxt_conf_json_length(value, &pretty) + 2; 941
| 992 } 993 994 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 995 996 size = nxt_conf_json_length(value, &pretty) + 2; 997
|
942 b = nxt_buf_mem_alloc(pool, size, 0); 943 if (nxt_slow_path(b == NULL)) { 944 return NULL;
| 998 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 999 if (nxt_slow_path(body == NULL)) { 1000 nxt_controller_conn_close(task, c, req); 1001 return;
|
945 } 946 947 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 948
| 1002 } 1003 1004 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1005
|
949 b->mem.free = nxt_conf_json_print(b->mem.free, value, &pretty);
| 1006 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
|
950
| 1007
|
951 *b->mem.free++ = '\r'; 952 *b->mem.free++ = '\n';
| 1008 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
|
953
| 1009
|
954 return b;
| 1010 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length 1011 + sizeof("Server: nginext/0.1\r\n") - 1 1012 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 1013 + sizeof("Content-Type: application/json\r\n") - 1 1014 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN 1015 + sizeof("Connection: close\r\n") - 1 1016 + sizeof("\r\n") - 1; 1017 1018 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1019 if (nxt_slow_path(b == NULL)) { 1020 nxt_controller_conn_close(task, c, req); 1021 return; 1022 } 1023 1024 b->next = body; 1025 1026 nxt_str_set(&str, "HTTP/1.1 "); 1027 1028 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1029 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 1030 status_line.length); 1031 1032 nxt_str_set(&str, "\r\n" 1033 "Server: nginext/0.1\r\n" 1034 "Date: "); 1035 1036 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1037 1038 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 1039 b->mem.free); 1040 1041 nxt_str_set(&str, "\r\n" 1042 "Content-Type: application/json\r\n" 1043 "Content-Length: "); 1044 1045 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1046 1047 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 1048 nxt_buf_mem_used_size(&body->mem)); 1049 1050 nxt_str_set(&str, "\r\n" 1051 "Connection: close\r\n" 1052 "\r\n"); 1053 1054 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1055 1056 c->write = b; 1057 c->write_state = &nxt_controller_conn_write_state; 1058 1059 nxt_conn_write(task->thread->engine, c);
|
955}
| 1060}
|
| 1061 1062 1063static u_char * 1064nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 1065 size_t size, const char *format) 1066{ 1067 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 1068 "Sat" }; 1069 1070 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1071 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1072 1073 return nxt_sprintf(buf, buf + size, format, 1074 week[tm->tm_wday], tm->tm_mday, 1075 month[tm->tm_mon], tm->tm_year + 1900, 1076 tm->tm_hour, tm->tm_min, tm->tm_sec); 1077}
|
| |