1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10 11 12typedef struct { 13 nxt_str_t type; 14 uint32_t workers; 15 nxt_msec_t timeout; 16 uint32_t requests; 17 nxt_conf_value_t *limits_value; 18} nxt_router_app_conf_t; 19 20 21typedef struct { 22 nxt_str_t application; 23} nxt_router_listener_conf_t; 24 25 26typedef struct nxt_req_app_link_s nxt_req_app_link_t; 27 28 29typedef struct { 30 uint32_t stream; 31 nxt_conn_t *conn; 32 nxt_app_t *app; 33 nxt_port_t *app_port; 34 nxt_app_parse_ctx_t *ap; 35 nxt_req_app_link_t *ra; 36 37 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 38} nxt_req_conn_link_t; 39 40 41struct nxt_req_app_link_s { 42 uint32_t stream; 43 nxt_port_t *app_port; 44 nxt_pid_t app_pid; 45 nxt_port_t *reply_port; 46 nxt_app_parse_ctx_t *ap; 47 nxt_req_conn_link_t *rc; 48 49 nxt_queue_link_t link; /* for nxt_app_t.requests */ 50 51 nxt_mp_t *mem_pool; 52 nxt_work_t work; 53 54 int err_code; 55 const char *err_str; 56}; 57 58 59typedef struct { 60 nxt_socket_conf_t *socket_conf; 61 nxt_router_temp_conf_t *temp_conf; 62} nxt_socket_rpc_t; 63 64 65static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); 66 67static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, 68 int code, const char* str); 69 70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 72static void nxt_router_conf_ready(nxt_task_t *task, 73 nxt_router_temp_conf_t *tmcf); 74static void nxt_router_conf_error(nxt_task_t *task, 75 nxt_router_temp_conf_t *tmcf); 76static void nxt_router_conf_send(nxt_task_t *task, 77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 78 79static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 80 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 81static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 82static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, 83 nxt_str_t *name); 84static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 85 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 86static void nxt_router_listen_socket_ready(nxt_task_t *task, 87 nxt_port_recv_msg_t *msg, void *data); 88static void nxt_router_listen_socket_error(nxt_task_t *task, 89 nxt_port_recv_msg_t *msg, void *data); 90static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 91 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 92static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 93 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 94 95static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 97 const nxt_event_interface_t *interface); 98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 99 nxt_router_engine_conf_t *recf); 100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 101 nxt_router_engine_conf_t *recf); 102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 103 nxt_router_engine_conf_t *recf); 104static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 105 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 106 nxt_work_handler_t handler); 107static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 108 nxt_router_engine_conf_t *recf); 109static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 110 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 111 112static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 113 nxt_router_temp_conf_t *tmcf); 114static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 115 nxt_event_engine_t *engine); 116static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 117 nxt_router_temp_conf_t *tmcf); 118 119static void nxt_router_engines_post(nxt_router_t *router, 120 nxt_router_temp_conf_t *tmcf); 121static void nxt_router_engine_post(nxt_event_engine_t *engine, 122 nxt_work_t *jobs); 123 124static void nxt_router_thread_start(void *data); 125static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 126 void *data); 127static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 128 void *data); 129static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 130 void *data); 131static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 132 void *data); 133static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 134 void *data); 135static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 136 void *data); 137static void nxt_router_listen_socket_release(nxt_task_t *task, 138 nxt_socket_conf_t *skcf); 139static void nxt_router_conf_release(nxt_task_t *task, 140 nxt_socket_conf_joint_t *joint); 141 142static void nxt_router_app_port_ready(nxt_task_t *task, 143 nxt_port_recv_msg_t *msg, void *data); 144static void nxt_router_app_port_error(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146 147static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); 148static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 149 uint32_t request_failed, uint32_t got_response); 150 151static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 152static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 153 void *data); 154static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c); 155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, 156 void *data); 157static void nxt_router_process_http_request(nxt_task_t *task, 158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap); 159static void nxt_router_process_http_request_mp(nxt_task_t *task, 160 nxt_req_app_link_t *ra); 161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 162 nxt_app_wmsg_t *wmsg); 163static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 164 nxt_app_wmsg_t *wmsg); 165static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 166 nxt_app_wmsg_t *wmsg); 167static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 168static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 169static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 170static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 171static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 172static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 173static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 174 175static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 176 const char* str); 177 178static nxt_router_t *nxt_router; 179 180 181static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { 182 nxt_python_prepare_msg, 183 nxt_php_prepare_msg, 184 nxt_go_prepare_msg, 185}; 186 187 188nxt_int_t 189nxt_router_start(nxt_task_t *task, void *data) 190{ 191 nxt_int_t ret; 192 nxt_router_t *router; 193 nxt_runtime_t *rt; 194 195 rt = task->thread->runtime; 196 197 ret = nxt_app_http_init(task, rt); 198 if (nxt_slow_path(ret != NXT_OK)) { 199 return ret; 200 } 201 202 router = nxt_zalloc(sizeof(nxt_router_t)); 203 if (nxt_slow_path(router == NULL)) { 204 return NXT_ERROR; 205 } 206 207 nxt_queue_init(&router->engines); 208 nxt_queue_init(&router->sockets); 209 nxt_queue_init(&router->apps); 210 211 nxt_router = router; 212 213 return NXT_OK; 214} 215 216 217static void 218nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) 219{ 220 size_t size; 221 uint32_t stream; 222 nxt_app_t *app; 223 nxt_buf_t *b; 224 nxt_port_t *main_port; 225 nxt_runtime_t *rt; 226 227 app = data; 228 229 rt = task->thread->runtime; 230 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 231 232 nxt_debug(task, "app '%V' %p start worker", &app->name, app); 233 234 size = app->name.length + 1 + app->conf.length; 235 236 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 237 238 if (nxt_slow_path(b == NULL)) { 239 goto failed; 240 } 241 242 nxt_buf_cpystr(b, &app->name); 243 *b->mem.free++ = '\0'; 244 nxt_buf_cpystr(b, &app->conf); 245 246 stream = nxt_port_rpc_register_handler(task, port, 247 nxt_router_app_port_ready, 248 nxt_router_app_port_error, 249 -1, app); 250 251 if (nxt_slow_path(stream == 0)) { 252 nxt_mp_release(b->data, b); 253 254 goto failed; 255 } 256 257 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 258 stream, port->id, b); 259 260 return; 261 262failed: 263 264 nxt_thread_mutex_lock(&app->mutex); 265 266 app->pending_workers--; 267 268 nxt_thread_mutex_unlock(&app->mutex); 269 270 nxt_router_app_use(task, app, -1); 271} 272 273 274static nxt_int_t 275nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) 276{ 277 nxt_int_t res; 278 nxt_port_t *router_port; 279 nxt_runtime_t *rt; 280 281 rt = task->thread->runtime; 282 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 283 284 nxt_router_app_use(task, app, 1); 285 286 res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, 287 app); 288 289 if (res == NXT_OK) { 290 return res; 291 } 292 293 nxt_thread_mutex_lock(&app->mutex); 294 295 app->pending_workers--; 296 297 nxt_thread_mutex_unlock(&app->mutex); 298 299 nxt_router_app_use(task, app, -1); 300 301 return NXT_ERROR; 302} 303 304 305nxt_inline void 306nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, 307 nxt_req_conn_link_t *rc) 308{ 309 nxt_event_engine_t *engine; 310 311 engine = task->thread->engine; 312 313 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 314 315 ra->stream = rc->stream; 316 ra->app_pid = -1; 317 ra->rc = rc; 318 rc->ra = ra; 319 ra->reply_port = engine->port; 320 ra->ap = rc->ap; 321 322 ra->work.handler = NULL; 323 ra->work.task = &engine->task; 324 ra->work.obj = ra; 325 ra->work.data = engine; 326} 327 328 329nxt_inline nxt_req_app_link_t * 330nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) 331{ 332 nxt_mp_t *mp; 333 nxt_req_app_link_t *ra; 334 335 mp = ra_src->ap->mem_pool; 336 337 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); 338 339 if (nxt_slow_path(ra == NULL)) { 340 341 ra_src->rc->ra = NULL; 342 ra_src->rc = NULL; 343 344 return NULL; 345 } 346 347 nxt_router_ra_init(task, ra, ra_src->rc); 348 349 ra->mem_pool = mp; 350 351 return ra; 352} 353 354 355static void 356nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) 357{ 358 nxt_req_app_link_t *ra; 359 nxt_event_engine_t *engine; 360 nxt_req_conn_link_t *rc; 361 362 ra = obj; 363 engine = data; 364 365 if (task->thread->engine != engine) { 366 if (ra->app_port != NULL) { 367 ra->app_pid = ra->app_port->pid; 368 } 369 370 ra->work.handler = nxt_router_ra_release; 371 ra->work.task = &engine->task; 372 ra->work.next = NULL; 373 374 nxt_debug(task, "ra stream #%uD post release to %p", 375 ra->stream, engine); 376 377 nxt_event_engine_post(engine, &ra->work); 378 379 return; 380 } 381 382 nxt_debug(task, "ra stream #%uD release", ra->stream); 383 384 rc = ra->rc; 385 386 if (rc != NULL) { 387 if (ra->app_pid != -1) { 388 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); 389 } 390 391 rc->app_port = ra->app_port; 392 393 ra->app_port = NULL; 394 rc->ra = NULL; 395 ra->rc = NULL; 396 } 397 398 if (ra->app_port != NULL) { 399 nxt_router_app_port_release(task, ra->app_port, 0, 1); 400 401 ra->app_port = NULL; 402 } 403 404 if (ra->mem_pool != NULL) { 405 nxt_mp_release(ra->mem_pool, ra); 406 } 407} 408 409 410static void 411nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) 412{ 413 nxt_conn_t *c; 414 nxt_req_app_link_t *ra; 415 nxt_req_conn_link_t *rc; 416 nxt_event_engine_t *engine; 417 418 ra = obj; 419 engine = data; 420 421 if (task->thread->engine != engine) { 422 ra->work.handler = nxt_router_ra_abort; 423 ra->work.task = &engine->task; 424 ra->work.next = NULL; 425 426 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); 427 428 nxt_event_engine_post(engine, &ra->work); 429 430 return; 431 } 432 433 nxt_debug(task, "ra stream #%uD abort", ra->stream); 434 435 rc = ra->rc; 436 437 if (rc != NULL) { 438 c = rc->conn; 439 440 nxt_router_gen_error(task, c, 500, 441 "Failed to start application worker"); 442 443 rc->ra = NULL; 444 ra->rc = NULL; 445 } 446 447 if (ra->app_port != NULL) { 448 nxt_router_app_port_release(task, ra->app_port, 0, 1); 449 450 ra->app_port = NULL; 451 } 452 453 if (ra->mem_pool != NULL) { 454 nxt_mp_release(ra->mem_pool, ra); 455 } 456} 457 458 459static void 460nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data) 461{ 462 nxt_req_app_link_t *ra; 463 464 ra = obj; 465 466 nxt_router_ra_error(task, ra, ra->err_code, ra->err_str); 467} 468 469 470static void 471nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, 472 const char* str) 473{ 474 nxt_conn_t *c; 475 nxt_req_conn_link_t *rc; 476 nxt_event_engine_t *engine; 477 478 engine = ra->work.data; 479 480 if (task->thread->engine != engine) { 481 ra->err_code = code; 482 ra->err_str = str; 483 484 ra->work.handler = nxt_router_ra_error_handler; 485 ra->work.task = &engine->task; 486 ra->work.next = NULL; 487 488 nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine); 489 490 nxt_event_engine_post(engine, &ra->work); 491 492 return; 493 } 494 495 nxt_debug(task, "ra stream #%uD error", ra->stream); 496 497 rc = ra->rc; 498 499 if (rc != NULL) { 500 c = rc->conn; 501 502 nxt_router_gen_error(task, c, code, str); 503 504 rc->ra = NULL; 505 ra->rc = NULL; 506 } 507 508 if (ra->app_port != NULL) { 509 nxt_router_app_port_release(task, ra->app_port, 0, 1); 510 511 ra->app_port = NULL; 512 } 513 514 if (ra->mem_pool != NULL) { 515 nxt_mp_release(ra->mem_pool, ra); 516 } 517} 518 519 520nxt_inline void 521nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) 522{ 523 nxt_req_app_link_t *ra; 524 525 if (rc->app_port != NULL) { 526 nxt_router_app_port_release(task, rc->app_port, 0, 1); 527 528 rc->app_port = NULL; 529 } 530 531 ra = rc->ra; 532 533 if (ra != NULL) { 534 rc->ra = NULL; 535 ra->rc = NULL; 536 537 nxt_thread_mutex_lock(&rc->app->mutex); 538 539 if (ra->link.next != NULL) { 540 nxt_queue_remove(&ra->link); 541 542 ra->link.next = NULL; 543 544 } else { 545 ra = NULL; 546 } 547 548 nxt_thread_mutex_unlock(&rc->app->mutex); 549 } 550 551 if (ra != NULL) { 552 nxt_router_ra_release(task, ra, ra->work.data); 553 } 554 555 if (rc->app != NULL) { 556 nxt_router_app_use(task, rc->app, -1); 557 558 rc->app = NULL; 559 } 560 561 if (rc->ap != NULL) { 562 nxt_app_http_req_done(task, rc->ap); 563 564 rc->ap = NULL; 565 } 566 567 nxt_queue_remove(&rc->link); 568 569 rc->conn = NULL; 570} 571 572 573void 574nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 575{ 576 nxt_port_new_port_handler(task, msg); 577 578 if (msg->port_msg.stream == 0) { 579 return; 580 } 581 582 if (msg->u.new_port == NULL || 583 msg->u.new_port->type != NXT_PROCESS_WORKER) 584 { 585 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 586 } 587 588 nxt_port_rpc_handler(task, msg); 589} 590 591 592void 593nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 594{ 595 nxt_int_t ret; 596 nxt_buf_t *b; 597 nxt_router_temp_conf_t *tmcf; 598 599 tmcf = nxt_router_temp_conf(task); 600 if (nxt_slow_path(tmcf == NULL)) { 601 return; 602 } 603 604 b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size); 605 606 nxt_assert(b != NULL); 607 608 tmcf->conf->router = nxt_router; 609 tmcf->stream = msg->port_msg.stream; 610 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 611 msg->port_msg.pid, 612 msg->port_msg.reply_port); 613 614 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 615 616 if (nxt_fast_path(ret == NXT_OK)) { 617 nxt_router_conf_apply(task, tmcf, NULL); 618 619 } else { 620 nxt_router_conf_error(task, tmcf); 621 } 622} 623 624 625static void 626nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) 627{ 628 union { 629 nxt_pid_t removed_pid; 630 void *data; 631 } u; 632 633 u.data = data; 634 635 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 636} 637 638 639void 640nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 641{ 642 nxt_event_engine_t *engine; 643 644 nxt_port_remove_pid_handler(task, msg); 645 646 if (msg->port_msg.stream == 0) { 647 return; 648 } 649 650 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 651 { 652 nxt_port_post(task, engine->port, nxt_router_worker_remove_pid, 653 msg->u.data); 654 } 655 nxt_queue_loop; 656 657 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 658 659 nxt_port_rpc_handler(task, msg); 660} 661 662 663static nxt_router_temp_conf_t * 664nxt_router_temp_conf(nxt_task_t *task) 665{ 666 nxt_mp_t *mp, *tmp; 667 nxt_router_conf_t *rtcf; 668 nxt_router_temp_conf_t *tmcf; 669 670 mp = nxt_mp_create(1024, 128, 256, 32); 671 if (nxt_slow_path(mp == NULL)) { 672 return NULL; 673 } 674 675 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 676 if (nxt_slow_path(rtcf == NULL)) { 677 goto fail; 678 } 679 680 rtcf->mem_pool = mp; 681 682 tmp = nxt_mp_create(1024, 128, 256, 32); 683 if (nxt_slow_path(tmp == NULL)) { 684 goto fail; 685 } 686 687 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 688 if (nxt_slow_path(tmcf == NULL)) { 689 goto temp_fail; 690 } 691 692 tmcf->mem_pool = tmp; 693 tmcf->conf = rtcf; 694 tmcf->count = 1; 695 tmcf->engine = task->thread->engine; 696 697 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 698 sizeof(nxt_router_engine_conf_t)); 699 if (nxt_slow_path(tmcf->engines == NULL)) { 700 goto temp_fail; 701 } 702 703 nxt_queue_init(&tmcf->deleting); 704 nxt_queue_init(&tmcf->keeping); 705 nxt_queue_init(&tmcf->updating); 706 nxt_queue_init(&tmcf->pending); 707 nxt_queue_init(&tmcf->creating); 708 nxt_queue_init(&tmcf->apps); 709 nxt_queue_init(&tmcf->previous); 710 711 return tmcf; 712 713temp_fail: 714 715 nxt_mp_destroy(tmp); 716 717fail: 718 719 nxt_mp_destroy(mp); 720 721 return NULL; 722} 723 724 725static void 726nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 727{ 728 nxt_int_t ret; 729 nxt_router_t *router; 730 nxt_runtime_t *rt; 731 nxt_queue_link_t *qlk; 732 nxt_socket_conf_t *skcf; 733 nxt_router_temp_conf_t *tmcf; 734 const nxt_event_interface_t *interface; 735 736 tmcf = obj; 737 738 qlk = nxt_queue_first(&tmcf->pending); 739 740 if (qlk != nxt_queue_tail(&tmcf->pending)) { 741 nxt_queue_remove(qlk); 742 nxt_queue_insert_tail(&tmcf->creating, qlk); 743 744 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 745 746 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 747 748 return; 749 } 750 751 rt = task->thread->runtime; 752 753 interface = nxt_service_get(rt->services, "engine", NULL); 754 755 router = tmcf->conf->router; 756 757 ret = nxt_router_engines_create(task, router, tmcf, interface); 758 if (nxt_slow_path(ret != NXT_OK)) { 759 goto fail; 760 } 761 762 ret = nxt_router_threads_create(task, rt, tmcf); 763 if (nxt_slow_path(ret != NXT_OK)) { 764 goto fail; 765 } 766 767 nxt_router_apps_sort(task, router, tmcf); 768 769 nxt_router_engines_post(router, tmcf); 770 771 nxt_queue_add(&router->sockets, &tmcf->updating); 772 nxt_queue_add(&router->sockets, &tmcf->creating); 773 774 nxt_router_conf_ready(task, tmcf); 775 776 return; 777 778fail: 779 780 nxt_router_conf_error(task, tmcf); 781 782 return; 783} 784 785 786static void 787nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 788{ 789 nxt_joint_job_t *job; 790 791 job = obj; 792 793 nxt_router_conf_ready(task, job->tmcf); 794} 795 796 797static void 798nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 799{ 800 nxt_debug(task, "temp conf count:%D", tmcf->count); 801 802 if (--tmcf->count == 0) { 803 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 804 } 805} 806 807 808static void 809nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 810{ 811 nxt_socket_t s; 812 nxt_router_t *router; 813 nxt_queue_link_t *qlk; 814 nxt_socket_conf_t *skcf; 815 816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf"); 817 818 for (qlk = nxt_queue_first(&tmcf->creating); 819 qlk != nxt_queue_tail(&tmcf->creating); 820 qlk = nxt_queue_next(qlk)) 821 { 822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 823 s = skcf->listen->socket; 824 825 if (s != -1) { 826 nxt_socket_close(task, s); 827 } 828 829 nxt_free(skcf->listen); 830 } 831 832 router = tmcf->conf->router; 833 834 nxt_queue_add(&router->sockets, &tmcf->keeping); 835 nxt_queue_add(&router->sockets, &tmcf->deleting); 836 837 // TODO: new engines and threads 838 839 nxt_mp_destroy(tmcf->conf->mem_pool); 840 841 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 842} 843 844 845static void 846nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 847 nxt_port_msg_type_t type) 848{ 849 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 850} 851 852 853static nxt_conf_map_t nxt_router_conf[] = { 854 { 855 nxt_string("listeners_threads"), 856 NXT_CONF_MAP_INT32, 857 offsetof(nxt_router_conf_t, threads), 858 }, 859}; 860 861 862static nxt_conf_map_t nxt_router_app_conf[] = { 863 { 864 nxt_string("type"), 865 NXT_CONF_MAP_STR, 866 offsetof(nxt_router_app_conf_t, type), 867 }, 868 869 { 870 nxt_string("workers"), 871 NXT_CONF_MAP_INT32, 872 offsetof(nxt_router_app_conf_t, workers), 873 }, 874 875 { 876 nxt_string("limits"), 877 NXT_CONF_MAP_PTR, 878 offsetof(nxt_router_app_conf_t, limits_value), 879 }, 880}; 881 882 883static nxt_conf_map_t nxt_router_app_limits_conf[] = { 884 { 885 nxt_string("timeout"), 886 NXT_CONF_MAP_MSEC, 887 offsetof(nxt_router_app_conf_t, timeout), 888 }, 889 890 { 891 nxt_string("requests"), 892 NXT_CONF_MAP_INT32, 893 offsetof(nxt_router_app_conf_t, requests), 894 }, 895}; 896 897 898static nxt_conf_map_t nxt_router_listener_conf[] = { 899 { 900 nxt_string("application"), 901 NXT_CONF_MAP_STR, 902 offsetof(nxt_router_listener_conf_t, application), 903 }, 904}; 905 906 907static nxt_conf_map_t nxt_router_http_conf[] = { 908 { 909 nxt_string("header_buffer_size"), 910 NXT_CONF_MAP_SIZE, 911 offsetof(nxt_socket_conf_t, header_buffer_size), 912 }, 913 914 { 915 nxt_string("large_header_buffer_size"), 916 NXT_CONF_MAP_SIZE, 917 offsetof(nxt_socket_conf_t, large_header_buffer_size), 918 }, 919 920 { 921 nxt_string("large_header_buffers"), 922 NXT_CONF_MAP_SIZE, 923 offsetof(nxt_socket_conf_t, large_header_buffers), 924 }, 925 926 { 927 nxt_string("body_buffer_size"), 928 NXT_CONF_MAP_SIZE, 929 offsetof(nxt_socket_conf_t, body_buffer_size), 930 }, 931 932 { 933 nxt_string("max_body_size"), 934 NXT_CONF_MAP_SIZE, 935 offsetof(nxt_socket_conf_t, max_body_size), 936 }, 937 938 { 939 nxt_string("header_read_timeout"), 940 NXT_CONF_MAP_MSEC, 941 offsetof(nxt_socket_conf_t, header_read_timeout), 942 }, 943 944 { 945 nxt_string("body_read_timeout"), 946 NXT_CONF_MAP_MSEC, 947 offsetof(nxt_socket_conf_t, body_read_timeout), 948 }, 949}; 950 951 952static nxt_int_t 953nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 954 u_char *start, u_char *end) 955{ 956 u_char *p; 957 size_t size; 958 nxt_mp_t *mp; 959 uint32_t next; 960 nxt_int_t ret; 961 nxt_str_t name; 962 nxt_app_t *app, *prev; 963 nxt_router_t *router; 964 nxt_conf_value_t *conf, *http; 965 nxt_conf_value_t *applications, *application; 966 nxt_conf_value_t *listeners, *listener; 967 nxt_socket_conf_t *skcf; 968 nxt_app_lang_module_t *lang; 969 nxt_router_app_conf_t apcf; 970 nxt_router_listener_conf_t lscf; 971 972 static nxt_str_t http_path = nxt_string("/http"); 973 static nxt_str_t applications_path = nxt_string("/applications"); 974 static nxt_str_t listeners_path = nxt_string("/listeners"); 975 976 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 977 if (conf == NULL) { 978 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); 979 return NXT_ERROR; 980 } 981 982 mp = tmcf->conf->mem_pool; 983 984 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 985 nxt_nitems(nxt_router_conf), tmcf->conf); 986 if (ret != NXT_OK) { 987 nxt_log(task, NXT_LOG_CRIT, "root map error"); 988 return NXT_ERROR; 989 } 990 991 if (tmcf->conf->threads == 0) { 992 tmcf->conf->threads = nxt_ncpu; 993 } 994 995 applications = nxt_conf_get_path(conf, &applications_path); 996 if (applications == NULL) { 997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block"); 998 return NXT_ERROR; 999 } 1000 1001 router = tmcf->conf->router; 1002 1003 next = 0; 1004 1005 for ( ;; ) { 1006 application = nxt_conf_next_object_member(applications, &name, &next); 1007 if (application == NULL) { 1008 break; 1009 } 1010 1011 nxt_debug(task, "application \"%V\"", &name); 1012 1013 size = nxt_conf_json_length(application, NULL); 1014 1015 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1016 if (app == NULL) { 1017 goto fail; 1018 } 1019 1020 nxt_memzero(app, sizeof(nxt_app_t)); 1021 1022 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1023 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 1024 1025 p = nxt_conf_json_print(app->conf.start, application, NULL); 1026 app->conf.length = p - app->conf.start; 1027 1028 nxt_assert(app->conf.length <= size); 1029 1030 nxt_debug(task, "application conf \"%V\"", &app->conf); 1031 1032 prev = nxt_router_app_find(&router->apps, &name); 1033 1034 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1035 nxt_free(app); 1036 1037 nxt_queue_remove(&prev->link); 1038 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1039 continue; 1040 } 1041 1042 apcf.workers = 1; 1043 apcf.timeout = 0; 1044 apcf.requests = 0; 1045 apcf.limits_value = NULL; 1046 1047 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1048 nxt_nitems(nxt_router_app_conf), &apcf); 1049 if (ret != NXT_OK) { 1050 nxt_log(task, NXT_LOG_CRIT, "application map error"); 1051 goto app_fail; 1052 } 1053 1054 if (apcf.limits_value != NULL) { 1055 1056 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1057 nxt_log(task, NXT_LOG_CRIT, "application limits is not object"); 1058 goto app_fail; 1059 } 1060 1061 ret = nxt_conf_map_object(mp, apcf.limits_value, 1062 nxt_router_app_limits_conf, 1063 nxt_nitems(nxt_router_app_limits_conf), 1064 &apcf); 1065 if (ret != NXT_OK) { 1066 nxt_log(task, NXT_LOG_CRIT, "application limits map error"); 1067 goto app_fail; 1068 } 1069 } 1070 1071 nxt_debug(task, "application type: %V", &apcf.type); 1072 nxt_debug(task, "application workers: %D", apcf.workers); 1073 nxt_debug(task, "application timeout: %D", apcf.timeout); 1074 nxt_debug(task, "application requests: %D", apcf.requests); 1075 1076 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1077 1078 if (lang == NULL) { 1079 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 1080 &apcf.type); 1081 goto app_fail; 1082 } 1083 1084 nxt_debug(task, "application language module: \"%s\"", lang->file); 1085 1086 ret = nxt_thread_mutex_create(&app->mutex); 1087 if (ret != NXT_OK) { 1088 goto app_fail; 1089 } 1090 1091 nxt_queue_init(&app->ports); 1092 nxt_queue_init(&app->requests); 1093 1094 app->name.length = name.length; 1095 nxt_memcpy(app->name.start, name.start, name.length); 1096 1097 app->type = lang->type; 1098 app->max_workers = apcf.workers; 1099 app->timeout = apcf.timeout; 1100 app->live = 1; 1101 app->max_pending_responses = 2; 1102 app->prepare_msg = nxt_app_prepare_msg[lang->type]; 1103 1104 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1105 1106 nxt_router_app_use(task, app, 1); 1107 } 1108 1109 http = nxt_conf_get_path(conf, &http_path); 1110#if 0 1111 if (http == NULL) { 1112 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block"); 1113 return NXT_ERROR; 1114 } 1115#endif 1116 1117 listeners = nxt_conf_get_path(conf, &listeners_path); 1118 if (listeners == NULL) { 1119 nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block"); 1120 return NXT_ERROR; 1121 } 1122 1123 next = 0; 1124 1125 for ( ;; ) { 1126 listener = nxt_conf_next_object_member(listeners, &name, &next); 1127 if (listener == NULL) { 1128 break; 1129 } 1130 1131 skcf = nxt_router_socket_conf(task, tmcf, &name); 1132 if (skcf == NULL) { 1133 goto fail; 1134 } 1135 1136 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1137 nxt_nitems(nxt_router_listener_conf), &lscf); 1138 if (ret != NXT_OK) { 1139 nxt_log(task, NXT_LOG_CRIT, "listener map error"); 1140 goto fail; 1141 } 1142 1143 nxt_debug(task, "application: %V", &lscf.application); 1144 1145 // STUB, default values if http block is not defined. 1146 skcf->header_buffer_size = 2048; 1147 skcf->large_header_buffer_size = 8192; 1148 skcf->large_header_buffers = 4; 1149 skcf->body_buffer_size = 16 * 1024; 1150 skcf->max_body_size = 2 * 1024 * 1024; 1151 skcf->header_read_timeout = 5000; 1152 skcf->body_read_timeout = 5000; 1153 1154 if (http != NULL) { 1155 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1156 nxt_nitems(nxt_router_http_conf), skcf); 1157 if (ret != NXT_OK) { 1158 nxt_log(task, NXT_LOG_CRIT, "http map error"); 1159 goto fail; 1160 } 1161 } 1162 1163 skcf->listen->handler = nxt_router_conn_init; 1164 skcf->router_conf = tmcf->conf; 1165 skcf->router_conf->count++; 1166 skcf->application = nxt_router_listener_application(tmcf, 1167 &lscf.application); 1168 } 1169 1170 nxt_queue_add(&tmcf->deleting, &router->sockets); 1171 nxt_queue_init(&router->sockets); 1172 1173 return NXT_OK; 1174 1175app_fail: 1176 1177 nxt_free(app); 1178 1179fail: 1180 1181 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1182 1183 nxt_queue_remove(&app->link); 1184 nxt_thread_mutex_destroy(&app->mutex); 1185 nxt_free(app); 1186 1187 } nxt_queue_loop; 1188 1189 return NXT_ERROR; 1190} 1191 1192 1193static nxt_app_t * 1194nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1195{ 1196 nxt_app_t *app; 1197 1198 nxt_queue_each(app, queue, nxt_app_t, link) { 1199 1200 if (nxt_strstr_eq(name, &app->name)) { 1201 return app; 1202 } 1203 1204 } nxt_queue_loop; 1205 1206 return NULL; 1207} 1208 1209 1210static nxt_app_t * 1211nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1212{ 1213 nxt_app_t *app; 1214 1215 app = nxt_router_app_find(&tmcf->apps, name); 1216 1217 if (app == NULL) { 1218 app = nxt_router_app_find(&tmcf->previous, name); 1219 } 1220 1221 return app; 1222} 1223 1224 1225static nxt_socket_conf_t * 1226nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1227 nxt_str_t *name) 1228{ 1229 size_t size; 1230 nxt_int_t ret; 1231 nxt_bool_t wildcard; 1232 nxt_sockaddr_t *sa; 1233 nxt_socket_conf_t *skcf; 1234 nxt_listen_socket_t *ls; 1235 1236 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1237 if (nxt_slow_path(sa == NULL)) { 1238 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name); 1239 return NULL; 1240 } 1241 1242 sa->type = SOCK_STREAM; 1243 1244 nxt_debug(task, "router listener: \"%*s\"", 1245 sa->length, nxt_sockaddr_start(sa)); 1246 1247 skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t)); 1248 if (nxt_slow_path(skcf == NULL)) { 1249 return NULL; 1250 } 1251 1252 size = nxt_sockaddr_size(sa); 1253 1254 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1255 1256 if (ret != NXT_OK) { 1257 1258 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1259 if (nxt_slow_path(ls == NULL)) { 1260 return NULL; 1261 } 1262 1263 skcf->listen = ls; 1264 1265 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1266 nxt_memcpy(ls->sockaddr, sa, size); 1267 1268 nxt_listen_socket_remote_size(ls); 1269 1270 ls->socket = -1; 1271 ls->backlog = NXT_LISTEN_BACKLOG; 1272 ls->flags = NXT_NONBLOCK; 1273 ls->read_after_accept = 1; 1274 } 1275 1276 switch (sa->u.sockaddr.sa_family) { 1277#if (NXT_HAVE_UNIX_DOMAIN) 1278 case AF_UNIX: 1279 wildcard = 0; 1280 break; 1281#endif 1282#if (NXT_INET6) 1283 case AF_INET6: 1284 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1285 break; 1286#endif 1287 case AF_INET: 1288 default: 1289 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1290 break; 1291 } 1292 1293 if (!wildcard) { 1294 skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size); 1295 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1296 return NULL; 1297 } 1298 1299 nxt_memcpy(skcf->sockaddr, sa, size); 1300 } 1301 1302 return skcf; 1303} 1304 1305 1306static nxt_int_t 1307nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1308 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1309{ 1310 nxt_router_t *router; 1311 nxt_queue_link_t *qlk; 1312 nxt_socket_conf_t *skcf; 1313 1314 router = tmcf->conf->router; 1315 1316 for (qlk = nxt_queue_first(&router->sockets); 1317 qlk != nxt_queue_tail(&router->sockets); 1318 qlk = nxt_queue_next(qlk)) 1319 { 1320 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1321 1322 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1323 nskcf->listen = skcf->listen; 1324 1325 nxt_queue_remove(qlk); 1326 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1327 1328 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1329 1330 return NXT_OK; 1331 } 1332 } 1333 1334 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1335 1336 return NXT_DECLINED; 1337} 1338 1339 1340static void 1341nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1342 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1343{ 1344 size_t size; 1345 uint32_t stream; 1346 nxt_buf_t *b; 1347 nxt_port_t *main_port, *router_port; 1348 nxt_runtime_t *rt; 1349 nxt_socket_rpc_t *rpc; 1350 1351 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1352 if (rpc == NULL) { 1353 goto fail; 1354 } 1355 1356 rpc->socket_conf = skcf; 1357 rpc->temp_conf = tmcf; 1358 1359 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1360 1361 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1362 if (b == NULL) { 1363 goto fail; 1364 } 1365 1366 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1367 1368 rt = task->thread->runtime; 1369 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1370 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1371 1372 stream = nxt_port_rpc_register_handler(task, router_port, 1373 nxt_router_listen_socket_ready, 1374 nxt_router_listen_socket_error, 1375 main_port->pid, rpc); 1376 if (stream == 0) { 1377 goto fail; 1378 } 1379 1380 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1381 stream, router_port->id, b); 1382 1383 return; 1384 1385fail: 1386 1387 nxt_router_conf_error(task, tmcf); 1388} 1389 1390 1391static void 1392nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1393 void *data) 1394{ 1395 nxt_int_t ret; 1396 nxt_socket_t s; 1397 nxt_socket_rpc_t *rpc; 1398 1399 rpc = data; 1400 1401 s = msg->fd; 1402 1403 ret = nxt_socket_nonblocking(task, s); 1404 if (nxt_slow_path(ret != NXT_OK)) { 1405 goto fail; 1406 } 1407 1408 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 1409 1410 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1411 if (nxt_slow_path(ret != NXT_OK)) { 1412 goto fail; 1413 } 1414 1415 rpc->socket_conf->listen->socket = s; 1416 1417 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1418 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1419 1420 return; 1421 1422fail: 1423 1424 nxt_socket_close(task, s); 1425 1426 nxt_router_conf_error(task, rpc->temp_conf); 1427} 1428 1429 1430static void 1431nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1432 void *data) 1433{ 1434 u_char *p; 1435 size_t size; 1436 uint8_t error; 1437 nxt_buf_t *in, *out; 1438 nxt_sockaddr_t *sa; 1439 nxt_socket_rpc_t *rpc; 1440 nxt_router_temp_conf_t *tmcf; 1441 1442 static nxt_str_t socket_errors[] = { 1443 nxt_string("ListenerSystem"), 1444 nxt_string("ListenerNoIPv6"), 1445 nxt_string("ListenerPort"), 1446 nxt_string("ListenerInUse"), 1447 nxt_string("ListenerNoAddress"), 1448 nxt_string("ListenerNoAccess"), 1449 nxt_string("ListenerPath"), 1450 }; 1451 1452 rpc = data; 1453 sa = rpc->socket_conf->listen->sockaddr; 1454 tmcf = rpc->temp_conf; 1455 1456 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 1457 1458 nxt_assert(in != NULL); 1459 1460 p = in->mem.pos; 1461 1462 error = *p++; 1463 1464 size = sizeof("listen socket error: ") - 1 1465 + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1 1466 + sa->length + socket_errors[error].length + (in->mem.free - p); 1467 1468 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1469 if (nxt_slow_path(out == NULL)) { 1470 return; 1471 } 1472 1473 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 1474 "listen socket error: " 1475 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 1476 sa->length, nxt_sockaddr_start(sa), 1477 &socket_errors[error], in->mem.free - p, p); 1478 1479 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 1480 1481 nxt_router_conf_error(task, tmcf); 1482} 1483 1484 1485static nxt_int_t 1486nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 1487 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 1488{ 1489 nxt_int_t ret; 1490 nxt_uint_t n, threads; 1491 nxt_queue_link_t *qlk; 1492 nxt_router_engine_conf_t *recf; 1493 1494 threads = tmcf->conf->threads; 1495 1496 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 1497 sizeof(nxt_router_engine_conf_t)); 1498 if (nxt_slow_path(tmcf->engines == NULL)) { 1499 return NXT_ERROR; 1500 } 1501 1502 n = 0; 1503 1504 for (qlk = nxt_queue_first(&router->engines); 1505 qlk != nxt_queue_tail(&router->engines); 1506 qlk = nxt_queue_next(qlk)) 1507 { 1508 recf = nxt_array_zero_add(tmcf->engines); 1509 if (nxt_slow_path(recf == NULL)) { 1510 return NXT_ERROR; 1511 } 1512 1513 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 1514 1515 if (n < threads) { 1516 recf->action = NXT_ROUTER_ENGINE_KEEP; 1517 ret = nxt_router_engine_conf_update(tmcf, recf); 1518 1519 } else { 1520 recf->action = NXT_ROUTER_ENGINE_DELETE; 1521 ret = nxt_router_engine_conf_delete(tmcf, recf); 1522 } 1523 1524 if (nxt_slow_path(ret != NXT_OK)) { 1525 return ret; 1526 } 1527 1528 n++; 1529 } 1530 1531 tmcf->new_threads = n; 1532 1533 while (n < threads) { 1534 recf = nxt_array_zero_add(tmcf->engines); 1535 if (nxt_slow_path(recf == NULL)) { 1536 return NXT_ERROR; 1537 } 1538 1539 recf->action = NXT_ROUTER_ENGINE_ADD; 1540 1541 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 1542 if (nxt_slow_path(recf->engine == NULL)) { 1543 return NXT_ERROR; 1544 } 1545 1546 ret = nxt_router_engine_conf_create(tmcf, recf); 1547 if (nxt_slow_path(ret != NXT_OK)) { 1548 return ret; 1549 } 1550 1551 n++; 1552 } 1553 1554 return NXT_OK; 1555} 1556 1557 1558static nxt_int_t 1559nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 1560 nxt_router_engine_conf_t *recf) 1561{ 1562 nxt_int_t ret; 1563 1564 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1565 nxt_router_listen_socket_create); 1566 if (nxt_slow_path(ret != NXT_OK)) { 1567 return ret; 1568 } 1569 1570 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1571 nxt_router_listen_socket_create); 1572 if (nxt_slow_path(ret != NXT_OK)) { 1573 return ret; 1574 } 1575 1576 return ret; 1577} 1578 1579 1580static nxt_int_t 1581nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 1582 nxt_router_engine_conf_t *recf) 1583{ 1584 nxt_int_t ret; 1585 1586 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1587 nxt_router_listen_socket_create); 1588 if (nxt_slow_path(ret != NXT_OK)) { 1589 return ret; 1590 } 1591 1592 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1593 nxt_router_listen_socket_update); 1594 if (nxt_slow_path(ret != NXT_OK)) { 1595 return ret; 1596 } 1597 1598 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 1599 if (nxt_slow_path(ret != NXT_OK)) { 1600 return ret; 1601 } 1602 1603 return ret; 1604} 1605 1606 1607static nxt_int_t 1608nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 1609 nxt_router_engine_conf_t *recf) 1610{ 1611 nxt_int_t ret; 1612 1613 ret = nxt_router_engine_quit(tmcf, recf); 1614 if (nxt_slow_path(ret != NXT_OK)) { 1615 return ret; 1616 } 1617 1618 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 1619 if (nxt_slow_path(ret != NXT_OK)) { 1620 return ret; 1621 } 1622 1623 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 1624} 1625 1626 1627static nxt_int_t 1628nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 1629 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 1630 nxt_work_handler_t handler) 1631{ 1632 nxt_joint_job_t *job; 1633 nxt_queue_link_t *qlk; 1634 nxt_socket_conf_t *skcf; 1635 nxt_socket_conf_joint_t *joint; 1636 1637 for (qlk = nxt_queue_first(sockets); 1638 qlk != nxt_queue_tail(sockets); 1639 qlk = nxt_queue_next(qlk)) 1640 { 1641 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1642 if (nxt_slow_path(job == NULL)) { 1643 return NXT_ERROR; 1644 } 1645 1646 job->work.next = recf->jobs; 1647 recf->jobs = &job->work; 1648 1649 job->task = tmcf->engine->task; 1650 job->work.handler = handler; 1651 job->work.task = &job->task; 1652 job->work.obj = job; 1653 job->tmcf = tmcf; 1654 1655 tmcf->count++; 1656 1657 joint = nxt_mp_alloc(tmcf->conf->mem_pool, 1658 sizeof(nxt_socket_conf_joint_t)); 1659 if (nxt_slow_path(joint == NULL)) { 1660 return NXT_ERROR; 1661 } 1662 1663 job->work.data = joint; 1664 1665 joint->count = 1; 1666 1667 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1668 skcf->count++; 1669 joint->socket_conf = skcf; 1670 1671 joint->engine = recf->engine; 1672 } 1673 1674 return NXT_OK; 1675} 1676 1677 1678static nxt_int_t 1679nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 1680 nxt_router_engine_conf_t *recf) 1681{ 1682 nxt_joint_job_t *job; 1683 1684 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1685 if (nxt_slow_path(job == NULL)) { 1686 return NXT_ERROR; 1687 } 1688 1689 job->work.next = recf->jobs; 1690 recf->jobs = &job->work; 1691 1692 job->task = tmcf->engine->task; 1693 job->work.handler = nxt_router_worker_thread_quit; 1694 job->work.task = &job->task; 1695 job->work.obj = NULL; 1696 job->work.data = NULL; 1697 job->tmcf = NULL; 1698 1699 return NXT_OK; 1700} 1701 1702 1703static nxt_int_t 1704nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 1705 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 1706{ 1707 nxt_joint_job_t *job; 1708 nxt_queue_link_t *qlk; 1709 1710 for (qlk = nxt_queue_first(sockets); 1711 qlk != nxt_queue_tail(sockets); 1712 qlk = nxt_queue_next(qlk)) 1713 { 1714 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1715 if (nxt_slow_path(job == NULL)) { 1716 return NXT_ERROR; 1717 } 1718 1719 job->work.next = recf->jobs; 1720 recf->jobs = &job->work; 1721 1722 job->task = tmcf->engine->task; 1723 job->work.handler = nxt_router_listen_socket_delete; 1724 job->work.task = &job->task; 1725 job->work.obj = job; 1726 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1727 job->tmcf = tmcf; 1728 1729 tmcf->count++; 1730 } 1731 1732 return NXT_OK; 1733} 1734 1735 1736static nxt_int_t 1737nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 1738 nxt_router_temp_conf_t *tmcf) 1739{ 1740 nxt_int_t ret; 1741 nxt_uint_t i, threads; 1742 nxt_router_engine_conf_t *recf; 1743 1744 recf = tmcf->engines->elts; 1745 threads = tmcf->conf->threads; 1746 1747 for (i = tmcf->new_threads; i < threads; i++) { 1748 ret = nxt_router_thread_create(task, rt, recf[i].engine); 1749 if (nxt_slow_path(ret != NXT_OK)) { 1750 return ret; 1751 } 1752 } 1753 1754 return NXT_OK; 1755} 1756 1757 1758static nxt_int_t 1759nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 1760 nxt_event_engine_t *engine) 1761{ 1762 nxt_int_t ret; 1763 nxt_thread_link_t *link; 1764 nxt_thread_handle_t handle; 1765 1766 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 1767 1768 if (nxt_slow_path(link == NULL)) { 1769 return NXT_ERROR; 1770 } 1771 1772 link->start = nxt_router_thread_start; 1773 link->engine = engine; 1774 link->work.handler = nxt_router_thread_exit_handler; 1775 link->work.task = task; 1776 link->work.data = link; 1777 1778 nxt_queue_insert_tail(&rt->engines, &engine->link); 1779 1780 ret = nxt_thread_create(&handle, link); 1781 1782 if (nxt_slow_path(ret != NXT_OK)) { 1783 nxt_queue_remove(&engine->link); 1784 } 1785 1786 return ret; 1787} 1788 1789 1790static void 1791nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 1792 nxt_router_temp_conf_t *tmcf) 1793{ 1794 nxt_app_t *app; 1795 nxt_port_t *port; 1796 1797 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 1798 1799 nxt_queue_remove(&app->link); 1800 1801 nxt_debug(task, "about to free app '%V' %p", &app->name, app); 1802 1803 app->live = 0; 1804 1805 do { 1806 port = nxt_router_app_get_idle_port(app); 1807 if (port == NULL) { 1808 break; 1809 } 1810 1811 nxt_debug(task, "port %p send quit", port); 1812 1813 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, 1814 NULL); 1815 1816 nxt_port_use(task, port, -1); 1817 } while (1); 1818 1819 nxt_router_app_use(task, app, -1); 1820 1821 } nxt_queue_loop; 1822 1823 nxt_queue_add(&router->apps, &tmcf->previous); 1824 nxt_queue_add(&router->apps, &tmcf->apps); 1825} 1826 1827 1828static void 1829nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 1830{ 1831 nxt_uint_t n; 1832 nxt_event_engine_t *engine; 1833 nxt_router_engine_conf_t *recf; 1834 1835 recf = tmcf->engines->elts; 1836 1837 for (n = tmcf->engines->nelts; n != 0; n--) { 1838 engine = recf->engine; 1839 1840 switch (recf->action) { 1841 1842 case NXT_ROUTER_ENGINE_KEEP: 1843 break; 1844 1845 case NXT_ROUTER_ENGINE_ADD: 1846 nxt_queue_insert_tail(&router->engines, &engine->link0); 1847 break; 1848 1849 case NXT_ROUTER_ENGINE_DELETE: 1850 nxt_queue_remove(&engine->link0); 1851 break; 1852 } 1853 1854 nxt_router_engine_post(engine, recf->jobs); 1855 1856 recf++; 1857 } 1858} 1859 1860 1861static void 1862nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 1863{ 1864 nxt_work_t *work, *next; 1865 1866 for (work = jobs; work != NULL; work = next) { 1867 next = work->next; 1868 work->next = NULL; 1869 1870 nxt_event_engine_post(engine, work); 1871 } 1872} 1873 1874 1875static nxt_port_handlers_t nxt_router_app_port_handlers = { 1876 .mmap = nxt_port_mmap_handler, 1877 .data = nxt_port_rpc_handler, 1878}; 1879 1880 1881static void 1882nxt_router_thread_start(void *data) 1883{ 1884 nxt_int_t ret; 1885 nxt_port_t *port; 1886 nxt_task_t *task; 1887 nxt_thread_t *thread; 1888 nxt_thread_link_t *link; 1889 nxt_event_engine_t *engine; 1890 1891 link = data; 1892 engine = link->engine; 1893 task = &engine->task; 1894 1895 thread = nxt_thread(); 1896 1897 nxt_event_engine_thread_adopt(engine); 1898 1899 /* STUB */ 1900 thread->runtime = engine->task.thread->runtime; 1901 1902 engine->task.thread = thread; 1903 engine->task.log = thread->log; 1904 thread->engine = engine; 1905 thread->task = &engine->task; 1906#if 0 1907 thread->fiber = &engine->fibers->fiber; 1908#endif 1909 1910 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 1911 if (nxt_slow_path(engine->mem_pool == NULL)) { 1912 return; 1913 } 1914 1915 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 1916 NXT_PROCESS_ROUTER); 1917 if (nxt_slow_path(port == NULL)) { 1918 return; 1919 } 1920 1921 ret = nxt_port_socket_init(task, port, 0); 1922 if (nxt_slow_path(ret != NXT_OK)) { 1923 nxt_port_use(task, port, -1); 1924 return; 1925 } 1926 1927 engine->port = port; 1928 1929 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 1930 1931 nxt_event_engine_start(engine); 1932} 1933 1934 1935static void 1936nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 1937{ 1938 nxt_joint_job_t *job; 1939 nxt_socket_conf_t *skcf; 1940 nxt_listen_event_t *lev; 1941 nxt_listen_socket_t *ls; 1942 nxt_thread_spinlock_t *lock; 1943 nxt_socket_conf_joint_t *joint; 1944 1945 job = obj; 1946 joint = data; 1947 1948 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 1949 1950 skcf = joint->socket_conf; 1951 ls = skcf->listen; 1952 1953 lev = nxt_listen_event(task, ls); 1954 if (nxt_slow_path(lev == NULL)) { 1955 nxt_router_listen_socket_release(task, skcf); 1956 return; 1957 } 1958 1959 lev->socket.data = joint; 1960 1961 lock = &skcf->router_conf->router->lock; 1962 1963 nxt_thread_spin_lock(lock); 1964 ls->count++; 1965 nxt_thread_spin_unlock(lock); 1966 1967 job->work.next = NULL; 1968 job->work.handler = nxt_router_conf_wait; 1969 1970 nxt_event_engine_post(job->tmcf->engine, &job->work); 1971} 1972 1973 1974nxt_inline nxt_listen_event_t * 1975nxt_router_listen_event(nxt_queue_t *listen_connections, 1976 nxt_socket_conf_t *skcf) 1977{ 1978 nxt_socket_t fd; 1979 nxt_queue_link_t *qlk; 1980 nxt_listen_event_t *lev; 1981 1982 fd = skcf->listen->socket; 1983 1984 for (qlk = nxt_queue_first(listen_connections); 1985 qlk != nxt_queue_tail(listen_connections); 1986 qlk = nxt_queue_next(qlk)) 1987 { 1988 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 1989 1990 if (fd == lev->socket.fd) { 1991 return lev; 1992 } 1993 } 1994 1995 return NULL; 1996} 1997 1998 1999static void 2000nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2001{ 2002 nxt_joint_job_t *job; 2003 nxt_event_engine_t *engine; 2004 nxt_listen_event_t *lev; 2005 nxt_socket_conf_joint_t *joint, *old; 2006 2007 job = obj; 2008 joint = data; 2009 2010 engine = task->thread->engine; 2011 2012 nxt_queue_insert_tail(&engine->joints, &joint->link); 2013 2014 lev = nxt_router_listen_event(&engine->listen_connections, 2015 joint->socket_conf); 2016 2017 old = lev->socket.data; 2018 lev->socket.data = joint; 2019 lev->listen = joint->socket_conf->listen; 2020 2021 job->work.next = NULL; 2022 job->work.handler = nxt_router_conf_wait; 2023 2024 nxt_event_engine_post(job->tmcf->engine, &job->work); 2025 2026 /* 2027 * The task is allocated from configuration temporary 2028 * memory pool so it can be freed after engine post operation. 2029 */ 2030 2031 nxt_router_conf_release(&engine->task, old); 2032} 2033 2034 2035static void 2036nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2037{ 2038 nxt_joint_job_t *job; 2039 nxt_socket_conf_t *skcf; 2040 nxt_listen_event_t *lev; 2041 nxt_event_engine_t *engine; 2042 2043 job = obj; 2044 skcf = data; 2045 2046 engine = task->thread->engine; 2047 2048 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2049 2050 nxt_fd_event_delete(engine, &lev->socket); 2051 2052 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2053 lev->socket.fd); 2054 2055 lev->timer.handler = nxt_router_listen_socket_close; 2056 lev->timer.work_queue = &engine->fast_work_queue; 2057 2058 nxt_timer_add(engine, &lev->timer, 0); 2059 2060 job->work.next = NULL; 2061 job->work.handler = nxt_router_conf_wait; 2062 2063 nxt_event_engine_post(job->tmcf->engine, &job->work); 2064} 2065 2066 2067static void 2068nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2069{ 2070 nxt_event_engine_t *engine; 2071 2072 nxt_debug(task, "router worker thread quit"); 2073 2074 engine = task->thread->engine; 2075 2076 engine->shutdown = 1; 2077 2078 if (nxt_queue_is_empty(&engine->joints)) { 2079 nxt_thread_exit(task->thread); 2080 } 2081} 2082 2083 2084static void 2085nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2086{ 2087 nxt_timer_t *timer; 2088 nxt_listen_event_t *lev; 2089 nxt_socket_conf_joint_t *joint; 2090 2091 timer = obj; 2092 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2093 joint = lev->socket.data; 2094 2095 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2096 lev->socket.fd); 2097 2098 nxt_queue_remove(&lev->link); 2099 2100 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2101 task = &task->thread->engine->task; 2102 2103 nxt_router_listen_socket_release(task, joint->socket_conf); 2104 2105 nxt_free(lev); 2106 2107 nxt_router_conf_release(task, joint); 2108} 2109 2110 2111static void 2112nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2113{ 2114 nxt_listen_socket_t *ls; 2115 nxt_thread_spinlock_t *lock; 2116 2117 ls = skcf->listen; 2118 lock = &skcf->router_conf->router->lock; 2119 2120 nxt_thread_spin_lock(lock); 2121 2122 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2123 task->thread->engine, ls->count); 2124 2125 if (--ls->count != 0) { 2126 ls = NULL; 2127 } 2128 2129 nxt_thread_spin_unlock(lock); 2130 2131 if (ls != NULL) { 2132 nxt_socket_close(task, ls->socket); 2133 nxt_free(ls); 2134 } 2135} 2136 2137 2138static void 2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2140{ 2141 nxt_bool_t exit; 2142 nxt_socket_conf_t *skcf; 2143 nxt_router_conf_t *rtcf; 2144 nxt_event_engine_t *engine; 2145 nxt_thread_spinlock_t *lock; 2146 2147 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2148 2149 if (--joint->count != 0) { 2150 return; 2151 } 2152 2153 nxt_queue_remove(&joint->link); 2154 2155 skcf = joint->socket_conf; 2156 rtcf = skcf->router_conf; 2157 lock = &rtcf->router->lock; 2158 2159 nxt_thread_spin_lock(lock); 2160 2161 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2162 rtcf, rtcf->count); 2163 2164 if (--skcf->count != 0) { 2165 rtcf = NULL; 2166 2167 } else { 2168 nxt_queue_remove(&skcf->link); 2169 2170 if (--rtcf->count != 0) { 2171 rtcf = NULL; 2172 } 2173 } 2174 2175 nxt_thread_spin_unlock(lock); 2176 2177 /* TODO remove engine->port */ 2178 /* TODO excude from connected ports */ 2179 2180 /* The joint content can be used before memory pool destruction. */ 2181 engine = joint->engine; 2182 exit = (engine->shutdown && nxt_queue_is_empty(&engine->joints)); 2183 2184 if (rtcf != NULL) { 2185 nxt_debug(task, "old router conf is destroyed"); 2186 2187 nxt_mp_thread_adopt(rtcf->mem_pool); 2188 2189 nxt_mp_destroy(rtcf->mem_pool); 2190 } 2191 2192 if (exit) { 2193 nxt_thread_exit(task->thread); 2194 } 2195} 2196 2197 2198static void 2199nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 2200{ 2201 nxt_port_t *port; 2202 nxt_thread_link_t *link; 2203 nxt_event_engine_t *engine; 2204 nxt_thread_handle_t handle; 2205 2206 handle = (nxt_thread_handle_t) obj; 2207 link = data; 2208 2209 nxt_thread_wait(handle); 2210 2211 engine = link->engine; 2212 2213 nxt_queue_remove(&engine->link); 2214 2215 port = engine->port; 2216 2217 // TODO notify all apps 2218 2219 port->engine = task->thread->engine; 2220 nxt_mp_thread_adopt(port->mem_pool); 2221 nxt_port_use(task, port, -1); 2222 2223 nxt_mp_thread_adopt(engine->mem_pool); 2224 nxt_mp_destroy(engine->mem_pool); 2225 2226 nxt_event_engine_free(engine); 2227 2228 nxt_free(link); 2229} 2230 2231 2232static const nxt_conn_state_t nxt_router_conn_read_header_state 2233 nxt_aligned(64) = 2234{ 2235 .ready_handler = nxt_router_conn_http_header_parse, 2236 .close_handler = nxt_router_conn_close, 2237 .error_handler = nxt_router_conn_error, 2238 2239 .timer_handler = nxt_router_conn_timeout, 2240 .timer_value = nxt_router_conn_timeout_value, 2241 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), 2242}; 2243 2244 2245static const nxt_conn_state_t nxt_router_conn_read_body_state 2246 nxt_aligned(64) = 2247{ 2248 .ready_handler = nxt_router_conn_http_body_read, 2249 .close_handler = nxt_router_conn_close, 2250 .error_handler = nxt_router_conn_error, 2251 2252 .timer_handler = nxt_router_conn_timeout, 2253 .timer_value = nxt_router_conn_timeout_value, 2254 .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), 2255 .timer_autoreset = 1, 2256}; 2257 2258 2259static void 2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) 2261{ 2262 size_t size; 2263 nxt_conn_t *c; 2264 nxt_socket_conf_t *skcf; 2265 nxt_event_engine_t *engine; 2266 nxt_socket_conf_joint_t *joint; 2267 2268 c = obj; 2269 joint = data; 2270 2271 nxt_debug(task, "router conn init"); 2272 2273 c->joint = joint; 2274 joint->count++; 2275 2276 skcf = joint->socket_conf; 2277 c->local = skcf->sockaddr; 2278 2279 size = skcf->header_buffer_size; 2280 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2281 2282 c->socket.data = NULL; 2283 2284 engine = task->thread->engine; 2285 c->read_work_queue = &engine->fast_work_queue; 2286 c->write_work_queue = &engine->fast_work_queue; 2287 2288 c->read_state = &nxt_router_conn_read_header_state; 2289 2290 nxt_conn_read(engine, c); 2291} 2292 2293 2294static const nxt_conn_state_t nxt_router_conn_write_state 2295 nxt_aligned(64) = 2296{ 2297 .ready_handler = nxt_router_conn_ready, 2298 .close_handler = nxt_router_conn_close, 2299 .error_handler = nxt_router_conn_error, 2300}; 2301 2302 2303static void 2304nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2305 void *data) 2306{ 2307 size_t dump_size; 2308 nxt_buf_t *b, *last; 2309 nxt_conn_t *c; 2310 nxt_req_conn_link_t *rc; 2311 2312 b = msg->buf; 2313 rc = data; 2314 2315 c = rc->conn; 2316 2317 dump_size = nxt_buf_used_size(b); 2318 2319 if (dump_size > 300) { 2320 dump_size = 300; 2321 } 2322 2323 nxt_debug(task, "%srouter app data (%z): %*s", 2324 msg->port_msg.last ? "last " : "", msg->size, dump_size, 2325 b->mem.pos); 2326 2327 if (msg->size == 0) { 2328 b = NULL; 2329 } 2330 2331 if (msg->port_msg.last != 0) { 2332 nxt_debug(task, "router data create last buf"); 2333 2334 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); 2335 if (nxt_slow_path(last == NULL)) { 2336 /* TODO pogorevaTb */ 2337 } 2338 2339 nxt_buf_chain_add(&b, last); 2340 2341 nxt_router_rc_unlink(task, rc); 2342 } 2343 2344 if (b == NULL) { 2345 return; 2346 } 2347 2348 if (msg->buf == b) { 2349 /* Disable instant buffer completion/re-using by port. */ 2350 msg->buf = NULL; 2351 } 2352 2353 if (c->write == NULL) { 2354 c->write = b; 2355 c->write_state = &nxt_router_conn_write_state; 2356 2357 nxt_conn_write(task->thread->engine, c); 2358 2359 } else { 2360 nxt_debug(task, "router data attach out bufs to existing chain"); 2361 2362 nxt_buf_chain_add(&c->write, b); 2363 } 2364} 2365 2366 2367static void 2368nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2369 void *data) 2370{ 2371 nxt_req_conn_link_t *rc; 2372 2373 rc = data; 2374 2375 nxt_router_gen_error(task, rc->conn, 500, 2376 "Application terminated unexpectedly"); 2377 2378 nxt_router_rc_unlink(task, rc); 2379} 2380 2381 2382nxt_inline const char * 2383nxt_router_text_by_code(int code) 2384{ 2385 switch (code) { 2386 case 400: return "Bad request"; 2387 case 404: return "Not found"; 2388 case 403: return "Forbidden"; 2389 case 408: return "Request Timeout"; 2390 case 411: return "Length Required"; 2391 case 413: return "Request Entity Too Large"; 2392 case 500: 2393 default: return "Internal server error"; 2394 } 2395} 2396 2397 2398static nxt_buf_t * 2399nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, 2400 const char* str) 2401{ 2402 nxt_buf_t *b, *last; 2403 2404 b = nxt_buf_mem_alloc(mp, 16384, 0); 2405 if (nxt_slow_path(b == NULL)) { 2406 return NULL; 2407 } 2408 2409 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, 2410 "HTTP/1.0 %d %s\r\n" 2411 "Content-Type: text/plain\r\n" 2412 "Connection: close\r\n\r\n", 2413 code, nxt_router_text_by_code(code)); 2414 2415 b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); 2416 2417 nxt_log_alert(task->log, "error %d: %s", code, str); 2418 2419 last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); 2420 2421 if (nxt_slow_path(last == NULL)) { 2422 nxt_mp_free(mp, b); 2423 return NULL; 2424 } 2425 2426 nxt_buf_chain_add(&b, last); 2427 2428 return b; 2429} 2430 2431 2432 2433static void 2434nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 2435 const char* str) 2436{ 2437 nxt_mp_t *mp; 2438 nxt_buf_t *b; 2439 2440 /* TODO: fix when called in the middle of response */ 2441 2442 mp = c->mem_pool; 2443 2444 b = nxt_router_get_error_buf(task, mp, code, str); 2445 2446 if (c->socket.fd == -1) { 2447 nxt_mp_free(mp, b->next); 2448 nxt_mp_free(mp, b); 2449 return; 2450 } 2451 2452 if (c->write == NULL) { 2453 c->write = b; 2454 c->write_state = &nxt_router_conn_write_state; 2455 2456 nxt_conn_write(task->thread->engine, c); 2457 2458 } else { 2459 nxt_debug(task, "router data attach out bufs to existing chain"); 2460 2461 nxt_buf_chain_add(&c->write, b); 2462 } 2463} 2464 2465 2466static void 2467nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2468 void *data) 2469{ 2470 nxt_app_t *app; 2471 nxt_port_t *port; 2472 2473 app = data; 2474 port = msg->u.new_port; 2475 2476 nxt_assert(app != NULL); 2477 nxt_assert(port != NULL); 2478 2479 port->app = app; 2480 2481 nxt_thread_mutex_lock(&app->mutex); 2482 2483 nxt_assert(app->pending_workers != 0); 2484 2485 app->pending_workers--; 2486 app->workers++; 2487 2488 nxt_thread_mutex_unlock(&app->mutex); 2489 2490 nxt_debug(task, "app '%V' %p new port ready", &app->name, app); 2491 2492 nxt_router_app_port_release(task, port, 0, 0); 2493} 2494 2495 2496static void 2497nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2498 void *data) 2499{ 2500 nxt_app_t *app; 2501 nxt_queue_link_t *lnk; 2502 nxt_req_app_link_t *ra; 2503 2504 app = data; 2505 2506 nxt_assert(app != NULL); 2507 2508 nxt_debug(task, "app '%V' %p start error", &app->name, app); 2509 2510 nxt_thread_mutex_lock(&app->mutex); 2511 2512 nxt_assert(app->pending_workers != 0); 2513 2514 app->pending_workers--; 2515 2516 if (!nxt_queue_is_empty(&app->requests)) { 2517 lnk = nxt_queue_last(&app->requests); 2518 nxt_queue_remove(lnk); 2519 lnk->next = NULL; 2520 2521 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2522 2523 } else { 2524 ra = NULL; 2525 } 2526 2527 nxt_thread_mutex_unlock(&app->mutex); 2528 2529 if (ra != NULL) { 2530 nxt_debug(task, "app '%V' %p abort next stream #%uD", 2531 &app->name, app, ra->stream); 2532 2533 nxt_router_ra_abort(task, ra, ra->work.data); 2534 } 2535 2536 nxt_router_app_use(task, app, -1); 2537} 2538 2539 2540void 2541nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 2542{ 2543 int c; 2544 2545 c = nxt_atomic_fetch_add(&app->use_count, i); 2546 2547 if (i < 0 && c == -i) { 2548 2549 nxt_assert(app->live == 0); 2550 nxt_assert(app->workers == 0); 2551 nxt_assert(app->pending_workers == 0); 2552 nxt_assert(nxt_queue_is_empty(&app->requests) != 0); 2553 nxt_assert(nxt_queue_is_empty(&app->ports) != 0); 2554 2555 nxt_thread_mutex_destroy(&app->mutex); 2556 nxt_free(app); 2557 } 2558} 2559 2560 2561nxt_inline nxt_port_t * 2562nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) 2563{ 2564 nxt_port_t *port; 2565 nxt_queue_link_t *lnk; 2566 2567 lnk = nxt_queue_first(&app->ports); 2568 nxt_queue_remove(lnk); 2569 2570 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2571 2572 port->app_requests++; 2573 2574 if (app->live && 2575 (app->max_pending_responses == 0 || 2576 (port->app_requests - port->app_responses) < 2577 app->max_pending_responses) ) 2578 { 2579 nxt_queue_insert_tail(&app->ports, lnk); 2580 2581 } else { 2582 lnk->next = NULL; 2583 2584 (*use_delta)--; 2585 } 2586 2587 return port; 2588} 2589 2590 2591static nxt_port_t * 2592nxt_router_app_get_idle_port(nxt_app_t *app) 2593{ 2594 nxt_port_t *port; 2595 2596 port = NULL; 2597 2598 nxt_thread_mutex_lock(&app->mutex); 2599 2600 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 2601 2602 if (port->app_requests > port->app_responses) { 2603 port = NULL; 2604 2605 continue; 2606 } 2607 2608 nxt_queue_remove(&port->app_link); 2609 port->app_link.next = NULL; 2610 2611 break; 2612 2613 } nxt_queue_loop; 2614 2615 nxt_thread_mutex_unlock(&app->mutex); 2616 2617 return port; 2618} 2619 2620 2621static void 2622nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) 2623{ 2624 nxt_app_t *app; 2625 nxt_req_app_link_t *ra; 2626 2627 app = obj; 2628 ra = data; 2629 2630 nxt_assert(app != NULL); 2631 nxt_assert(ra != NULL); 2632 nxt_assert(ra->app_port != NULL); 2633 2634 nxt_debug(task, "app '%V' %p process next stream #%uD", 2635 &app->name, app, ra->stream); 2636 2637 nxt_router_process_http_request_mp(task, ra); 2638} 2639 2640 2641static void 2642nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 2643 uint32_t request_failed, uint32_t got_response) 2644{ 2645 int use_delta, ra_use_delta; 2646 nxt_app_t *app; 2647 nxt_bool_t send_quit; 2648 nxt_queue_link_t *lnk; 2649 nxt_req_app_link_t *ra; 2650 2651 nxt_assert(port != NULL); 2652 nxt_assert(port->app != NULL); 2653 2654 app = port->app; 2655 2656 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; 2657 2658 nxt_thread_mutex_lock(&app->mutex); 2659 2660 port->app_requests -= request_failed; 2661 port->app_responses += got_response; 2662 2663 if (app->live != 0 && 2664 port->pair[1] != -1 && 2665 port->app_link.next == NULL && 2666 (app->max_pending_responses == 0 || 2667 (port->app_requests - port->app_responses) < 2668 app->max_pending_responses) ) 2669 { 2670 nxt_queue_insert_tail(&app->ports, &port->app_link); 2671 use_delta++; 2672 } 2673 2674 if (app->live != 0 && 2675 !nxt_queue_is_empty(&app->ports) && 2676 !nxt_queue_is_empty(&app->requests)) 2677 { 2678 lnk = nxt_queue_first(&app->requests); 2679 nxt_queue_remove(lnk); 2680 lnk->next = NULL; 2681 2682 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2683 2684 ra_use_delta = 1; 2685 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); 2686 2687 } else { 2688 ra = NULL; 2689 ra_use_delta = 0; 2690 } 2691 2692 send_quit = app->live == 0 && port->app_requests == port->app_responses; 2693 2694 nxt_thread_mutex_unlock(&app->mutex); 2695 2696 if (ra != NULL) { 2697 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2698 nxt_router_app_process_request, 2699 &task->thread->engine->task, app, ra); 2700 2701 goto adjust_use; 2702 } 2703 2704 /* ? */ 2705 if (port->pair[1] == -1) { 2706 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 2707 &app->name, app, port, port->pid); 2708 2709 goto adjust_use; 2710 } 2711 2712 if (send_quit) { 2713 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", 2714 &app->name, app); 2715 2716 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 2717 -1, 0, 0, NULL); 2718 2719 goto adjust_use; 2720 } 2721 2722 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 2723 &app->name, app); 2724 2725adjust_use: 2726 2727 if (use_delta != 0) { 2728 nxt_port_use(task, port, use_delta); 2729 } 2730 2731 if (ra_use_delta != 0) { 2732 nxt_port_use(task, ra->app_port, ra_use_delta); 2733 } 2734} 2735 2736 2737void 2738nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 2739{ 2740 nxt_app_t *app; 2741 nxt_bool_t unchain, start_worker; 2742 2743 app = port->app; 2744 2745 nxt_assert(app != NULL); 2746 2747 nxt_thread_mutex_lock(&app->mutex); 2748 2749 unchain = port->app_link.next != NULL; 2750 2751 if (unchain) { 2752 nxt_queue_remove(&port->app_link); 2753 port->app_link.next = NULL; 2754 } 2755 2756 app->workers--; 2757 2758 start_worker = app->live != 0 && 2759 nxt_queue_is_empty(&app->requests) == 0 && 2760 app->workers + app->pending_workers < app->max_workers; 2761 2762 if (start_worker) { 2763 app->pending_workers++; 2764 } 2765 2766 nxt_thread_mutex_unlock(&app->mutex); 2767 2768 nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); 2769 2770 if (unchain) { 2771 nxt_port_use(task, port, -1); 2772 } 2773 2774 if (start_worker) { 2775 nxt_router_start_worker(task, app); 2776 } 2777} 2778 2779 2780static nxt_int_t 2781nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) 2782{ 2783 int use_delta; 2784 nxt_int_t res; 2785 nxt_app_t *app; 2786 nxt_bool_t can_start_worker; 2787 nxt_conn_t *c; 2788 nxt_port_t *port; 2789 nxt_event_engine_t *engine; 2790 nxt_socket_conf_joint_t *joint; 2791 2792 port = NULL; 2793 use_delta = 1; 2794 c = ra->rc->conn; 2795 2796 joint = c->joint; 2797 app = joint->socket_conf->application; 2798 2799 if (app == NULL) { 2800 nxt_router_gen_error(task, c, 500, 2801 "Application is NULL in socket_conf"); 2802 return NXT_ERROR; 2803 } 2804 2805 ra->rc->app = app; 2806 2807 nxt_router_app_use(task, app, 1); 2808 2809 engine = task->thread->engine; 2810 2811 nxt_timer_disable(engine, &c->read_timer); 2812 2813 if (app->timeout != 0) { 2814 c->read_timer.handler = nxt_router_app_timeout; 2815 nxt_timer_add(engine, &c->read_timer, app->timeout); 2816 } 2817 2818 can_start_worker = 0; 2819 2820 nxt_thread_mutex_lock(&app->mutex); 2821 2822 if (!nxt_queue_is_empty(&app->ports)) { 2823 port = nxt_router_app_get_port_unsafe(app, &use_delta); 2824 2825 } else { 2826 ra = nxt_router_ra_create(task, ra); 2827 2828 if (nxt_fast_path(ra != NULL)) { 2829 nxt_queue_insert_tail(&app->requests, &ra->link); 2830 2831 can_start_worker = (app->workers + app->pending_workers) < 2832 app->max_workers; 2833 if (can_start_worker) { 2834 app->pending_workers++; 2835 } 2836 } 2837 2838 port = NULL; 2839 } 2840 2841 nxt_thread_mutex_unlock(&app->mutex); 2842 2843 if (nxt_slow_path(ra == NULL)) { 2844 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2845 "req<->app link"); 2846 return NXT_ERROR; 2847 } 2848 2849 if (port != NULL) { 2850 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); 2851 2852 ra->app_port = port; 2853 2854 if (use_delta != 0) { 2855 nxt_port_use(task, port, use_delta); 2856 } 2857 return NXT_OK; 2858 } 2859 2860 nxt_debug(task, "ra stream #%uD allocated", ra->stream); 2861 2862 if (!can_start_worker) { 2863 nxt_debug(task, "app '%V' %p too many running or pending workers", 2864 &app->name, app); 2865 2866 return NXT_AGAIN; 2867 } 2868 2869 res = nxt_router_start_worker(task, app); 2870 2871 if (nxt_slow_path(res != NXT_OK)) { 2872 nxt_router_gen_error(task, c, 500, "Failed to start worker"); 2873 2874 return NXT_ERROR; 2875 } 2876 2877 return NXT_AGAIN; 2878} 2879 2880 2881static void 2882nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) 2883{ 2884 size_t size; 2885 nxt_int_t ret; 2886 nxt_buf_t *buf; 2887 nxt_conn_t *c; 2888 nxt_sockaddr_t *local; 2889 nxt_app_parse_ctx_t *ap; 2890 nxt_app_request_body_t *b; 2891 nxt_socket_conf_joint_t *joint; 2892 nxt_app_request_header_t *h; 2893 2894 c = obj; 2895 ap = data; 2896 buf = c->read; 2897 joint = c->joint; 2898 2899 nxt_debug(task, "router conn http header parse"); 2900 2901 if (ap == NULL) { 2902 ap = nxt_app_http_req_init(task); 2903 if (nxt_slow_path(ap == NULL)) { 2904 nxt_router_gen_error(task, c, 500, 2905 "Failed to allocate parse context"); 2906 return; 2907 } 2908 2909 c->socket.data = ap; 2910 2911 ap->r.remote.start = nxt_sockaddr_address(c->remote); 2912 ap->r.remote.length = c->remote->address_length; 2913 2914 /* 2915 * TODO: need an application flag to get local address 2916 * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. 2917 */ 2918 local = nxt_router_local_addr(task, c); 2919 2920 if (nxt_fast_path(local != NULL)) { 2921 ap->r.local.start = nxt_sockaddr_address(local); 2922 ap->r.local.length = local->address_length; 2923 } 2924 2925 ap->r.header.buf = buf; 2926 } 2927 2928 h = &ap->r.header; 2929 b = &ap->r.body; 2930 2931 ret = nxt_app_http_req_header_parse(task, ap, buf); 2932 2933 nxt_debug(task, "http parse request header: %d", ret); 2934 2935 switch (nxt_expect(NXT_DONE, ret)) { 2936 2937 case NXT_DONE: 2938 nxt_debug(task, "router request header parsing complete, " 2939 "content length: %O, preread: %uz", 2940 h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); 2941 2942 if (b->done) { 2943 nxt_router_process_http_request(task, c, ap); 2944 2945 return; 2946 } 2947 2948 if (joint->socket_conf->max_body_size > 0 2949 && (size_t) h->parsed_content_length 2950 > joint->socket_conf->max_body_size) 2951 { 2952 nxt_router_gen_error(task, c, 413, "Content-Length too big"); 2953 return; 2954 } 2955 2956 if (nxt_buf_mem_free_size(&buf->mem) == 0) { 2957 size = nxt_min(joint->socket_conf->body_buffer_size, 2958 (size_t) h->parsed_content_length); 2959 2960 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2961 if (nxt_slow_path(buf->next == NULL)) { 2962 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2963 "buffer for request body"); 2964 return; 2965 } 2966 2967 c->read = buf->next; 2968 2969 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 2970 } 2971 2972 if (b->buf == NULL) { 2973 b->buf = c->read; 2974 } 2975 2976 c->read_state = &nxt_router_conn_read_body_state; 2977 break; 2978 2979 case NXT_ERROR: 2980 nxt_router_gen_error(task, c, 400, "Request header parse error"); 2981 return; 2982 2983 default: /* NXT_AGAIN */ 2984 2985 if (c->read->mem.free == c->read->mem.end) { 2986 size = joint->socket_conf->large_header_buffer_size; 2987 2988 if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) 2989 || ap->r.header.bufs 2990 >= joint->socket_conf->large_header_buffers) 2991 { 2992 nxt_router_gen_error(task, c, 413, 2993 "Too long request headers"); 2994 return; 2995 } 2996 2997 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2998 if (nxt_slow_path(buf->next == NULL)) { 2999 nxt_router_gen_error(task, c, 500, 3000 "Failed to allocate large header " 3001 "buffer"); 3002 return; 3003 } 3004 3005 ap->r.header.bufs++; 3006 3007 size = c->read->mem.free - c->read->mem.pos; 3008 3009 c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); 3010 } 3011 3012 } 3013 3014 nxt_conn_read(task->thread->engine, c); 3015} 3016 3017 3018static nxt_sockaddr_t * 3019nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c) 3020{ 3021 int ret; 3022 size_t size, length; 3023 socklen_t socklen; 3024 nxt_sockaddr_t *sa; 3025 3026 if (c->local != NULL) { 3027 return c->local; 3028 } 3029 3030 /* AF_UNIX should not get in here. */ 3031 3032 switch (c->remote->u.sockaddr.sa_family) { 3033#if (NXT_INET6) 3034 case AF_INET6: 3035 socklen = sizeof(struct sockaddr_in6); 3036 length = NXT_INET6_ADDR_STR_LEN; 3037 size = offsetof(nxt_sockaddr_t, u) + socklen + length; 3038 break; 3039#endif 3040 case AF_INET: 3041 default: 3042 socklen = sizeof(struct sockaddr_in); 3043 length = NXT_INET_ADDR_STR_LEN; 3044 size = offsetof(nxt_sockaddr_t, u) + socklen + length; 3045 break; 3046 } 3047 3048 sa = nxt_mp_get(c->mem_pool, size); 3049 if (nxt_slow_path(sa == NULL)) { 3050 return NULL; 3051 } 3052 3053 sa->socklen = socklen; 3054 sa->length = length; 3055 3056 ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen); 3057 if (nxt_slow_path(ret != 0)) { 3058 nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd); 3059 return NULL; 3060 } 3061 3062 c->local = sa; 3063 3064 nxt_sockaddr_text(sa); 3065 3066 /* 3067 * TODO: here we can adjust the end of non-freeable block 3068 * in c->mem_pool to the end of actual sockaddr length. 3069 */ 3070 3071 return sa; 3072} 3073 3074 3075static void 3076nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) 3077{ 3078 size_t size; 3079 nxt_int_t ret; 3080 nxt_buf_t *buf; 3081 nxt_conn_t *c; 3082 nxt_app_parse_ctx_t *ap; 3083 nxt_app_request_body_t *b; 3084 nxt_socket_conf_joint_t *joint; 3085 nxt_app_request_header_t *h; 3086 3087 c = obj; 3088 ap = data; 3089 buf = c->read; 3090 3091 nxt_debug(task, "router conn http body read"); 3092 3093 nxt_assert(ap != NULL); 3094 3095 b = &ap->r.body; 3096 h = &ap->r.header; 3097 3098 ret = nxt_app_http_req_body_read(task, ap, buf); 3099 3100 nxt_debug(task, "http read request body: %d", ret); 3101 3102 switch (nxt_expect(NXT_DONE, ret)) { 3103 3104 case NXT_DONE: 3105 nxt_router_process_http_request(task, c, ap); 3106 return; 3107 3108 case NXT_ERROR: 3109 nxt_router_gen_error(task, c, 500, "Read body error"); 3110 return; 3111 3112 default: /* NXT_AGAIN */ 3113 3114 if (nxt_buf_mem_free_size(&buf->mem) == 0) { 3115 joint = c->joint; 3116 3117 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 3118 3119 size = nxt_min(joint->socket_conf->body_buffer_size, 3120 (size_t) h->parsed_content_length - b->preread_size); 3121 3122 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 3123 if (nxt_slow_path(buf->next == NULL)) { 3124 nxt_router_gen_error(task, c, 500, "Failed to allocate " 3125 "buffer for request body"); 3126 return; 3127 } 3128 3129 c->read = buf->next; 3130 } 3131 3132 nxt_debug(task, "router request body read again, rest: %uz", 3133 h->parsed_content_length - b->preread_size); 3134 } 3135 3136 nxt_conn_read(task->thread->engine, c); 3137} 3138 3139 3140static void 3141nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 3142 nxt_app_parse_ctx_t *ap) 3143{ 3144 nxt_int_t res; 3145 nxt_port_t *port; 3146 nxt_event_engine_t *engine; 3147 nxt_req_app_link_t ra_local, *ra; 3148 nxt_req_conn_link_t *rc; 3149 3150 engine = task->thread->engine; 3151 3152 rc = nxt_port_rpc_register_handler_ex(task, engine->port, 3153 nxt_router_response_ready_handler, 3154 nxt_router_response_error_handler, 3155 sizeof(nxt_req_conn_link_t)); 3156 3157 if (nxt_slow_path(rc == NULL)) { 3158 nxt_router_gen_error(task, c, 500, "Failed to allocate " 3159 "req<->conn link"); 3160 3161 return; 3162 } 3163 3164 rc->stream = nxt_port_rpc_ex_stream(rc); 3165 rc->conn = c; 3166 3167 nxt_queue_insert_tail(&c->requests, &rc->link); 3168 3169 nxt_debug(task, "stream #%uD linked to conn %p at engine %p", 3170 rc->stream, c, engine); 3171 3172 rc->ap = ap; 3173 c->socket.data = NULL; 3174 3175 ra = &ra_local; 3176 nxt_router_ra_init(task, ra, rc); 3177 3178 res = nxt_router_app_port(task, ra); 3179 3180 if (res != NXT_OK) { 3181 return; 3182 } 3183 3184 port = ra->app_port; 3185 3186 if (nxt_slow_path(port == NULL)) { 3187 nxt_router_gen_error(task, c, 500, "Application port not found"); 3188 return; 3189 } 3190 3191 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); 3192 3193 nxt_router_process_http_request_mp(task, ra); 3194} 3195 3196 3197static void 3198nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) 3199{ 3200 uint32_t request_failed; 3201 nxt_int_t res; 3202 nxt_port_t *port, *c_port, *reply_port; 3203 nxt_app_wmsg_t wmsg; 3204 nxt_app_parse_ctx_t *ap; 3205 3206 nxt_assert(ra->app_port != NULL); 3207 3208 port = ra->app_port; 3209 reply_port = ra->reply_port; 3210 ap = ra->ap; 3211 3212 request_failed = 1; 3213 3214 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 3215 reply_port->id); 3216 if (nxt_slow_path(c_port != reply_port)) { 3217 res = nxt_port_send_port(task, port, reply_port, 0); 3218 3219 if (nxt_slow_path(res != NXT_OK)) { 3220 nxt_router_ra_error(task, ra, 500, 3221 "Failed to send reply port to application"); 3222 ra = NULL; 3223 goto release_port; 3224 } 3225 3226 nxt_process_connected_port_add(port->process, reply_port); 3227 } 3228 3229 wmsg.port = port; 3230 wmsg.write = NULL; 3231 wmsg.buf = &wmsg.write; 3232 wmsg.stream = ra->stream; 3233 3234 res = port->app->prepare_msg(task, &ap->r, &wmsg); 3235 3236 if (nxt_slow_path(res != NXT_OK)) { 3237 nxt_router_ra_error(task, ra, 500, 3238 "Failed to prepare message for application"); 3239 ra = NULL; 3240 goto release_port; 3241 } 3242 3243 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 3244 nxt_buf_used_size(wmsg.write), 3245 wmsg.port->socket.fd); 3246 3247 request_failed = 0; 3248 3249 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, 3250 -1, ra->stream, reply_port->id, wmsg.write); 3251 3252 if (nxt_slow_path(res != NXT_OK)) { 3253 nxt_router_ra_error(task, ra, 500, 3254 "Failed to send message to application"); 3255 ra = NULL; 3256 goto release_port; 3257 } 3258 3259release_port: 3260 3261 nxt_router_app_port_release(task, port, request_failed, 0); 3262 3263 if (ra != NULL) { 3264 if (request_failed != 0) { 3265 ra->app_port = 0; 3266 } 3267 3268 nxt_router_ra_release(task, ra, ra->work.data); 3269 } 3270} 3271 3272 3273static nxt_int_t 3274nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 3275 nxt_app_wmsg_t *wmsg) 3276{ 3277 nxt_int_t rc; 3278 nxt_buf_t *b; 3279 nxt_http_field_t *field; 3280 nxt_app_request_header_t *h; 3281 3282 static const nxt_str_t prefix = nxt_string("HTTP_"); 3283 static const nxt_str_t eof = nxt_null_string; 3284 3285 h = &r->header; 3286 3287#define RC(S) \ 3288 do { \ 3289 rc = (S); \ 3290 if (nxt_slow_path(rc != NXT_OK)) { \ 3291 goto fail; \ 3292 } \ 3293 } while(0) 3294 3295#define NXT_WRITE(N) \ 3296 RC(nxt_app_msg_write_str(task, wmsg, N)) 3297 3298 /* TODO error handle, async mmap buffer assignment */ 3299 3300 NXT_WRITE(&h->method); 3301 NXT_WRITE(&h->target); 3302 3303 if (h->path.start == h->target.start) { 3304 NXT_WRITE(&eof); 3305 3306 } else { 3307 NXT_WRITE(&h->path); 3308 } 3309 3310 if (h->query.start != NULL) { 3311 RC(nxt_app_msg_write_size(task, wmsg, 3312 h->query.start - h->target.start + 1)); 3313 } else { 3314 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3315 } 3316 3317 NXT_WRITE(&h->version); 3318 3319 NXT_WRITE(&r->remote); 3320 NXT_WRITE(&r->local); 3321 3322 NXT_WRITE(&h->host); 3323 NXT_WRITE(&h->content_type); 3324 NXT_WRITE(&h->content_length); 3325 3326 nxt_list_each(field, h->fields) { 3327 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, 3328 &prefix, &field->name)); 3329 NXT_WRITE(&field->value); 3330 3331 } nxt_list_loop; 3332 3333 /* end-of-headers mark */ 3334 NXT_WRITE(&eof); 3335 3336 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3337 3338 for(b = r->body.buf; b != NULL; b = b->next) { 3339 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3340 nxt_buf_mem_used_size(&b->mem))); 3341 } 3342 3343#undef NXT_WRITE 3344#undef RC 3345 3346 return NXT_OK; 3347 3348fail: 3349 3350 return NXT_ERROR; 3351} 3352 3353 3354static nxt_int_t 3355nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 3356 nxt_app_wmsg_t *wmsg) 3357{ 3358 nxt_int_t rc; 3359 nxt_buf_t *b; 3360 nxt_bool_t method_is_post; 3361 nxt_http_field_t *field; 3362 nxt_app_request_header_t *h; 3363 3364 static const nxt_str_t prefix = nxt_string("HTTP_"); 3365 static const nxt_str_t eof = nxt_null_string; 3366 3367 h = &r->header; 3368 3369#define RC(S) \ 3370 do { \ 3371 rc = (S); \ 3372 if (nxt_slow_path(rc != NXT_OK)) { \ 3373 goto fail; \ 3374 } \ 3375 } while(0) 3376 3377#define NXT_WRITE(N) \ 3378 RC(nxt_app_msg_write_str(task, wmsg, N)) 3379 3380 /* TODO error handle, async mmap buffer assignment */ 3381 3382 NXT_WRITE(&h->method); 3383 NXT_WRITE(&h->target); 3384 3385 if (h->path.start == h->target.start) { 3386 NXT_WRITE(&eof); 3387 3388 } else { 3389 NXT_WRITE(&h->path); 3390 } 3391 3392 if (h->query.start != NULL) { 3393 RC(nxt_app_msg_write_size(task, wmsg, 3394 h->query.start - h->target.start + 1)); 3395 } else { 3396 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3397 } 3398 3399 NXT_WRITE(&h->version); 3400 3401 // PHP_SELF 3402 // SCRIPT_NAME 3403 // SCRIPT_FILENAME 3404 // DOCUMENT_ROOT 3405 3406 NXT_WRITE(&r->remote); 3407 NXT_WRITE(&r->local); 3408 3409 NXT_WRITE(&h->host); 3410 NXT_WRITE(&h->cookie); 3411 NXT_WRITE(&h->content_type); 3412 NXT_WRITE(&h->content_length); 3413 3414 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 3415 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3416 3417 method_is_post = h->method.length == 4 && 3418 h->method.start[0] == 'P' && 3419 h->method.start[1] == 'O' && 3420 h->method.start[2] == 'S' && 3421 h->method.start[3] == 'T'; 3422 3423 if (method_is_post) { 3424 for(b = r->body.buf; b != NULL; b = b->next) { 3425 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3426 nxt_buf_mem_used_size(&b->mem))); 3427 } 3428 } 3429 3430 nxt_list_each(field, h->fields) { 3431 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, 3432 &prefix, &field->name)); 3433 NXT_WRITE(&field->value); 3434 3435 } nxt_list_loop; 3436 3437 /* end-of-headers mark */ 3438 NXT_WRITE(&eof); 3439 3440 if (!method_is_post) { 3441 for(b = r->body.buf; b != NULL; b = b->next) { 3442 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3443 nxt_buf_mem_used_size(&b->mem))); 3444 } 3445 } 3446 3447#undef NXT_WRITE 3448#undef RC 3449 3450 return NXT_OK; 3451 3452fail: 3453 3454 return NXT_ERROR; 3455} 3456 3457 3458static nxt_int_t 3459nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) 3460{ 3461 nxt_int_t rc; 3462 nxt_buf_t *b; 3463 nxt_http_field_t *field; 3464 nxt_app_request_header_t *h; 3465 3466 static const nxt_str_t eof = nxt_null_string; 3467 3468 h = &r->header; 3469 3470#define RC(S) \ 3471 do { \ 3472 rc = (S); \ 3473 if (nxt_slow_path(rc != NXT_OK)) { \ 3474 goto fail; \ 3475 } \ 3476 } while(0) 3477 3478#define NXT_WRITE(N) \ 3479 RC(nxt_app_msg_write_str(task, wmsg, N)) 3480 3481 /* TODO error handle, async mmap buffer assignment */ 3482 3483 NXT_WRITE(&h->method); 3484 NXT_WRITE(&h->target); 3485 3486 if (h->path.start == h->target.start) { 3487 NXT_WRITE(&eof); 3488 3489 } else { 3490 NXT_WRITE(&h->path); 3491 } 3492 3493 if (h->query.start != NULL) { 3494 RC(nxt_app_msg_write_size(task, wmsg, 3495 h->query.start - h->target.start + 1)); 3496 } else { 3497 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3498 } 3499 3500 NXT_WRITE(&h->version); 3501 NXT_WRITE(&r->remote); 3502 3503 NXT_WRITE(&h->host); 3504 NXT_WRITE(&h->cookie); 3505 NXT_WRITE(&h->content_type); 3506 NXT_WRITE(&h->content_length); 3507 3508 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 3509 3510 nxt_list_each(field, h->fields) { 3511 NXT_WRITE(&field->name); 3512 NXT_WRITE(&field->value); 3513 3514 } nxt_list_loop; 3515 3516 /* end-of-headers mark */ 3517 NXT_WRITE(&eof); 3518 3519 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3520 3521 for(b = r->body.buf; b != NULL; b = b->next) { 3522 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3523 nxt_buf_mem_used_size(&b->mem))); 3524 } 3525 3526#undef NXT_WRITE 3527#undef RC 3528 3529 return NXT_OK; 3530 3531fail: 3532 3533 return NXT_ERROR; 3534} 3535 3536 3537static const nxt_conn_state_t nxt_router_conn_close_state 3538 nxt_aligned(64) = 3539{ 3540 .ready_handler = nxt_router_conn_free, 3541}; 3542 3543 3544static void 3545nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) 3546{ 3547 nxt_buf_t *b; 3548 nxt_bool_t last; 3549 nxt_conn_t *c; 3550 nxt_work_queue_t *wq; 3551 3552 nxt_debug(task, "router conn ready %p", obj); 3553 3554 c = obj; 3555 b = c->write; 3556 3557 wq = &task->thread->engine->fast_work_queue; 3558 3559 last = 0; 3560 3561 while (b != NULL) { 3562 if (!nxt_buf_is_sync(b)) { 3563 if (nxt_buf_used_size(b) > 0) { 3564 break; 3565 } 3566 } 3567 3568 if (nxt_buf_is_last(b)) { 3569 last = 1; 3570 } 3571 3572 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 3573 3574 b = b->next; 3575 } 3576 3577 c->write = b; 3578 3579 if (b != NULL) { 3580 nxt_debug(task, "router conn %p has more data to write", obj); 3581 3582 nxt_conn_write(task->thread->engine, c); 3583 3584 } else { 3585 nxt_debug(task, "router conn %p no more data to write, last = %d", obj, 3586 last); 3587 3588 if (last != 0) { 3589 nxt_debug(task, "enqueue router conn close %p (ready handler)", c); 3590 3591 nxt_work_queue_add(wq, nxt_router_conn_close, task, c, 3592 c->socket.data); 3593 } 3594 } 3595} 3596 3597 3598static void 3599nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) 3600{ 3601 nxt_conn_t *c; 3602 3603 c = obj; 3604 3605 nxt_debug(task, "router conn close"); 3606 3607 c->write_state = &nxt_router_conn_close_state; 3608 3609 nxt_conn_close(task->thread->engine, c); 3610} 3611 3612 3613static void 3614nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) 3615{ 3616 nxt_socket_conf_joint_t *joint; 3617 3618 joint = obj; 3619 3620 nxt_router_conf_release(task, joint); 3621} 3622 3623 3624static void 3625nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) 3626{ 3627 nxt_conn_t *c; 3628 nxt_event_engine_t *engine; 3629 nxt_req_conn_link_t *rc; 3630 nxt_app_parse_ctx_t *ap; 3631 nxt_socket_conf_joint_t *joint; 3632 3633 c = obj; 3634 ap = data; 3635 3636 nxt_debug(task, "router conn close done"); 3637 3638 if (ap != NULL) { 3639 nxt_app_http_req_done(task, ap); 3640 3641 c->socket.data = NULL; 3642 } 3643 3644 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 3645 3646 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); 3647 3648 nxt_router_rc_unlink(task, rc); 3649 3650 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); 3651 3652 } nxt_queue_loop; 3653 3654 nxt_queue_remove(&c->link); 3655 3656 engine = task->thread->engine; 3657 3658 nxt_sockaddr_cache_free(engine, c); 3659 3660 joint = c->joint; 3661 3662 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, 3663 &engine->task, joint, NULL); 3664
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10 11 12typedef struct { 13 nxt_str_t type; 14 uint32_t workers; 15 nxt_msec_t timeout; 16 uint32_t requests; 17 nxt_conf_value_t *limits_value; 18} nxt_router_app_conf_t; 19 20 21typedef struct { 22 nxt_str_t application; 23} nxt_router_listener_conf_t; 24 25 26typedef struct nxt_req_app_link_s nxt_req_app_link_t; 27 28 29typedef struct { 30 uint32_t stream; 31 nxt_conn_t *conn; 32 nxt_app_t *app; 33 nxt_port_t *app_port; 34 nxt_app_parse_ctx_t *ap; 35 nxt_req_app_link_t *ra; 36 37 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 38} nxt_req_conn_link_t; 39 40 41struct nxt_req_app_link_s { 42 uint32_t stream; 43 nxt_port_t *app_port; 44 nxt_pid_t app_pid; 45 nxt_port_t *reply_port; 46 nxt_app_parse_ctx_t *ap; 47 nxt_req_conn_link_t *rc; 48 49 nxt_queue_link_t link; /* for nxt_app_t.requests */ 50 51 nxt_mp_t *mem_pool; 52 nxt_work_t work; 53 54 int err_code; 55 const char *err_str; 56}; 57 58 59typedef struct { 60 nxt_socket_conf_t *socket_conf; 61 nxt_router_temp_conf_t *temp_conf; 62} nxt_socket_rpc_t; 63 64 65static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); 66 67static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, 68 int code, const char* str); 69 70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 72static void nxt_router_conf_ready(nxt_task_t *task, 73 nxt_router_temp_conf_t *tmcf); 74static void nxt_router_conf_error(nxt_task_t *task, 75 nxt_router_temp_conf_t *tmcf); 76static void nxt_router_conf_send(nxt_task_t *task, 77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 78 79static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 80 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 81static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 82static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, 83 nxt_str_t *name); 84static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 85 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 86static void nxt_router_listen_socket_ready(nxt_task_t *task, 87 nxt_port_recv_msg_t *msg, void *data); 88static void nxt_router_listen_socket_error(nxt_task_t *task, 89 nxt_port_recv_msg_t *msg, void *data); 90static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 91 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 92static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 93 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 94 95static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 97 const nxt_event_interface_t *interface); 98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 99 nxt_router_engine_conf_t *recf); 100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 101 nxt_router_engine_conf_t *recf); 102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 103 nxt_router_engine_conf_t *recf); 104static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 105 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 106 nxt_work_handler_t handler); 107static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 108 nxt_router_engine_conf_t *recf); 109static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 110 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 111 112static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 113 nxt_router_temp_conf_t *tmcf); 114static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 115 nxt_event_engine_t *engine); 116static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 117 nxt_router_temp_conf_t *tmcf); 118 119static void nxt_router_engines_post(nxt_router_t *router, 120 nxt_router_temp_conf_t *tmcf); 121static void nxt_router_engine_post(nxt_event_engine_t *engine, 122 nxt_work_t *jobs); 123 124static void nxt_router_thread_start(void *data); 125static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 126 void *data); 127static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 128 void *data); 129static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 130 void *data); 131static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 132 void *data); 133static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 134 void *data); 135static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 136 void *data); 137static void nxt_router_listen_socket_release(nxt_task_t *task, 138 nxt_socket_conf_t *skcf); 139static void nxt_router_conf_release(nxt_task_t *task, 140 nxt_socket_conf_joint_t *joint); 141 142static void nxt_router_app_port_ready(nxt_task_t *task, 143 nxt_port_recv_msg_t *msg, void *data); 144static void nxt_router_app_port_error(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146 147static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); 148static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 149 uint32_t request_failed, uint32_t got_response); 150 151static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 152static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 153 void *data); 154static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c); 155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, 156 void *data); 157static void nxt_router_process_http_request(nxt_task_t *task, 158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap); 159static void nxt_router_process_http_request_mp(nxt_task_t *task, 160 nxt_req_app_link_t *ra); 161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 162 nxt_app_wmsg_t *wmsg); 163static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 164 nxt_app_wmsg_t *wmsg); 165static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 166 nxt_app_wmsg_t *wmsg); 167static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 168static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 169static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 170static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 171static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 172static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 173static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 174 175static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 176 const char* str); 177 178static nxt_router_t *nxt_router; 179 180 181static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { 182 nxt_python_prepare_msg, 183 nxt_php_prepare_msg, 184 nxt_go_prepare_msg, 185}; 186 187 188nxt_int_t 189nxt_router_start(nxt_task_t *task, void *data) 190{ 191 nxt_int_t ret; 192 nxt_router_t *router; 193 nxt_runtime_t *rt; 194 195 rt = task->thread->runtime; 196 197 ret = nxt_app_http_init(task, rt); 198 if (nxt_slow_path(ret != NXT_OK)) { 199 return ret; 200 } 201 202 router = nxt_zalloc(sizeof(nxt_router_t)); 203 if (nxt_slow_path(router == NULL)) { 204 return NXT_ERROR; 205 } 206 207 nxt_queue_init(&router->engines); 208 nxt_queue_init(&router->sockets); 209 nxt_queue_init(&router->apps); 210 211 nxt_router = router; 212 213 return NXT_OK; 214} 215 216 217static void 218nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) 219{ 220 size_t size; 221 uint32_t stream; 222 nxt_app_t *app; 223 nxt_buf_t *b; 224 nxt_port_t *main_port; 225 nxt_runtime_t *rt; 226 227 app = data; 228 229 rt = task->thread->runtime; 230 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 231 232 nxt_debug(task, "app '%V' %p start worker", &app->name, app); 233 234 size = app->name.length + 1 + app->conf.length; 235 236 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 237 238 if (nxt_slow_path(b == NULL)) { 239 goto failed; 240 } 241 242 nxt_buf_cpystr(b, &app->name); 243 *b->mem.free++ = '\0'; 244 nxt_buf_cpystr(b, &app->conf); 245 246 stream = nxt_port_rpc_register_handler(task, port, 247 nxt_router_app_port_ready, 248 nxt_router_app_port_error, 249 -1, app); 250 251 if (nxt_slow_path(stream == 0)) { 252 nxt_mp_release(b->data, b); 253 254 goto failed; 255 } 256 257 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 258 stream, port->id, b); 259 260 return; 261 262failed: 263 264 nxt_thread_mutex_lock(&app->mutex); 265 266 app->pending_workers--; 267 268 nxt_thread_mutex_unlock(&app->mutex); 269 270 nxt_router_app_use(task, app, -1); 271} 272 273 274static nxt_int_t 275nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) 276{ 277 nxt_int_t res; 278 nxt_port_t *router_port; 279 nxt_runtime_t *rt; 280 281 rt = task->thread->runtime; 282 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 283 284 nxt_router_app_use(task, app, 1); 285 286 res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, 287 app); 288 289 if (res == NXT_OK) { 290 return res; 291 } 292 293 nxt_thread_mutex_lock(&app->mutex); 294 295 app->pending_workers--; 296 297 nxt_thread_mutex_unlock(&app->mutex); 298 299 nxt_router_app_use(task, app, -1); 300 301 return NXT_ERROR; 302} 303 304 305nxt_inline void 306nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, 307 nxt_req_conn_link_t *rc) 308{ 309 nxt_event_engine_t *engine; 310 311 engine = task->thread->engine; 312 313 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 314 315 ra->stream = rc->stream; 316 ra->app_pid = -1; 317 ra->rc = rc; 318 rc->ra = ra; 319 ra->reply_port = engine->port; 320 ra->ap = rc->ap; 321 322 ra->work.handler = NULL; 323 ra->work.task = &engine->task; 324 ra->work.obj = ra; 325 ra->work.data = engine; 326} 327 328 329nxt_inline nxt_req_app_link_t * 330nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) 331{ 332 nxt_mp_t *mp; 333 nxt_req_app_link_t *ra; 334 335 mp = ra_src->ap->mem_pool; 336 337 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); 338 339 if (nxt_slow_path(ra == NULL)) { 340 341 ra_src->rc->ra = NULL; 342 ra_src->rc = NULL; 343 344 return NULL; 345 } 346 347 nxt_router_ra_init(task, ra, ra_src->rc); 348 349 ra->mem_pool = mp; 350 351 return ra; 352} 353 354 355static void 356nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) 357{ 358 nxt_req_app_link_t *ra; 359 nxt_event_engine_t *engine; 360 nxt_req_conn_link_t *rc; 361 362 ra = obj; 363 engine = data; 364 365 if (task->thread->engine != engine) { 366 if (ra->app_port != NULL) { 367 ra->app_pid = ra->app_port->pid; 368 } 369 370 ra->work.handler = nxt_router_ra_release; 371 ra->work.task = &engine->task; 372 ra->work.next = NULL; 373 374 nxt_debug(task, "ra stream #%uD post release to %p", 375 ra->stream, engine); 376 377 nxt_event_engine_post(engine, &ra->work); 378 379 return; 380 } 381 382 nxt_debug(task, "ra stream #%uD release", ra->stream); 383 384 rc = ra->rc; 385 386 if (rc != NULL) { 387 if (ra->app_pid != -1) { 388 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); 389 } 390 391 rc->app_port = ra->app_port; 392 393 ra->app_port = NULL; 394 rc->ra = NULL; 395 ra->rc = NULL; 396 } 397 398 if (ra->app_port != NULL) { 399 nxt_router_app_port_release(task, ra->app_port, 0, 1); 400 401 ra->app_port = NULL; 402 } 403 404 if (ra->mem_pool != NULL) { 405 nxt_mp_release(ra->mem_pool, ra); 406 } 407} 408 409 410static void 411nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) 412{ 413 nxt_conn_t *c; 414 nxt_req_app_link_t *ra; 415 nxt_req_conn_link_t *rc; 416 nxt_event_engine_t *engine; 417 418 ra = obj; 419 engine = data; 420 421 if (task->thread->engine != engine) { 422 ra->work.handler = nxt_router_ra_abort; 423 ra->work.task = &engine->task; 424 ra->work.next = NULL; 425 426 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); 427 428 nxt_event_engine_post(engine, &ra->work); 429 430 return; 431 } 432 433 nxt_debug(task, "ra stream #%uD abort", ra->stream); 434 435 rc = ra->rc; 436 437 if (rc != NULL) { 438 c = rc->conn; 439 440 nxt_router_gen_error(task, c, 500, 441 "Failed to start application worker"); 442 443 rc->ra = NULL; 444 ra->rc = NULL; 445 } 446 447 if (ra->app_port != NULL) { 448 nxt_router_app_port_release(task, ra->app_port, 0, 1); 449 450 ra->app_port = NULL; 451 } 452 453 if (ra->mem_pool != NULL) { 454 nxt_mp_release(ra->mem_pool, ra); 455 } 456} 457 458 459static void 460nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data) 461{ 462 nxt_req_app_link_t *ra; 463 464 ra = obj; 465 466 nxt_router_ra_error(task, ra, ra->err_code, ra->err_str); 467} 468 469 470static void 471nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, 472 const char* str) 473{ 474 nxt_conn_t *c; 475 nxt_req_conn_link_t *rc; 476 nxt_event_engine_t *engine; 477 478 engine = ra->work.data; 479 480 if (task->thread->engine != engine) { 481 ra->err_code = code; 482 ra->err_str = str; 483 484 ra->work.handler = nxt_router_ra_error_handler; 485 ra->work.task = &engine->task; 486 ra->work.next = NULL; 487 488 nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine); 489 490 nxt_event_engine_post(engine, &ra->work); 491 492 return; 493 } 494 495 nxt_debug(task, "ra stream #%uD error", ra->stream); 496 497 rc = ra->rc; 498 499 if (rc != NULL) { 500 c = rc->conn; 501 502 nxt_router_gen_error(task, c, code, str); 503 504 rc->ra = NULL; 505 ra->rc = NULL; 506 } 507 508 if (ra->app_port != NULL) { 509 nxt_router_app_port_release(task, ra->app_port, 0, 1); 510 511 ra->app_port = NULL; 512 } 513 514 if (ra->mem_pool != NULL) { 515 nxt_mp_release(ra->mem_pool, ra); 516 } 517} 518 519 520nxt_inline void 521nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) 522{ 523 nxt_req_app_link_t *ra; 524 525 if (rc->app_port != NULL) { 526 nxt_router_app_port_release(task, rc->app_port, 0, 1); 527 528 rc->app_port = NULL; 529 } 530 531 ra = rc->ra; 532 533 if (ra != NULL) { 534 rc->ra = NULL; 535 ra->rc = NULL; 536 537 nxt_thread_mutex_lock(&rc->app->mutex); 538 539 if (ra->link.next != NULL) { 540 nxt_queue_remove(&ra->link); 541 542 ra->link.next = NULL; 543 544 } else { 545 ra = NULL; 546 } 547 548 nxt_thread_mutex_unlock(&rc->app->mutex); 549 } 550 551 if (ra != NULL) { 552 nxt_router_ra_release(task, ra, ra->work.data); 553 } 554 555 if (rc->app != NULL) { 556 nxt_router_app_use(task, rc->app, -1); 557 558 rc->app = NULL; 559 } 560 561 if (rc->ap != NULL) { 562 nxt_app_http_req_done(task, rc->ap); 563 564 rc->ap = NULL; 565 } 566 567 nxt_queue_remove(&rc->link); 568 569 rc->conn = NULL; 570} 571 572 573void 574nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 575{ 576 nxt_port_new_port_handler(task, msg); 577 578 if (msg->port_msg.stream == 0) { 579 return; 580 } 581 582 if (msg->u.new_port == NULL || 583 msg->u.new_port->type != NXT_PROCESS_WORKER) 584 { 585 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 586 } 587 588 nxt_port_rpc_handler(task, msg); 589} 590 591 592void 593nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 594{ 595 nxt_int_t ret; 596 nxt_buf_t *b; 597 nxt_router_temp_conf_t *tmcf; 598 599 tmcf = nxt_router_temp_conf(task); 600 if (nxt_slow_path(tmcf == NULL)) { 601 return; 602 } 603 604 b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size); 605 606 nxt_assert(b != NULL); 607 608 tmcf->conf->router = nxt_router; 609 tmcf->stream = msg->port_msg.stream; 610 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 611 msg->port_msg.pid, 612 msg->port_msg.reply_port); 613 614 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 615 616 if (nxt_fast_path(ret == NXT_OK)) { 617 nxt_router_conf_apply(task, tmcf, NULL); 618 619 } else { 620 nxt_router_conf_error(task, tmcf); 621 } 622} 623 624 625static void 626nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) 627{ 628 union { 629 nxt_pid_t removed_pid; 630 void *data; 631 } u; 632 633 u.data = data; 634 635 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 636} 637 638 639void 640nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 641{ 642 nxt_event_engine_t *engine; 643 644 nxt_port_remove_pid_handler(task, msg); 645 646 if (msg->port_msg.stream == 0) { 647 return; 648 } 649 650 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 651 { 652 nxt_port_post(task, engine->port, nxt_router_worker_remove_pid, 653 msg->u.data); 654 } 655 nxt_queue_loop; 656 657 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 658 659 nxt_port_rpc_handler(task, msg); 660} 661 662 663static nxt_router_temp_conf_t * 664nxt_router_temp_conf(nxt_task_t *task) 665{ 666 nxt_mp_t *mp, *tmp; 667 nxt_router_conf_t *rtcf; 668 nxt_router_temp_conf_t *tmcf; 669 670 mp = nxt_mp_create(1024, 128, 256, 32); 671 if (nxt_slow_path(mp == NULL)) { 672 return NULL; 673 } 674 675 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 676 if (nxt_slow_path(rtcf == NULL)) { 677 goto fail; 678 } 679 680 rtcf->mem_pool = mp; 681 682 tmp = nxt_mp_create(1024, 128, 256, 32); 683 if (nxt_slow_path(tmp == NULL)) { 684 goto fail; 685 } 686 687 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 688 if (nxt_slow_path(tmcf == NULL)) { 689 goto temp_fail; 690 } 691 692 tmcf->mem_pool = tmp; 693 tmcf->conf = rtcf; 694 tmcf->count = 1; 695 tmcf->engine = task->thread->engine; 696 697 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 698 sizeof(nxt_router_engine_conf_t)); 699 if (nxt_slow_path(tmcf->engines == NULL)) { 700 goto temp_fail; 701 } 702 703 nxt_queue_init(&tmcf->deleting); 704 nxt_queue_init(&tmcf->keeping); 705 nxt_queue_init(&tmcf->updating); 706 nxt_queue_init(&tmcf->pending); 707 nxt_queue_init(&tmcf->creating); 708 nxt_queue_init(&tmcf->apps); 709 nxt_queue_init(&tmcf->previous); 710 711 return tmcf; 712 713temp_fail: 714 715 nxt_mp_destroy(tmp); 716 717fail: 718 719 nxt_mp_destroy(mp); 720 721 return NULL; 722} 723 724 725static void 726nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 727{ 728 nxt_int_t ret; 729 nxt_router_t *router; 730 nxt_runtime_t *rt; 731 nxt_queue_link_t *qlk; 732 nxt_socket_conf_t *skcf; 733 nxt_router_temp_conf_t *tmcf; 734 const nxt_event_interface_t *interface; 735 736 tmcf = obj; 737 738 qlk = nxt_queue_first(&tmcf->pending); 739 740 if (qlk != nxt_queue_tail(&tmcf->pending)) { 741 nxt_queue_remove(qlk); 742 nxt_queue_insert_tail(&tmcf->creating, qlk); 743 744 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 745 746 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 747 748 return; 749 } 750 751 rt = task->thread->runtime; 752 753 interface = nxt_service_get(rt->services, "engine", NULL); 754 755 router = tmcf->conf->router; 756 757 ret = nxt_router_engines_create(task, router, tmcf, interface); 758 if (nxt_slow_path(ret != NXT_OK)) { 759 goto fail; 760 } 761 762 ret = nxt_router_threads_create(task, rt, tmcf); 763 if (nxt_slow_path(ret != NXT_OK)) { 764 goto fail; 765 } 766 767 nxt_router_apps_sort(task, router, tmcf); 768 769 nxt_router_engines_post(router, tmcf); 770 771 nxt_queue_add(&router->sockets, &tmcf->updating); 772 nxt_queue_add(&router->sockets, &tmcf->creating); 773 774 nxt_router_conf_ready(task, tmcf); 775 776 return; 777 778fail: 779 780 nxt_router_conf_error(task, tmcf); 781 782 return; 783} 784 785 786static void 787nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 788{ 789 nxt_joint_job_t *job; 790 791 job = obj; 792 793 nxt_router_conf_ready(task, job->tmcf); 794} 795 796 797static void 798nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 799{ 800 nxt_debug(task, "temp conf count:%D", tmcf->count); 801 802 if (--tmcf->count == 0) { 803 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 804 } 805} 806 807 808static void 809nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 810{ 811 nxt_socket_t s; 812 nxt_router_t *router; 813 nxt_queue_link_t *qlk; 814 nxt_socket_conf_t *skcf; 815 816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf"); 817 818 for (qlk = nxt_queue_first(&tmcf->creating); 819 qlk != nxt_queue_tail(&tmcf->creating); 820 qlk = nxt_queue_next(qlk)) 821 { 822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 823 s = skcf->listen->socket; 824 825 if (s != -1) { 826 nxt_socket_close(task, s); 827 } 828 829 nxt_free(skcf->listen); 830 } 831 832 router = tmcf->conf->router; 833 834 nxt_queue_add(&router->sockets, &tmcf->keeping); 835 nxt_queue_add(&router->sockets, &tmcf->deleting); 836 837 // TODO: new engines and threads 838 839 nxt_mp_destroy(tmcf->conf->mem_pool); 840 841 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 842} 843 844 845static void 846nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 847 nxt_port_msg_type_t type) 848{ 849 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 850} 851 852 853static nxt_conf_map_t nxt_router_conf[] = { 854 { 855 nxt_string("listeners_threads"), 856 NXT_CONF_MAP_INT32, 857 offsetof(nxt_router_conf_t, threads), 858 }, 859}; 860 861 862static nxt_conf_map_t nxt_router_app_conf[] = { 863 { 864 nxt_string("type"), 865 NXT_CONF_MAP_STR, 866 offsetof(nxt_router_app_conf_t, type), 867 }, 868 869 { 870 nxt_string("workers"), 871 NXT_CONF_MAP_INT32, 872 offsetof(nxt_router_app_conf_t, workers), 873 }, 874 875 { 876 nxt_string("limits"), 877 NXT_CONF_MAP_PTR, 878 offsetof(nxt_router_app_conf_t, limits_value), 879 }, 880}; 881 882 883static nxt_conf_map_t nxt_router_app_limits_conf[] = { 884 { 885 nxt_string("timeout"), 886 NXT_CONF_MAP_MSEC, 887 offsetof(nxt_router_app_conf_t, timeout), 888 }, 889 890 { 891 nxt_string("requests"), 892 NXT_CONF_MAP_INT32, 893 offsetof(nxt_router_app_conf_t, requests), 894 }, 895}; 896 897 898static nxt_conf_map_t nxt_router_listener_conf[] = { 899 { 900 nxt_string("application"), 901 NXT_CONF_MAP_STR, 902 offsetof(nxt_router_listener_conf_t, application), 903 }, 904}; 905 906 907static nxt_conf_map_t nxt_router_http_conf[] = { 908 { 909 nxt_string("header_buffer_size"), 910 NXT_CONF_MAP_SIZE, 911 offsetof(nxt_socket_conf_t, header_buffer_size), 912 }, 913 914 { 915 nxt_string("large_header_buffer_size"), 916 NXT_CONF_MAP_SIZE, 917 offsetof(nxt_socket_conf_t, large_header_buffer_size), 918 }, 919 920 { 921 nxt_string("large_header_buffers"), 922 NXT_CONF_MAP_SIZE, 923 offsetof(nxt_socket_conf_t, large_header_buffers), 924 }, 925 926 { 927 nxt_string("body_buffer_size"), 928 NXT_CONF_MAP_SIZE, 929 offsetof(nxt_socket_conf_t, body_buffer_size), 930 }, 931 932 { 933 nxt_string("max_body_size"), 934 NXT_CONF_MAP_SIZE, 935 offsetof(nxt_socket_conf_t, max_body_size), 936 }, 937 938 { 939 nxt_string("header_read_timeout"), 940 NXT_CONF_MAP_MSEC, 941 offsetof(nxt_socket_conf_t, header_read_timeout), 942 }, 943 944 { 945 nxt_string("body_read_timeout"), 946 NXT_CONF_MAP_MSEC, 947 offsetof(nxt_socket_conf_t, body_read_timeout), 948 }, 949}; 950 951 952static nxt_int_t 953nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 954 u_char *start, u_char *end) 955{ 956 u_char *p; 957 size_t size; 958 nxt_mp_t *mp; 959 uint32_t next; 960 nxt_int_t ret; 961 nxt_str_t name; 962 nxt_app_t *app, *prev; 963 nxt_router_t *router; 964 nxt_conf_value_t *conf, *http; 965 nxt_conf_value_t *applications, *application; 966 nxt_conf_value_t *listeners, *listener; 967 nxt_socket_conf_t *skcf; 968 nxt_app_lang_module_t *lang; 969 nxt_router_app_conf_t apcf; 970 nxt_router_listener_conf_t lscf; 971 972 static nxt_str_t http_path = nxt_string("/http"); 973 static nxt_str_t applications_path = nxt_string("/applications"); 974 static nxt_str_t listeners_path = nxt_string("/listeners"); 975 976 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 977 if (conf == NULL) { 978 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); 979 return NXT_ERROR; 980 } 981 982 mp = tmcf->conf->mem_pool; 983 984 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 985 nxt_nitems(nxt_router_conf), tmcf->conf); 986 if (ret != NXT_OK) { 987 nxt_log(task, NXT_LOG_CRIT, "root map error"); 988 return NXT_ERROR; 989 } 990 991 if (tmcf->conf->threads == 0) { 992 tmcf->conf->threads = nxt_ncpu; 993 } 994 995 applications = nxt_conf_get_path(conf, &applications_path); 996 if (applications == NULL) { 997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block"); 998 return NXT_ERROR; 999 } 1000 1001 router = tmcf->conf->router; 1002 1003 next = 0; 1004 1005 for ( ;; ) { 1006 application = nxt_conf_next_object_member(applications, &name, &next); 1007 if (application == NULL) { 1008 break; 1009 } 1010 1011 nxt_debug(task, "application \"%V\"", &name); 1012 1013 size = nxt_conf_json_length(application, NULL); 1014 1015 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1016 if (app == NULL) { 1017 goto fail; 1018 } 1019 1020 nxt_memzero(app, sizeof(nxt_app_t)); 1021 1022 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1023 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 1024 1025 p = nxt_conf_json_print(app->conf.start, application, NULL); 1026 app->conf.length = p - app->conf.start; 1027 1028 nxt_assert(app->conf.length <= size); 1029 1030 nxt_debug(task, "application conf \"%V\"", &app->conf); 1031 1032 prev = nxt_router_app_find(&router->apps, &name); 1033 1034 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1035 nxt_free(app); 1036 1037 nxt_queue_remove(&prev->link); 1038 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1039 continue; 1040 } 1041 1042 apcf.workers = 1; 1043 apcf.timeout = 0; 1044 apcf.requests = 0; 1045 apcf.limits_value = NULL; 1046 1047 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1048 nxt_nitems(nxt_router_app_conf), &apcf); 1049 if (ret != NXT_OK) { 1050 nxt_log(task, NXT_LOG_CRIT, "application map error"); 1051 goto app_fail; 1052 } 1053 1054 if (apcf.limits_value != NULL) { 1055 1056 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1057 nxt_log(task, NXT_LOG_CRIT, "application limits is not object"); 1058 goto app_fail; 1059 } 1060 1061 ret = nxt_conf_map_object(mp, apcf.limits_value, 1062 nxt_router_app_limits_conf, 1063 nxt_nitems(nxt_router_app_limits_conf), 1064 &apcf); 1065 if (ret != NXT_OK) { 1066 nxt_log(task, NXT_LOG_CRIT, "application limits map error"); 1067 goto app_fail; 1068 } 1069 } 1070 1071 nxt_debug(task, "application type: %V", &apcf.type); 1072 nxt_debug(task, "application workers: %D", apcf.workers); 1073 nxt_debug(task, "application timeout: %D", apcf.timeout); 1074 nxt_debug(task, "application requests: %D", apcf.requests); 1075 1076 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1077 1078 if (lang == NULL) { 1079 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 1080 &apcf.type); 1081 goto app_fail; 1082 } 1083 1084 nxt_debug(task, "application language module: \"%s\"", lang->file); 1085 1086 ret = nxt_thread_mutex_create(&app->mutex); 1087 if (ret != NXT_OK) { 1088 goto app_fail; 1089 } 1090 1091 nxt_queue_init(&app->ports); 1092 nxt_queue_init(&app->requests); 1093 1094 app->name.length = name.length; 1095 nxt_memcpy(app->name.start, name.start, name.length); 1096 1097 app->type = lang->type; 1098 app->max_workers = apcf.workers; 1099 app->timeout = apcf.timeout; 1100 app->live = 1; 1101 app->max_pending_responses = 2; 1102 app->prepare_msg = nxt_app_prepare_msg[lang->type]; 1103 1104 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1105 1106 nxt_router_app_use(task, app, 1); 1107 } 1108 1109 http = nxt_conf_get_path(conf, &http_path); 1110#if 0 1111 if (http == NULL) { 1112 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block"); 1113 return NXT_ERROR; 1114 } 1115#endif 1116 1117 listeners = nxt_conf_get_path(conf, &listeners_path); 1118 if (listeners == NULL) { 1119 nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block"); 1120 return NXT_ERROR; 1121 } 1122 1123 next = 0; 1124 1125 for ( ;; ) { 1126 listener = nxt_conf_next_object_member(listeners, &name, &next); 1127 if (listener == NULL) { 1128 break; 1129 } 1130 1131 skcf = nxt_router_socket_conf(task, tmcf, &name); 1132 if (skcf == NULL) { 1133 goto fail; 1134 } 1135 1136 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1137 nxt_nitems(nxt_router_listener_conf), &lscf); 1138 if (ret != NXT_OK) { 1139 nxt_log(task, NXT_LOG_CRIT, "listener map error"); 1140 goto fail; 1141 } 1142 1143 nxt_debug(task, "application: %V", &lscf.application); 1144 1145 // STUB, default values if http block is not defined. 1146 skcf->header_buffer_size = 2048; 1147 skcf->large_header_buffer_size = 8192; 1148 skcf->large_header_buffers = 4; 1149 skcf->body_buffer_size = 16 * 1024; 1150 skcf->max_body_size = 2 * 1024 * 1024; 1151 skcf->header_read_timeout = 5000; 1152 skcf->body_read_timeout = 5000; 1153 1154 if (http != NULL) { 1155 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1156 nxt_nitems(nxt_router_http_conf), skcf); 1157 if (ret != NXT_OK) { 1158 nxt_log(task, NXT_LOG_CRIT, "http map error"); 1159 goto fail; 1160 } 1161 } 1162 1163 skcf->listen->handler = nxt_router_conn_init; 1164 skcf->router_conf = tmcf->conf; 1165 skcf->router_conf->count++; 1166 skcf->application = nxt_router_listener_application(tmcf, 1167 &lscf.application); 1168 } 1169 1170 nxt_queue_add(&tmcf->deleting, &router->sockets); 1171 nxt_queue_init(&router->sockets); 1172 1173 return NXT_OK; 1174 1175app_fail: 1176 1177 nxt_free(app); 1178 1179fail: 1180 1181 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1182 1183 nxt_queue_remove(&app->link); 1184 nxt_thread_mutex_destroy(&app->mutex); 1185 nxt_free(app); 1186 1187 } nxt_queue_loop; 1188 1189 return NXT_ERROR; 1190} 1191 1192 1193static nxt_app_t * 1194nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1195{ 1196 nxt_app_t *app; 1197 1198 nxt_queue_each(app, queue, nxt_app_t, link) { 1199 1200 if (nxt_strstr_eq(name, &app->name)) { 1201 return app; 1202 } 1203 1204 } nxt_queue_loop; 1205 1206 return NULL; 1207} 1208 1209 1210static nxt_app_t * 1211nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1212{ 1213 nxt_app_t *app; 1214 1215 app = nxt_router_app_find(&tmcf->apps, name); 1216 1217 if (app == NULL) { 1218 app = nxt_router_app_find(&tmcf->previous, name); 1219 } 1220 1221 return app; 1222} 1223 1224 1225static nxt_socket_conf_t * 1226nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1227 nxt_str_t *name) 1228{ 1229 size_t size; 1230 nxt_int_t ret; 1231 nxt_bool_t wildcard; 1232 nxt_sockaddr_t *sa; 1233 nxt_socket_conf_t *skcf; 1234 nxt_listen_socket_t *ls; 1235 1236 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1237 if (nxt_slow_path(sa == NULL)) { 1238 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name); 1239 return NULL; 1240 } 1241 1242 sa->type = SOCK_STREAM; 1243 1244 nxt_debug(task, "router listener: \"%*s\"", 1245 sa->length, nxt_sockaddr_start(sa)); 1246 1247 skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t)); 1248 if (nxt_slow_path(skcf == NULL)) { 1249 return NULL; 1250 } 1251 1252 size = nxt_sockaddr_size(sa); 1253 1254 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1255 1256 if (ret != NXT_OK) { 1257 1258 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1259 if (nxt_slow_path(ls == NULL)) { 1260 return NULL; 1261 } 1262 1263 skcf->listen = ls; 1264 1265 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1266 nxt_memcpy(ls->sockaddr, sa, size); 1267 1268 nxt_listen_socket_remote_size(ls); 1269 1270 ls->socket = -1; 1271 ls->backlog = NXT_LISTEN_BACKLOG; 1272 ls->flags = NXT_NONBLOCK; 1273 ls->read_after_accept = 1; 1274 } 1275 1276 switch (sa->u.sockaddr.sa_family) { 1277#if (NXT_HAVE_UNIX_DOMAIN) 1278 case AF_UNIX: 1279 wildcard = 0; 1280 break; 1281#endif 1282#if (NXT_INET6) 1283 case AF_INET6: 1284 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1285 break; 1286#endif 1287 case AF_INET: 1288 default: 1289 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1290 break; 1291 } 1292 1293 if (!wildcard) { 1294 skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size); 1295 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1296 return NULL; 1297 } 1298 1299 nxt_memcpy(skcf->sockaddr, sa, size); 1300 } 1301 1302 return skcf; 1303} 1304 1305 1306static nxt_int_t 1307nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1308 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1309{ 1310 nxt_router_t *router; 1311 nxt_queue_link_t *qlk; 1312 nxt_socket_conf_t *skcf; 1313 1314 router = tmcf->conf->router; 1315 1316 for (qlk = nxt_queue_first(&router->sockets); 1317 qlk != nxt_queue_tail(&router->sockets); 1318 qlk = nxt_queue_next(qlk)) 1319 { 1320 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1321 1322 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1323 nskcf->listen = skcf->listen; 1324 1325 nxt_queue_remove(qlk); 1326 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1327 1328 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1329 1330 return NXT_OK; 1331 } 1332 } 1333 1334 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1335 1336 return NXT_DECLINED; 1337} 1338 1339 1340static void 1341nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1342 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1343{ 1344 size_t size; 1345 uint32_t stream; 1346 nxt_buf_t *b; 1347 nxt_port_t *main_port, *router_port; 1348 nxt_runtime_t *rt; 1349 nxt_socket_rpc_t *rpc; 1350 1351 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1352 if (rpc == NULL) { 1353 goto fail; 1354 } 1355 1356 rpc->socket_conf = skcf; 1357 rpc->temp_conf = tmcf; 1358 1359 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1360 1361 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1362 if (b == NULL) { 1363 goto fail; 1364 } 1365 1366 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1367 1368 rt = task->thread->runtime; 1369 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1370 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1371 1372 stream = nxt_port_rpc_register_handler(task, router_port, 1373 nxt_router_listen_socket_ready, 1374 nxt_router_listen_socket_error, 1375 main_port->pid, rpc); 1376 if (stream == 0) { 1377 goto fail; 1378 } 1379 1380 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1381 stream, router_port->id, b); 1382 1383 return; 1384 1385fail: 1386 1387 nxt_router_conf_error(task, tmcf); 1388} 1389 1390 1391static void 1392nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1393 void *data) 1394{ 1395 nxt_int_t ret; 1396 nxt_socket_t s; 1397 nxt_socket_rpc_t *rpc; 1398 1399 rpc = data; 1400 1401 s = msg->fd; 1402 1403 ret = nxt_socket_nonblocking(task, s); 1404 if (nxt_slow_path(ret != NXT_OK)) { 1405 goto fail; 1406 } 1407 1408 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 1409 1410 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1411 if (nxt_slow_path(ret != NXT_OK)) { 1412 goto fail; 1413 } 1414 1415 rpc->socket_conf->listen->socket = s; 1416 1417 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1418 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1419 1420 return; 1421 1422fail: 1423 1424 nxt_socket_close(task, s); 1425 1426 nxt_router_conf_error(task, rpc->temp_conf); 1427} 1428 1429 1430static void 1431nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1432 void *data) 1433{ 1434 u_char *p; 1435 size_t size; 1436 uint8_t error; 1437 nxt_buf_t *in, *out; 1438 nxt_sockaddr_t *sa; 1439 nxt_socket_rpc_t *rpc; 1440 nxt_router_temp_conf_t *tmcf; 1441 1442 static nxt_str_t socket_errors[] = { 1443 nxt_string("ListenerSystem"), 1444 nxt_string("ListenerNoIPv6"), 1445 nxt_string("ListenerPort"), 1446 nxt_string("ListenerInUse"), 1447 nxt_string("ListenerNoAddress"), 1448 nxt_string("ListenerNoAccess"), 1449 nxt_string("ListenerPath"), 1450 }; 1451 1452 rpc = data; 1453 sa = rpc->socket_conf->listen->sockaddr; 1454 tmcf = rpc->temp_conf; 1455 1456 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 1457 1458 nxt_assert(in != NULL); 1459 1460 p = in->mem.pos; 1461 1462 error = *p++; 1463 1464 size = sizeof("listen socket error: ") - 1 1465 + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1 1466 + sa->length + socket_errors[error].length + (in->mem.free - p); 1467 1468 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1469 if (nxt_slow_path(out == NULL)) { 1470 return; 1471 } 1472 1473 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 1474 "listen socket error: " 1475 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 1476 sa->length, nxt_sockaddr_start(sa), 1477 &socket_errors[error], in->mem.free - p, p); 1478 1479 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 1480 1481 nxt_router_conf_error(task, tmcf); 1482} 1483 1484 1485static nxt_int_t 1486nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 1487 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 1488{ 1489 nxt_int_t ret; 1490 nxt_uint_t n, threads; 1491 nxt_queue_link_t *qlk; 1492 nxt_router_engine_conf_t *recf; 1493 1494 threads = tmcf->conf->threads; 1495 1496 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 1497 sizeof(nxt_router_engine_conf_t)); 1498 if (nxt_slow_path(tmcf->engines == NULL)) { 1499 return NXT_ERROR; 1500 } 1501 1502 n = 0; 1503 1504 for (qlk = nxt_queue_first(&router->engines); 1505 qlk != nxt_queue_tail(&router->engines); 1506 qlk = nxt_queue_next(qlk)) 1507 { 1508 recf = nxt_array_zero_add(tmcf->engines); 1509 if (nxt_slow_path(recf == NULL)) { 1510 return NXT_ERROR; 1511 } 1512 1513 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 1514 1515 if (n < threads) { 1516 recf->action = NXT_ROUTER_ENGINE_KEEP; 1517 ret = nxt_router_engine_conf_update(tmcf, recf); 1518 1519 } else { 1520 recf->action = NXT_ROUTER_ENGINE_DELETE; 1521 ret = nxt_router_engine_conf_delete(tmcf, recf); 1522 } 1523 1524 if (nxt_slow_path(ret != NXT_OK)) { 1525 return ret; 1526 } 1527 1528 n++; 1529 } 1530 1531 tmcf->new_threads = n; 1532 1533 while (n < threads) { 1534 recf = nxt_array_zero_add(tmcf->engines); 1535 if (nxt_slow_path(recf == NULL)) { 1536 return NXT_ERROR; 1537 } 1538 1539 recf->action = NXT_ROUTER_ENGINE_ADD; 1540 1541 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 1542 if (nxt_slow_path(recf->engine == NULL)) { 1543 return NXT_ERROR; 1544 } 1545 1546 ret = nxt_router_engine_conf_create(tmcf, recf); 1547 if (nxt_slow_path(ret != NXT_OK)) { 1548 return ret; 1549 } 1550 1551 n++; 1552 } 1553 1554 return NXT_OK; 1555} 1556 1557 1558static nxt_int_t 1559nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 1560 nxt_router_engine_conf_t *recf) 1561{ 1562 nxt_int_t ret; 1563 1564 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1565 nxt_router_listen_socket_create); 1566 if (nxt_slow_path(ret != NXT_OK)) { 1567 return ret; 1568 } 1569 1570 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1571 nxt_router_listen_socket_create); 1572 if (nxt_slow_path(ret != NXT_OK)) { 1573 return ret; 1574 } 1575 1576 return ret; 1577} 1578 1579 1580static nxt_int_t 1581nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 1582 nxt_router_engine_conf_t *recf) 1583{ 1584 nxt_int_t ret; 1585 1586 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1587 nxt_router_listen_socket_create); 1588 if (nxt_slow_path(ret != NXT_OK)) { 1589 return ret; 1590 } 1591 1592 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1593 nxt_router_listen_socket_update); 1594 if (nxt_slow_path(ret != NXT_OK)) { 1595 return ret; 1596 } 1597 1598 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 1599 if (nxt_slow_path(ret != NXT_OK)) { 1600 return ret; 1601 } 1602 1603 return ret; 1604} 1605 1606 1607static nxt_int_t 1608nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 1609 nxt_router_engine_conf_t *recf) 1610{ 1611 nxt_int_t ret; 1612 1613 ret = nxt_router_engine_quit(tmcf, recf); 1614 if (nxt_slow_path(ret != NXT_OK)) { 1615 return ret; 1616 } 1617 1618 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 1619 if (nxt_slow_path(ret != NXT_OK)) { 1620 return ret; 1621 } 1622 1623 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 1624} 1625 1626 1627static nxt_int_t 1628nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 1629 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 1630 nxt_work_handler_t handler) 1631{ 1632 nxt_joint_job_t *job; 1633 nxt_queue_link_t *qlk; 1634 nxt_socket_conf_t *skcf; 1635 nxt_socket_conf_joint_t *joint; 1636 1637 for (qlk = nxt_queue_first(sockets); 1638 qlk != nxt_queue_tail(sockets); 1639 qlk = nxt_queue_next(qlk)) 1640 { 1641 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1642 if (nxt_slow_path(job == NULL)) { 1643 return NXT_ERROR; 1644 } 1645 1646 job->work.next = recf->jobs; 1647 recf->jobs = &job->work; 1648 1649 job->task = tmcf->engine->task; 1650 job->work.handler = handler; 1651 job->work.task = &job->task; 1652 job->work.obj = job; 1653 job->tmcf = tmcf; 1654 1655 tmcf->count++; 1656 1657 joint = nxt_mp_alloc(tmcf->conf->mem_pool, 1658 sizeof(nxt_socket_conf_joint_t)); 1659 if (nxt_slow_path(joint == NULL)) { 1660 return NXT_ERROR; 1661 } 1662 1663 job->work.data = joint; 1664 1665 joint->count = 1; 1666 1667 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1668 skcf->count++; 1669 joint->socket_conf = skcf; 1670 1671 joint->engine = recf->engine; 1672 } 1673 1674 return NXT_OK; 1675} 1676 1677 1678static nxt_int_t 1679nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 1680 nxt_router_engine_conf_t *recf) 1681{ 1682 nxt_joint_job_t *job; 1683 1684 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1685 if (nxt_slow_path(job == NULL)) { 1686 return NXT_ERROR; 1687 } 1688 1689 job->work.next = recf->jobs; 1690 recf->jobs = &job->work; 1691 1692 job->task = tmcf->engine->task; 1693 job->work.handler = nxt_router_worker_thread_quit; 1694 job->work.task = &job->task; 1695 job->work.obj = NULL; 1696 job->work.data = NULL; 1697 job->tmcf = NULL; 1698 1699 return NXT_OK; 1700} 1701 1702 1703static nxt_int_t 1704nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 1705 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 1706{ 1707 nxt_joint_job_t *job; 1708 nxt_queue_link_t *qlk; 1709 1710 for (qlk = nxt_queue_first(sockets); 1711 qlk != nxt_queue_tail(sockets); 1712 qlk = nxt_queue_next(qlk)) 1713 { 1714 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1715 if (nxt_slow_path(job == NULL)) { 1716 return NXT_ERROR; 1717 } 1718 1719 job->work.next = recf->jobs; 1720 recf->jobs = &job->work; 1721 1722 job->task = tmcf->engine->task; 1723 job->work.handler = nxt_router_listen_socket_delete; 1724 job->work.task = &job->task; 1725 job->work.obj = job; 1726 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1727 job->tmcf = tmcf; 1728 1729 tmcf->count++; 1730 } 1731 1732 return NXT_OK; 1733} 1734 1735 1736static nxt_int_t 1737nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 1738 nxt_router_temp_conf_t *tmcf) 1739{ 1740 nxt_int_t ret; 1741 nxt_uint_t i, threads; 1742 nxt_router_engine_conf_t *recf; 1743 1744 recf = tmcf->engines->elts; 1745 threads = tmcf->conf->threads; 1746 1747 for (i = tmcf->new_threads; i < threads; i++) { 1748 ret = nxt_router_thread_create(task, rt, recf[i].engine); 1749 if (nxt_slow_path(ret != NXT_OK)) { 1750 return ret; 1751 } 1752 } 1753 1754 return NXT_OK; 1755} 1756 1757 1758static nxt_int_t 1759nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 1760 nxt_event_engine_t *engine) 1761{ 1762 nxt_int_t ret; 1763 nxt_thread_link_t *link; 1764 nxt_thread_handle_t handle; 1765 1766 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 1767 1768 if (nxt_slow_path(link == NULL)) { 1769 return NXT_ERROR; 1770 } 1771 1772 link->start = nxt_router_thread_start; 1773 link->engine = engine; 1774 link->work.handler = nxt_router_thread_exit_handler; 1775 link->work.task = task; 1776 link->work.data = link; 1777 1778 nxt_queue_insert_tail(&rt->engines, &engine->link); 1779 1780 ret = nxt_thread_create(&handle, link); 1781 1782 if (nxt_slow_path(ret != NXT_OK)) { 1783 nxt_queue_remove(&engine->link); 1784 } 1785 1786 return ret; 1787} 1788 1789 1790static void 1791nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 1792 nxt_router_temp_conf_t *tmcf) 1793{ 1794 nxt_app_t *app; 1795 nxt_port_t *port; 1796 1797 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 1798 1799 nxt_queue_remove(&app->link); 1800 1801 nxt_debug(task, "about to free app '%V' %p", &app->name, app); 1802 1803 app->live = 0; 1804 1805 do { 1806 port = nxt_router_app_get_idle_port(app); 1807 if (port == NULL) { 1808 break; 1809 } 1810 1811 nxt_debug(task, "port %p send quit", port); 1812 1813 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, 1814 NULL); 1815 1816 nxt_port_use(task, port, -1); 1817 } while (1); 1818 1819 nxt_router_app_use(task, app, -1); 1820 1821 } nxt_queue_loop; 1822 1823 nxt_queue_add(&router->apps, &tmcf->previous); 1824 nxt_queue_add(&router->apps, &tmcf->apps); 1825} 1826 1827 1828static void 1829nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 1830{ 1831 nxt_uint_t n; 1832 nxt_event_engine_t *engine; 1833 nxt_router_engine_conf_t *recf; 1834 1835 recf = tmcf->engines->elts; 1836 1837 for (n = tmcf->engines->nelts; n != 0; n--) { 1838 engine = recf->engine; 1839 1840 switch (recf->action) { 1841 1842 case NXT_ROUTER_ENGINE_KEEP: 1843 break; 1844 1845 case NXT_ROUTER_ENGINE_ADD: 1846 nxt_queue_insert_tail(&router->engines, &engine->link0); 1847 break; 1848 1849 case NXT_ROUTER_ENGINE_DELETE: 1850 nxt_queue_remove(&engine->link0); 1851 break; 1852 } 1853 1854 nxt_router_engine_post(engine, recf->jobs); 1855 1856 recf++; 1857 } 1858} 1859 1860 1861static void 1862nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 1863{ 1864 nxt_work_t *work, *next; 1865 1866 for (work = jobs; work != NULL; work = next) { 1867 next = work->next; 1868 work->next = NULL; 1869 1870 nxt_event_engine_post(engine, work); 1871 } 1872} 1873 1874 1875static nxt_port_handlers_t nxt_router_app_port_handlers = { 1876 .mmap = nxt_port_mmap_handler, 1877 .data = nxt_port_rpc_handler, 1878}; 1879 1880 1881static void 1882nxt_router_thread_start(void *data) 1883{ 1884 nxt_int_t ret; 1885 nxt_port_t *port; 1886 nxt_task_t *task; 1887 nxt_thread_t *thread; 1888 nxt_thread_link_t *link; 1889 nxt_event_engine_t *engine; 1890 1891 link = data; 1892 engine = link->engine; 1893 task = &engine->task; 1894 1895 thread = nxt_thread(); 1896 1897 nxt_event_engine_thread_adopt(engine); 1898 1899 /* STUB */ 1900 thread->runtime = engine->task.thread->runtime; 1901 1902 engine->task.thread = thread; 1903 engine->task.log = thread->log; 1904 thread->engine = engine; 1905 thread->task = &engine->task; 1906#if 0 1907 thread->fiber = &engine->fibers->fiber; 1908#endif 1909 1910 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 1911 if (nxt_slow_path(engine->mem_pool == NULL)) { 1912 return; 1913 } 1914 1915 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 1916 NXT_PROCESS_ROUTER); 1917 if (nxt_slow_path(port == NULL)) { 1918 return; 1919 } 1920 1921 ret = nxt_port_socket_init(task, port, 0); 1922 if (nxt_slow_path(ret != NXT_OK)) { 1923 nxt_port_use(task, port, -1); 1924 return; 1925 } 1926 1927 engine->port = port; 1928 1929 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 1930 1931 nxt_event_engine_start(engine); 1932} 1933 1934 1935static void 1936nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 1937{ 1938 nxt_joint_job_t *job; 1939 nxt_socket_conf_t *skcf; 1940 nxt_listen_event_t *lev; 1941 nxt_listen_socket_t *ls; 1942 nxt_thread_spinlock_t *lock; 1943 nxt_socket_conf_joint_t *joint; 1944 1945 job = obj; 1946 joint = data; 1947 1948 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 1949 1950 skcf = joint->socket_conf; 1951 ls = skcf->listen; 1952 1953 lev = nxt_listen_event(task, ls); 1954 if (nxt_slow_path(lev == NULL)) { 1955 nxt_router_listen_socket_release(task, skcf); 1956 return; 1957 } 1958 1959 lev->socket.data = joint; 1960 1961 lock = &skcf->router_conf->router->lock; 1962 1963 nxt_thread_spin_lock(lock); 1964 ls->count++; 1965 nxt_thread_spin_unlock(lock); 1966 1967 job->work.next = NULL; 1968 job->work.handler = nxt_router_conf_wait; 1969 1970 nxt_event_engine_post(job->tmcf->engine, &job->work); 1971} 1972 1973 1974nxt_inline nxt_listen_event_t * 1975nxt_router_listen_event(nxt_queue_t *listen_connections, 1976 nxt_socket_conf_t *skcf) 1977{ 1978 nxt_socket_t fd; 1979 nxt_queue_link_t *qlk; 1980 nxt_listen_event_t *lev; 1981 1982 fd = skcf->listen->socket; 1983 1984 for (qlk = nxt_queue_first(listen_connections); 1985 qlk != nxt_queue_tail(listen_connections); 1986 qlk = nxt_queue_next(qlk)) 1987 { 1988 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 1989 1990 if (fd == lev->socket.fd) { 1991 return lev; 1992 } 1993 } 1994 1995 return NULL; 1996} 1997 1998 1999static void 2000nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2001{ 2002 nxt_joint_job_t *job; 2003 nxt_event_engine_t *engine; 2004 nxt_listen_event_t *lev; 2005 nxt_socket_conf_joint_t *joint, *old; 2006 2007 job = obj; 2008 joint = data; 2009 2010 engine = task->thread->engine; 2011 2012 nxt_queue_insert_tail(&engine->joints, &joint->link); 2013 2014 lev = nxt_router_listen_event(&engine->listen_connections, 2015 joint->socket_conf); 2016 2017 old = lev->socket.data; 2018 lev->socket.data = joint; 2019 lev->listen = joint->socket_conf->listen; 2020 2021 job->work.next = NULL; 2022 job->work.handler = nxt_router_conf_wait; 2023 2024 nxt_event_engine_post(job->tmcf->engine, &job->work); 2025 2026 /* 2027 * The task is allocated from configuration temporary 2028 * memory pool so it can be freed after engine post operation. 2029 */ 2030 2031 nxt_router_conf_release(&engine->task, old); 2032} 2033 2034 2035static void 2036nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2037{ 2038 nxt_joint_job_t *job; 2039 nxt_socket_conf_t *skcf; 2040 nxt_listen_event_t *lev; 2041 nxt_event_engine_t *engine; 2042 2043 job = obj; 2044 skcf = data; 2045 2046 engine = task->thread->engine; 2047 2048 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2049 2050 nxt_fd_event_delete(engine, &lev->socket); 2051 2052 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2053 lev->socket.fd); 2054 2055 lev->timer.handler = nxt_router_listen_socket_close; 2056 lev->timer.work_queue = &engine->fast_work_queue; 2057 2058 nxt_timer_add(engine, &lev->timer, 0); 2059 2060 job->work.next = NULL; 2061 job->work.handler = nxt_router_conf_wait; 2062 2063 nxt_event_engine_post(job->tmcf->engine, &job->work); 2064} 2065 2066 2067static void 2068nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2069{ 2070 nxt_event_engine_t *engine; 2071 2072 nxt_debug(task, "router worker thread quit"); 2073 2074 engine = task->thread->engine; 2075 2076 engine->shutdown = 1; 2077 2078 if (nxt_queue_is_empty(&engine->joints)) { 2079 nxt_thread_exit(task->thread); 2080 } 2081} 2082 2083 2084static void 2085nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2086{ 2087 nxt_timer_t *timer; 2088 nxt_listen_event_t *lev; 2089 nxt_socket_conf_joint_t *joint; 2090 2091 timer = obj; 2092 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2093 joint = lev->socket.data; 2094 2095 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2096 lev->socket.fd); 2097 2098 nxt_queue_remove(&lev->link); 2099 2100 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2101 task = &task->thread->engine->task; 2102 2103 nxt_router_listen_socket_release(task, joint->socket_conf); 2104 2105 nxt_free(lev); 2106 2107 nxt_router_conf_release(task, joint); 2108} 2109 2110 2111static void 2112nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2113{ 2114 nxt_listen_socket_t *ls; 2115 nxt_thread_spinlock_t *lock; 2116 2117 ls = skcf->listen; 2118 lock = &skcf->router_conf->router->lock; 2119 2120 nxt_thread_spin_lock(lock); 2121 2122 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2123 task->thread->engine, ls->count); 2124 2125 if (--ls->count != 0) { 2126 ls = NULL; 2127 } 2128 2129 nxt_thread_spin_unlock(lock); 2130 2131 if (ls != NULL) { 2132 nxt_socket_close(task, ls->socket); 2133 nxt_free(ls); 2134 } 2135} 2136 2137 2138static void 2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2140{ 2141 nxt_bool_t exit; 2142 nxt_socket_conf_t *skcf; 2143 nxt_router_conf_t *rtcf; 2144 nxt_event_engine_t *engine; 2145 nxt_thread_spinlock_t *lock; 2146 2147 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2148 2149 if (--joint->count != 0) { 2150 return; 2151 } 2152 2153 nxt_queue_remove(&joint->link); 2154 2155 skcf = joint->socket_conf; 2156 rtcf = skcf->router_conf; 2157 lock = &rtcf->router->lock; 2158 2159 nxt_thread_spin_lock(lock); 2160 2161 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2162 rtcf, rtcf->count); 2163 2164 if (--skcf->count != 0) { 2165 rtcf = NULL; 2166 2167 } else { 2168 nxt_queue_remove(&skcf->link); 2169 2170 if (--rtcf->count != 0) { 2171 rtcf = NULL; 2172 } 2173 } 2174 2175 nxt_thread_spin_unlock(lock); 2176 2177 /* TODO remove engine->port */ 2178 /* TODO excude from connected ports */ 2179 2180 /* The joint content can be used before memory pool destruction. */ 2181 engine = joint->engine; 2182 exit = (engine->shutdown && nxt_queue_is_empty(&engine->joints)); 2183 2184 if (rtcf != NULL) { 2185 nxt_debug(task, "old router conf is destroyed"); 2186 2187 nxt_mp_thread_adopt(rtcf->mem_pool); 2188 2189 nxt_mp_destroy(rtcf->mem_pool); 2190 } 2191 2192 if (exit) { 2193 nxt_thread_exit(task->thread); 2194 } 2195} 2196 2197 2198static void 2199nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 2200{ 2201 nxt_port_t *port; 2202 nxt_thread_link_t *link; 2203 nxt_event_engine_t *engine; 2204 nxt_thread_handle_t handle; 2205 2206 handle = (nxt_thread_handle_t) obj; 2207 link = data; 2208 2209 nxt_thread_wait(handle); 2210 2211 engine = link->engine; 2212 2213 nxt_queue_remove(&engine->link); 2214 2215 port = engine->port; 2216 2217 // TODO notify all apps 2218 2219 port->engine = task->thread->engine; 2220 nxt_mp_thread_adopt(port->mem_pool); 2221 nxt_port_use(task, port, -1); 2222 2223 nxt_mp_thread_adopt(engine->mem_pool); 2224 nxt_mp_destroy(engine->mem_pool); 2225 2226 nxt_event_engine_free(engine); 2227 2228 nxt_free(link); 2229} 2230 2231 2232static const nxt_conn_state_t nxt_router_conn_read_header_state 2233 nxt_aligned(64) = 2234{ 2235 .ready_handler = nxt_router_conn_http_header_parse, 2236 .close_handler = nxt_router_conn_close, 2237 .error_handler = nxt_router_conn_error, 2238 2239 .timer_handler = nxt_router_conn_timeout, 2240 .timer_value = nxt_router_conn_timeout_value, 2241 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), 2242}; 2243 2244 2245static const nxt_conn_state_t nxt_router_conn_read_body_state 2246 nxt_aligned(64) = 2247{ 2248 .ready_handler = nxt_router_conn_http_body_read, 2249 .close_handler = nxt_router_conn_close, 2250 .error_handler = nxt_router_conn_error, 2251 2252 .timer_handler = nxt_router_conn_timeout, 2253 .timer_value = nxt_router_conn_timeout_value, 2254 .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), 2255 .timer_autoreset = 1, 2256}; 2257 2258 2259static void 2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) 2261{ 2262 size_t size; 2263 nxt_conn_t *c; 2264 nxt_socket_conf_t *skcf; 2265 nxt_event_engine_t *engine; 2266 nxt_socket_conf_joint_t *joint; 2267 2268 c = obj; 2269 joint = data; 2270 2271 nxt_debug(task, "router conn init"); 2272 2273 c->joint = joint; 2274 joint->count++; 2275 2276 skcf = joint->socket_conf; 2277 c->local = skcf->sockaddr; 2278 2279 size = skcf->header_buffer_size; 2280 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2281 2282 c->socket.data = NULL; 2283 2284 engine = task->thread->engine; 2285 c->read_work_queue = &engine->fast_work_queue; 2286 c->write_work_queue = &engine->fast_work_queue; 2287 2288 c->read_state = &nxt_router_conn_read_header_state; 2289 2290 nxt_conn_read(engine, c); 2291} 2292 2293 2294static const nxt_conn_state_t nxt_router_conn_write_state 2295 nxt_aligned(64) = 2296{ 2297 .ready_handler = nxt_router_conn_ready, 2298 .close_handler = nxt_router_conn_close, 2299 .error_handler = nxt_router_conn_error, 2300}; 2301 2302 2303static void 2304nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2305 void *data) 2306{ 2307 size_t dump_size; 2308 nxt_buf_t *b, *last; 2309 nxt_conn_t *c; 2310 nxt_req_conn_link_t *rc; 2311 2312 b = msg->buf; 2313 rc = data; 2314 2315 c = rc->conn; 2316 2317 dump_size = nxt_buf_used_size(b); 2318 2319 if (dump_size > 300) { 2320 dump_size = 300; 2321 } 2322 2323 nxt_debug(task, "%srouter app data (%z): %*s", 2324 msg->port_msg.last ? "last " : "", msg->size, dump_size, 2325 b->mem.pos); 2326 2327 if (msg->size == 0) { 2328 b = NULL; 2329 } 2330 2331 if (msg->port_msg.last != 0) { 2332 nxt_debug(task, "router data create last buf"); 2333 2334 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); 2335 if (nxt_slow_path(last == NULL)) { 2336 /* TODO pogorevaTb */ 2337 } 2338 2339 nxt_buf_chain_add(&b, last); 2340 2341 nxt_router_rc_unlink(task, rc); 2342 } 2343 2344 if (b == NULL) { 2345 return; 2346 } 2347 2348 if (msg->buf == b) { 2349 /* Disable instant buffer completion/re-using by port. */ 2350 msg->buf = NULL; 2351 } 2352 2353 if (c->write == NULL) { 2354 c->write = b; 2355 c->write_state = &nxt_router_conn_write_state; 2356 2357 nxt_conn_write(task->thread->engine, c); 2358 2359 } else { 2360 nxt_debug(task, "router data attach out bufs to existing chain"); 2361 2362 nxt_buf_chain_add(&c->write, b); 2363 } 2364} 2365 2366 2367static void 2368nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2369 void *data) 2370{ 2371 nxt_req_conn_link_t *rc; 2372 2373 rc = data; 2374 2375 nxt_router_gen_error(task, rc->conn, 500, 2376 "Application terminated unexpectedly"); 2377 2378 nxt_router_rc_unlink(task, rc); 2379} 2380 2381 2382nxt_inline const char * 2383nxt_router_text_by_code(int code) 2384{ 2385 switch (code) { 2386 case 400: return "Bad request"; 2387 case 404: return "Not found"; 2388 case 403: return "Forbidden"; 2389 case 408: return "Request Timeout"; 2390 case 411: return "Length Required"; 2391 case 413: return "Request Entity Too Large"; 2392 case 500: 2393 default: return "Internal server error"; 2394 } 2395} 2396 2397 2398static nxt_buf_t * 2399nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, 2400 const char* str) 2401{ 2402 nxt_buf_t *b, *last; 2403 2404 b = nxt_buf_mem_alloc(mp, 16384, 0); 2405 if (nxt_slow_path(b == NULL)) { 2406 return NULL; 2407 } 2408 2409 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, 2410 "HTTP/1.0 %d %s\r\n" 2411 "Content-Type: text/plain\r\n" 2412 "Connection: close\r\n\r\n", 2413 code, nxt_router_text_by_code(code)); 2414 2415 b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); 2416 2417 nxt_log_alert(task->log, "error %d: %s", code, str); 2418 2419 last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); 2420 2421 if (nxt_slow_path(last == NULL)) { 2422 nxt_mp_free(mp, b); 2423 return NULL; 2424 } 2425 2426 nxt_buf_chain_add(&b, last); 2427 2428 return b; 2429} 2430 2431 2432 2433static void 2434nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 2435 const char* str) 2436{ 2437 nxt_mp_t *mp; 2438 nxt_buf_t *b; 2439 2440 /* TODO: fix when called in the middle of response */ 2441 2442 mp = c->mem_pool; 2443 2444 b = nxt_router_get_error_buf(task, mp, code, str); 2445 2446 if (c->socket.fd == -1) { 2447 nxt_mp_free(mp, b->next); 2448 nxt_mp_free(mp, b); 2449 return; 2450 } 2451 2452 if (c->write == NULL) { 2453 c->write = b; 2454 c->write_state = &nxt_router_conn_write_state; 2455 2456 nxt_conn_write(task->thread->engine, c); 2457 2458 } else { 2459 nxt_debug(task, "router data attach out bufs to existing chain"); 2460 2461 nxt_buf_chain_add(&c->write, b); 2462 } 2463} 2464 2465 2466static void 2467nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2468 void *data) 2469{ 2470 nxt_app_t *app; 2471 nxt_port_t *port; 2472 2473 app = data; 2474 port = msg->u.new_port; 2475 2476 nxt_assert(app != NULL); 2477 nxt_assert(port != NULL); 2478 2479 port->app = app; 2480 2481 nxt_thread_mutex_lock(&app->mutex); 2482 2483 nxt_assert(app->pending_workers != 0); 2484 2485 app->pending_workers--; 2486 app->workers++; 2487 2488 nxt_thread_mutex_unlock(&app->mutex); 2489 2490 nxt_debug(task, "app '%V' %p new port ready", &app->name, app); 2491 2492 nxt_router_app_port_release(task, port, 0, 0); 2493} 2494 2495 2496static void 2497nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2498 void *data) 2499{ 2500 nxt_app_t *app; 2501 nxt_queue_link_t *lnk; 2502 nxt_req_app_link_t *ra; 2503 2504 app = data; 2505 2506 nxt_assert(app != NULL); 2507 2508 nxt_debug(task, "app '%V' %p start error", &app->name, app); 2509 2510 nxt_thread_mutex_lock(&app->mutex); 2511 2512 nxt_assert(app->pending_workers != 0); 2513 2514 app->pending_workers--; 2515 2516 if (!nxt_queue_is_empty(&app->requests)) { 2517 lnk = nxt_queue_last(&app->requests); 2518 nxt_queue_remove(lnk); 2519 lnk->next = NULL; 2520 2521 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2522 2523 } else { 2524 ra = NULL; 2525 } 2526 2527 nxt_thread_mutex_unlock(&app->mutex); 2528 2529 if (ra != NULL) { 2530 nxt_debug(task, "app '%V' %p abort next stream #%uD", 2531 &app->name, app, ra->stream); 2532 2533 nxt_router_ra_abort(task, ra, ra->work.data); 2534 } 2535 2536 nxt_router_app_use(task, app, -1); 2537} 2538 2539 2540void 2541nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 2542{ 2543 int c; 2544 2545 c = nxt_atomic_fetch_add(&app->use_count, i); 2546 2547 if (i < 0 && c == -i) { 2548 2549 nxt_assert(app->live == 0); 2550 nxt_assert(app->workers == 0); 2551 nxt_assert(app->pending_workers == 0); 2552 nxt_assert(nxt_queue_is_empty(&app->requests) != 0); 2553 nxt_assert(nxt_queue_is_empty(&app->ports) != 0); 2554 2555 nxt_thread_mutex_destroy(&app->mutex); 2556 nxt_free(app); 2557 } 2558} 2559 2560 2561nxt_inline nxt_port_t * 2562nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) 2563{ 2564 nxt_port_t *port; 2565 nxt_queue_link_t *lnk; 2566 2567 lnk = nxt_queue_first(&app->ports); 2568 nxt_queue_remove(lnk); 2569 2570 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2571 2572 port->app_requests++; 2573 2574 if (app->live && 2575 (app->max_pending_responses == 0 || 2576 (port->app_requests - port->app_responses) < 2577 app->max_pending_responses) ) 2578 { 2579 nxt_queue_insert_tail(&app->ports, lnk); 2580 2581 } else { 2582 lnk->next = NULL; 2583 2584 (*use_delta)--; 2585 } 2586 2587 return port; 2588} 2589 2590 2591static nxt_port_t * 2592nxt_router_app_get_idle_port(nxt_app_t *app) 2593{ 2594 nxt_port_t *port; 2595 2596 port = NULL; 2597 2598 nxt_thread_mutex_lock(&app->mutex); 2599 2600 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 2601 2602 if (port->app_requests > port->app_responses) { 2603 port = NULL; 2604 2605 continue; 2606 } 2607 2608 nxt_queue_remove(&port->app_link); 2609 port->app_link.next = NULL; 2610 2611 break; 2612 2613 } nxt_queue_loop; 2614 2615 nxt_thread_mutex_unlock(&app->mutex); 2616 2617 return port; 2618} 2619 2620 2621static void 2622nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) 2623{ 2624 nxt_app_t *app; 2625 nxt_req_app_link_t *ra; 2626 2627 app = obj; 2628 ra = data; 2629 2630 nxt_assert(app != NULL); 2631 nxt_assert(ra != NULL); 2632 nxt_assert(ra->app_port != NULL); 2633 2634 nxt_debug(task, "app '%V' %p process next stream #%uD", 2635 &app->name, app, ra->stream); 2636 2637 nxt_router_process_http_request_mp(task, ra); 2638} 2639 2640 2641static void 2642nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 2643 uint32_t request_failed, uint32_t got_response) 2644{ 2645 int use_delta, ra_use_delta; 2646 nxt_app_t *app; 2647 nxt_bool_t send_quit; 2648 nxt_queue_link_t *lnk; 2649 nxt_req_app_link_t *ra; 2650 2651 nxt_assert(port != NULL); 2652 nxt_assert(port->app != NULL); 2653 2654 app = port->app; 2655 2656 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; 2657 2658 nxt_thread_mutex_lock(&app->mutex); 2659 2660 port->app_requests -= request_failed; 2661 port->app_responses += got_response; 2662 2663 if (app->live != 0 && 2664 port->pair[1] != -1 && 2665 port->app_link.next == NULL && 2666 (app->max_pending_responses == 0 || 2667 (port->app_requests - port->app_responses) < 2668 app->max_pending_responses) ) 2669 { 2670 nxt_queue_insert_tail(&app->ports, &port->app_link); 2671 use_delta++; 2672 } 2673 2674 if (app->live != 0 && 2675 !nxt_queue_is_empty(&app->ports) && 2676 !nxt_queue_is_empty(&app->requests)) 2677 { 2678 lnk = nxt_queue_first(&app->requests); 2679 nxt_queue_remove(lnk); 2680 lnk->next = NULL; 2681 2682 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2683 2684 ra_use_delta = 1; 2685 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); 2686 2687 } else { 2688 ra = NULL; 2689 ra_use_delta = 0; 2690 } 2691 2692 send_quit = app->live == 0 && port->app_requests == port->app_responses; 2693 2694 nxt_thread_mutex_unlock(&app->mutex); 2695 2696 if (ra != NULL) { 2697 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2698 nxt_router_app_process_request, 2699 &task->thread->engine->task, app, ra); 2700 2701 goto adjust_use; 2702 } 2703 2704 /* ? */ 2705 if (port->pair[1] == -1) { 2706 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 2707 &app->name, app, port, port->pid); 2708 2709 goto adjust_use; 2710 } 2711 2712 if (send_quit) { 2713 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", 2714 &app->name, app); 2715 2716 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 2717 -1, 0, 0, NULL); 2718 2719 goto adjust_use; 2720 } 2721 2722 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 2723 &app->name, app); 2724 2725adjust_use: 2726 2727 if (use_delta != 0) { 2728 nxt_port_use(task, port, use_delta); 2729 } 2730 2731 if (ra_use_delta != 0) { 2732 nxt_port_use(task, ra->app_port, ra_use_delta); 2733 } 2734} 2735 2736 2737void 2738nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 2739{ 2740 nxt_app_t *app; 2741 nxt_bool_t unchain, start_worker; 2742 2743 app = port->app; 2744 2745 nxt_assert(app != NULL); 2746 2747 nxt_thread_mutex_lock(&app->mutex); 2748 2749 unchain = port->app_link.next != NULL; 2750 2751 if (unchain) { 2752 nxt_queue_remove(&port->app_link); 2753 port->app_link.next = NULL; 2754 } 2755 2756 app->workers--; 2757 2758 start_worker = app->live != 0 && 2759 nxt_queue_is_empty(&app->requests) == 0 && 2760 app->workers + app->pending_workers < app->max_workers; 2761 2762 if (start_worker) { 2763 app->pending_workers++; 2764 } 2765 2766 nxt_thread_mutex_unlock(&app->mutex); 2767 2768 nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); 2769 2770 if (unchain) { 2771 nxt_port_use(task, port, -1); 2772 } 2773 2774 if (start_worker) { 2775 nxt_router_start_worker(task, app); 2776 } 2777} 2778 2779 2780static nxt_int_t 2781nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) 2782{ 2783 int use_delta; 2784 nxt_int_t res; 2785 nxt_app_t *app; 2786 nxt_bool_t can_start_worker; 2787 nxt_conn_t *c; 2788 nxt_port_t *port; 2789 nxt_event_engine_t *engine; 2790 nxt_socket_conf_joint_t *joint; 2791 2792 port = NULL; 2793 use_delta = 1; 2794 c = ra->rc->conn; 2795 2796 joint = c->joint; 2797 app = joint->socket_conf->application; 2798 2799 if (app == NULL) { 2800 nxt_router_gen_error(task, c, 500, 2801 "Application is NULL in socket_conf"); 2802 return NXT_ERROR; 2803 } 2804 2805 ra->rc->app = app; 2806 2807 nxt_router_app_use(task, app, 1); 2808 2809 engine = task->thread->engine; 2810 2811 nxt_timer_disable(engine, &c->read_timer); 2812 2813 if (app->timeout != 0) { 2814 c->read_timer.handler = nxt_router_app_timeout; 2815 nxt_timer_add(engine, &c->read_timer, app->timeout); 2816 } 2817 2818 can_start_worker = 0; 2819 2820 nxt_thread_mutex_lock(&app->mutex); 2821 2822 if (!nxt_queue_is_empty(&app->ports)) { 2823 port = nxt_router_app_get_port_unsafe(app, &use_delta); 2824 2825 } else { 2826 ra = nxt_router_ra_create(task, ra); 2827 2828 if (nxt_fast_path(ra != NULL)) { 2829 nxt_queue_insert_tail(&app->requests, &ra->link); 2830 2831 can_start_worker = (app->workers + app->pending_workers) < 2832 app->max_workers; 2833 if (can_start_worker) { 2834 app->pending_workers++; 2835 } 2836 } 2837 2838 port = NULL; 2839 } 2840 2841 nxt_thread_mutex_unlock(&app->mutex); 2842 2843 if (nxt_slow_path(ra == NULL)) { 2844 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2845 "req<->app link"); 2846 return NXT_ERROR; 2847 } 2848 2849 if (port != NULL) { 2850 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); 2851 2852 ra->app_port = port; 2853 2854 if (use_delta != 0) { 2855 nxt_port_use(task, port, use_delta); 2856 } 2857 return NXT_OK; 2858 } 2859 2860 nxt_debug(task, "ra stream #%uD allocated", ra->stream); 2861 2862 if (!can_start_worker) { 2863 nxt_debug(task, "app '%V' %p too many running or pending workers", 2864 &app->name, app); 2865 2866 return NXT_AGAIN; 2867 } 2868 2869 res = nxt_router_start_worker(task, app); 2870 2871 if (nxt_slow_path(res != NXT_OK)) { 2872 nxt_router_gen_error(task, c, 500, "Failed to start worker"); 2873 2874 return NXT_ERROR; 2875 } 2876 2877 return NXT_AGAIN; 2878} 2879 2880 2881static void 2882nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) 2883{ 2884 size_t size; 2885 nxt_int_t ret; 2886 nxt_buf_t *buf; 2887 nxt_conn_t *c; 2888 nxt_sockaddr_t *local; 2889 nxt_app_parse_ctx_t *ap; 2890 nxt_app_request_body_t *b; 2891 nxt_socket_conf_joint_t *joint; 2892 nxt_app_request_header_t *h; 2893 2894 c = obj; 2895 ap = data; 2896 buf = c->read; 2897 joint = c->joint; 2898 2899 nxt_debug(task, "router conn http header parse"); 2900 2901 if (ap == NULL) { 2902 ap = nxt_app_http_req_init(task); 2903 if (nxt_slow_path(ap == NULL)) { 2904 nxt_router_gen_error(task, c, 500, 2905 "Failed to allocate parse context"); 2906 return; 2907 } 2908 2909 c->socket.data = ap; 2910 2911 ap->r.remote.start = nxt_sockaddr_address(c->remote); 2912 ap->r.remote.length = c->remote->address_length; 2913 2914 /* 2915 * TODO: need an application flag to get local address 2916 * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. 2917 */ 2918 local = nxt_router_local_addr(task, c); 2919 2920 if (nxt_fast_path(local != NULL)) { 2921 ap->r.local.start = nxt_sockaddr_address(local); 2922 ap->r.local.length = local->address_length; 2923 } 2924 2925 ap->r.header.buf = buf; 2926 } 2927 2928 h = &ap->r.header; 2929 b = &ap->r.body; 2930 2931 ret = nxt_app_http_req_header_parse(task, ap, buf); 2932 2933 nxt_debug(task, "http parse request header: %d", ret); 2934 2935 switch (nxt_expect(NXT_DONE, ret)) { 2936 2937 case NXT_DONE: 2938 nxt_debug(task, "router request header parsing complete, " 2939 "content length: %O, preread: %uz", 2940 h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); 2941 2942 if (b->done) { 2943 nxt_router_process_http_request(task, c, ap); 2944 2945 return; 2946 } 2947 2948 if (joint->socket_conf->max_body_size > 0 2949 && (size_t) h->parsed_content_length 2950 > joint->socket_conf->max_body_size) 2951 { 2952 nxt_router_gen_error(task, c, 413, "Content-Length too big"); 2953 return; 2954 } 2955 2956 if (nxt_buf_mem_free_size(&buf->mem) == 0) { 2957 size = nxt_min(joint->socket_conf->body_buffer_size, 2958 (size_t) h->parsed_content_length); 2959 2960 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2961 if (nxt_slow_path(buf->next == NULL)) { 2962 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2963 "buffer for request body"); 2964 return; 2965 } 2966 2967 c->read = buf->next; 2968 2969 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 2970 } 2971 2972 if (b->buf == NULL) { 2973 b->buf = c->read; 2974 } 2975 2976 c->read_state = &nxt_router_conn_read_body_state; 2977 break; 2978 2979 case NXT_ERROR: 2980 nxt_router_gen_error(task, c, 400, "Request header parse error"); 2981 return; 2982 2983 default: /* NXT_AGAIN */ 2984 2985 if (c->read->mem.free == c->read->mem.end) { 2986 size = joint->socket_conf->large_header_buffer_size; 2987 2988 if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) 2989 || ap->r.header.bufs 2990 >= joint->socket_conf->large_header_buffers) 2991 { 2992 nxt_router_gen_error(task, c, 413, 2993 "Too long request headers"); 2994 return; 2995 } 2996 2997 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2998 if (nxt_slow_path(buf->next == NULL)) { 2999 nxt_router_gen_error(task, c, 500, 3000 "Failed to allocate large header " 3001 "buffer"); 3002 return; 3003 } 3004 3005 ap->r.header.bufs++; 3006 3007 size = c->read->mem.free - c->read->mem.pos; 3008 3009 c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); 3010 } 3011 3012 } 3013 3014 nxt_conn_read(task->thread->engine, c); 3015} 3016 3017 3018static nxt_sockaddr_t * 3019nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c) 3020{ 3021 int ret; 3022 size_t size, length; 3023 socklen_t socklen; 3024 nxt_sockaddr_t *sa; 3025 3026 if (c->local != NULL) { 3027 return c->local; 3028 } 3029 3030 /* AF_UNIX should not get in here. */ 3031 3032 switch (c->remote->u.sockaddr.sa_family) { 3033#if (NXT_INET6) 3034 case AF_INET6: 3035 socklen = sizeof(struct sockaddr_in6); 3036 length = NXT_INET6_ADDR_STR_LEN; 3037 size = offsetof(nxt_sockaddr_t, u) + socklen + length; 3038 break; 3039#endif 3040 case AF_INET: 3041 default: 3042 socklen = sizeof(struct sockaddr_in); 3043 length = NXT_INET_ADDR_STR_LEN; 3044 size = offsetof(nxt_sockaddr_t, u) + socklen + length; 3045 break; 3046 } 3047 3048 sa = nxt_mp_get(c->mem_pool, size); 3049 if (nxt_slow_path(sa == NULL)) { 3050 return NULL; 3051 } 3052 3053 sa->socklen = socklen; 3054 sa->length = length; 3055 3056 ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen); 3057 if (nxt_slow_path(ret != 0)) { 3058 nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd); 3059 return NULL; 3060 } 3061 3062 c->local = sa; 3063 3064 nxt_sockaddr_text(sa); 3065 3066 /* 3067 * TODO: here we can adjust the end of non-freeable block 3068 * in c->mem_pool to the end of actual sockaddr length. 3069 */ 3070 3071 return sa; 3072} 3073 3074 3075static void 3076nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) 3077{ 3078 size_t size; 3079 nxt_int_t ret; 3080 nxt_buf_t *buf; 3081 nxt_conn_t *c; 3082 nxt_app_parse_ctx_t *ap; 3083 nxt_app_request_body_t *b; 3084 nxt_socket_conf_joint_t *joint; 3085 nxt_app_request_header_t *h; 3086 3087 c = obj; 3088 ap = data; 3089 buf = c->read; 3090 3091 nxt_debug(task, "router conn http body read"); 3092 3093 nxt_assert(ap != NULL); 3094 3095 b = &ap->r.body; 3096 h = &ap->r.header; 3097 3098 ret = nxt_app_http_req_body_read(task, ap, buf); 3099 3100 nxt_debug(task, "http read request body: %d", ret); 3101 3102 switch (nxt_expect(NXT_DONE, ret)) { 3103 3104 case NXT_DONE: 3105 nxt_router_process_http_request(task, c, ap); 3106 return; 3107 3108 case NXT_ERROR: 3109 nxt_router_gen_error(task, c, 500, "Read body error"); 3110 return; 3111 3112 default: /* NXT_AGAIN */ 3113 3114 if (nxt_buf_mem_free_size(&buf->mem) == 0) { 3115 joint = c->joint; 3116 3117 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 3118 3119 size = nxt_min(joint->socket_conf->body_buffer_size, 3120 (size_t) h->parsed_content_length - b->preread_size); 3121 3122 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 3123 if (nxt_slow_path(buf->next == NULL)) { 3124 nxt_router_gen_error(task, c, 500, "Failed to allocate " 3125 "buffer for request body"); 3126 return; 3127 } 3128 3129 c->read = buf->next; 3130 } 3131 3132 nxt_debug(task, "router request body read again, rest: %uz", 3133 h->parsed_content_length - b->preread_size); 3134 } 3135 3136 nxt_conn_read(task->thread->engine, c); 3137} 3138 3139 3140static void 3141nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 3142 nxt_app_parse_ctx_t *ap) 3143{ 3144 nxt_int_t res; 3145 nxt_port_t *port; 3146 nxt_event_engine_t *engine; 3147 nxt_req_app_link_t ra_local, *ra; 3148 nxt_req_conn_link_t *rc; 3149 3150 engine = task->thread->engine; 3151 3152 rc = nxt_port_rpc_register_handler_ex(task, engine->port, 3153 nxt_router_response_ready_handler, 3154 nxt_router_response_error_handler, 3155 sizeof(nxt_req_conn_link_t)); 3156 3157 if (nxt_slow_path(rc == NULL)) { 3158 nxt_router_gen_error(task, c, 500, "Failed to allocate " 3159 "req<->conn link"); 3160 3161 return; 3162 } 3163 3164 rc->stream = nxt_port_rpc_ex_stream(rc); 3165 rc->conn = c; 3166 3167 nxt_queue_insert_tail(&c->requests, &rc->link); 3168 3169 nxt_debug(task, "stream #%uD linked to conn %p at engine %p", 3170 rc->stream, c, engine); 3171 3172 rc->ap = ap; 3173 c->socket.data = NULL; 3174 3175 ra = &ra_local; 3176 nxt_router_ra_init(task, ra, rc); 3177 3178 res = nxt_router_app_port(task, ra); 3179 3180 if (res != NXT_OK) { 3181 return; 3182 } 3183 3184 port = ra->app_port; 3185 3186 if (nxt_slow_path(port == NULL)) { 3187 nxt_router_gen_error(task, c, 500, "Application port not found"); 3188 return; 3189 } 3190 3191 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); 3192 3193 nxt_router_process_http_request_mp(task, ra); 3194} 3195 3196 3197static void 3198nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) 3199{ 3200 uint32_t request_failed; 3201 nxt_int_t res; 3202 nxt_port_t *port, *c_port, *reply_port; 3203 nxt_app_wmsg_t wmsg; 3204 nxt_app_parse_ctx_t *ap; 3205 3206 nxt_assert(ra->app_port != NULL); 3207 3208 port = ra->app_port; 3209 reply_port = ra->reply_port; 3210 ap = ra->ap; 3211 3212 request_failed = 1; 3213 3214 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 3215 reply_port->id); 3216 if (nxt_slow_path(c_port != reply_port)) { 3217 res = nxt_port_send_port(task, port, reply_port, 0); 3218 3219 if (nxt_slow_path(res != NXT_OK)) { 3220 nxt_router_ra_error(task, ra, 500, 3221 "Failed to send reply port to application"); 3222 ra = NULL; 3223 goto release_port; 3224 } 3225 3226 nxt_process_connected_port_add(port->process, reply_port); 3227 } 3228 3229 wmsg.port = port; 3230 wmsg.write = NULL; 3231 wmsg.buf = &wmsg.write; 3232 wmsg.stream = ra->stream; 3233 3234 res = port->app->prepare_msg(task, &ap->r, &wmsg); 3235 3236 if (nxt_slow_path(res != NXT_OK)) { 3237 nxt_router_ra_error(task, ra, 500, 3238 "Failed to prepare message for application"); 3239 ra = NULL; 3240 goto release_port; 3241 } 3242 3243 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 3244 nxt_buf_used_size(wmsg.write), 3245 wmsg.port->socket.fd); 3246 3247 request_failed = 0; 3248 3249 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, 3250 -1, ra->stream, reply_port->id, wmsg.write); 3251 3252 if (nxt_slow_path(res != NXT_OK)) { 3253 nxt_router_ra_error(task, ra, 500, 3254 "Failed to send message to application"); 3255 ra = NULL; 3256 goto release_port; 3257 } 3258 3259release_port: 3260 3261 nxt_router_app_port_release(task, port, request_failed, 0); 3262 3263 if (ra != NULL) { 3264 if (request_failed != 0) { 3265 ra->app_port = 0; 3266 } 3267 3268 nxt_router_ra_release(task, ra, ra->work.data); 3269 } 3270} 3271 3272 3273static nxt_int_t 3274nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 3275 nxt_app_wmsg_t *wmsg) 3276{ 3277 nxt_int_t rc; 3278 nxt_buf_t *b; 3279 nxt_http_field_t *field; 3280 nxt_app_request_header_t *h; 3281 3282 static const nxt_str_t prefix = nxt_string("HTTP_"); 3283 static const nxt_str_t eof = nxt_null_string; 3284 3285 h = &r->header; 3286 3287#define RC(S) \ 3288 do { \ 3289 rc = (S); \ 3290 if (nxt_slow_path(rc != NXT_OK)) { \ 3291 goto fail; \ 3292 } \ 3293 } while(0) 3294 3295#define NXT_WRITE(N) \ 3296 RC(nxt_app_msg_write_str(task, wmsg, N)) 3297 3298 /* TODO error handle, async mmap buffer assignment */ 3299 3300 NXT_WRITE(&h->method); 3301 NXT_WRITE(&h->target); 3302 3303 if (h->path.start == h->target.start) { 3304 NXT_WRITE(&eof); 3305 3306 } else { 3307 NXT_WRITE(&h->path); 3308 } 3309 3310 if (h->query.start != NULL) { 3311 RC(nxt_app_msg_write_size(task, wmsg, 3312 h->query.start - h->target.start + 1)); 3313 } else { 3314 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3315 } 3316 3317 NXT_WRITE(&h->version); 3318 3319 NXT_WRITE(&r->remote); 3320 NXT_WRITE(&r->local); 3321 3322 NXT_WRITE(&h->host); 3323 NXT_WRITE(&h->content_type); 3324 NXT_WRITE(&h->content_length); 3325 3326 nxt_list_each(field, h->fields) { 3327 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, 3328 &prefix, &field->name)); 3329 NXT_WRITE(&field->value); 3330 3331 } nxt_list_loop; 3332 3333 /* end-of-headers mark */ 3334 NXT_WRITE(&eof); 3335 3336 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3337 3338 for(b = r->body.buf; b != NULL; b = b->next) { 3339 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3340 nxt_buf_mem_used_size(&b->mem))); 3341 } 3342 3343#undef NXT_WRITE 3344#undef RC 3345 3346 return NXT_OK; 3347 3348fail: 3349 3350 return NXT_ERROR; 3351} 3352 3353 3354static nxt_int_t 3355nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 3356 nxt_app_wmsg_t *wmsg) 3357{ 3358 nxt_int_t rc; 3359 nxt_buf_t *b; 3360 nxt_bool_t method_is_post; 3361 nxt_http_field_t *field; 3362 nxt_app_request_header_t *h; 3363 3364 static const nxt_str_t prefix = nxt_string("HTTP_"); 3365 static const nxt_str_t eof = nxt_null_string; 3366 3367 h = &r->header; 3368 3369#define RC(S) \ 3370 do { \ 3371 rc = (S); \ 3372 if (nxt_slow_path(rc != NXT_OK)) { \ 3373 goto fail; \ 3374 } \ 3375 } while(0) 3376 3377#define NXT_WRITE(N) \ 3378 RC(nxt_app_msg_write_str(task, wmsg, N)) 3379 3380 /* TODO error handle, async mmap buffer assignment */ 3381 3382 NXT_WRITE(&h->method); 3383 NXT_WRITE(&h->target); 3384 3385 if (h->path.start == h->target.start) { 3386 NXT_WRITE(&eof); 3387 3388 } else { 3389 NXT_WRITE(&h->path); 3390 } 3391 3392 if (h->query.start != NULL) { 3393 RC(nxt_app_msg_write_size(task, wmsg, 3394 h->query.start - h->target.start + 1)); 3395 } else { 3396 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3397 } 3398 3399 NXT_WRITE(&h->version); 3400 3401 // PHP_SELF 3402 // SCRIPT_NAME 3403 // SCRIPT_FILENAME 3404 // DOCUMENT_ROOT 3405 3406 NXT_WRITE(&r->remote); 3407 NXT_WRITE(&r->local); 3408 3409 NXT_WRITE(&h->host); 3410 NXT_WRITE(&h->cookie); 3411 NXT_WRITE(&h->content_type); 3412 NXT_WRITE(&h->content_length); 3413 3414 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 3415 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3416 3417 method_is_post = h->method.length == 4 && 3418 h->method.start[0] == 'P' && 3419 h->method.start[1] == 'O' && 3420 h->method.start[2] == 'S' && 3421 h->method.start[3] == 'T'; 3422 3423 if (method_is_post) { 3424 for(b = r->body.buf; b != NULL; b = b->next) { 3425 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3426 nxt_buf_mem_used_size(&b->mem))); 3427 } 3428 } 3429 3430 nxt_list_each(field, h->fields) { 3431 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, 3432 &prefix, &field->name)); 3433 NXT_WRITE(&field->value); 3434 3435 } nxt_list_loop; 3436 3437 /* end-of-headers mark */ 3438 NXT_WRITE(&eof); 3439 3440 if (!method_is_post) { 3441 for(b = r->body.buf; b != NULL; b = b->next) { 3442 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3443 nxt_buf_mem_used_size(&b->mem))); 3444 } 3445 } 3446 3447#undef NXT_WRITE 3448#undef RC 3449 3450 return NXT_OK; 3451 3452fail: 3453 3454 return NXT_ERROR; 3455} 3456 3457 3458static nxt_int_t 3459nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) 3460{ 3461 nxt_int_t rc; 3462 nxt_buf_t *b; 3463 nxt_http_field_t *field; 3464 nxt_app_request_header_t *h; 3465 3466 static const nxt_str_t eof = nxt_null_string; 3467 3468 h = &r->header; 3469 3470#define RC(S) \ 3471 do { \ 3472 rc = (S); \ 3473 if (nxt_slow_path(rc != NXT_OK)) { \ 3474 goto fail; \ 3475 } \ 3476 } while(0) 3477 3478#define NXT_WRITE(N) \ 3479 RC(nxt_app_msg_write_str(task, wmsg, N)) 3480 3481 /* TODO error handle, async mmap buffer assignment */ 3482 3483 NXT_WRITE(&h->method); 3484 NXT_WRITE(&h->target); 3485 3486 if (h->path.start == h->target.start) { 3487 NXT_WRITE(&eof); 3488 3489 } else { 3490 NXT_WRITE(&h->path); 3491 } 3492 3493 if (h->query.start != NULL) { 3494 RC(nxt_app_msg_write_size(task, wmsg, 3495 h->query.start - h->target.start + 1)); 3496 } else { 3497 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3498 } 3499 3500 NXT_WRITE(&h->version); 3501 NXT_WRITE(&r->remote); 3502 3503 NXT_WRITE(&h->host); 3504 NXT_WRITE(&h->cookie); 3505 NXT_WRITE(&h->content_type); 3506 NXT_WRITE(&h->content_length); 3507 3508 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 3509 3510 nxt_list_each(field, h->fields) { 3511 NXT_WRITE(&field->name); 3512 NXT_WRITE(&field->value); 3513 3514 } nxt_list_loop; 3515 3516 /* end-of-headers mark */ 3517 NXT_WRITE(&eof); 3518 3519 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3520 3521 for(b = r->body.buf; b != NULL; b = b->next) { 3522 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3523 nxt_buf_mem_used_size(&b->mem))); 3524 } 3525 3526#undef NXT_WRITE 3527#undef RC 3528 3529 return NXT_OK; 3530 3531fail: 3532 3533 return NXT_ERROR; 3534} 3535 3536 3537static const nxt_conn_state_t nxt_router_conn_close_state 3538 nxt_aligned(64) = 3539{ 3540 .ready_handler = nxt_router_conn_free, 3541}; 3542 3543 3544static void 3545nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) 3546{ 3547 nxt_buf_t *b; 3548 nxt_bool_t last; 3549 nxt_conn_t *c; 3550 nxt_work_queue_t *wq; 3551 3552 nxt_debug(task, "router conn ready %p", obj); 3553 3554 c = obj; 3555 b = c->write; 3556 3557 wq = &task->thread->engine->fast_work_queue; 3558 3559 last = 0; 3560 3561 while (b != NULL) { 3562 if (!nxt_buf_is_sync(b)) { 3563 if (nxt_buf_used_size(b) > 0) { 3564 break; 3565 } 3566 } 3567 3568 if (nxt_buf_is_last(b)) { 3569 last = 1; 3570 } 3571 3572 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 3573 3574 b = b->next; 3575 } 3576 3577 c->write = b; 3578 3579 if (b != NULL) { 3580 nxt_debug(task, "router conn %p has more data to write", obj); 3581 3582 nxt_conn_write(task->thread->engine, c); 3583 3584 } else { 3585 nxt_debug(task, "router conn %p no more data to write, last = %d", obj, 3586 last); 3587 3588 if (last != 0) { 3589 nxt_debug(task, "enqueue router conn close %p (ready handler)", c); 3590 3591 nxt_work_queue_add(wq, nxt_router_conn_close, task, c, 3592 c->socket.data); 3593 } 3594 } 3595} 3596 3597 3598static void 3599nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) 3600{ 3601 nxt_conn_t *c; 3602 3603 c = obj; 3604 3605 nxt_debug(task, "router conn close"); 3606 3607 c->write_state = &nxt_router_conn_close_state; 3608 3609 nxt_conn_close(task->thread->engine, c); 3610} 3611 3612 3613static void 3614nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) 3615{ 3616 nxt_socket_conf_joint_t *joint; 3617 3618 joint = obj; 3619 3620 nxt_router_conf_release(task, joint); 3621} 3622 3623 3624static void 3625nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) 3626{ 3627 nxt_conn_t *c; 3628 nxt_event_engine_t *engine; 3629 nxt_req_conn_link_t *rc; 3630 nxt_app_parse_ctx_t *ap; 3631 nxt_socket_conf_joint_t *joint; 3632 3633 c = obj; 3634 ap = data; 3635 3636 nxt_debug(task, "router conn close done"); 3637 3638 if (ap != NULL) { 3639 nxt_app_http_req_done(task, ap); 3640 3641 c->socket.data = NULL; 3642 } 3643 3644 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 3645 3646 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); 3647 3648 nxt_router_rc_unlink(task, rc); 3649 3650 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); 3651 3652 } nxt_queue_loop; 3653 3654 nxt_queue_remove(&c->link); 3655 3656 engine = task->thread->engine; 3657 3658 nxt_sockaddr_cache_free(engine, c); 3659 3660 joint = c->joint; 3661 3662 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, 3663 &engine->task, joint, NULL); 3664
|