1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_application.h> 10 11 12static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, 13 nxt_router_t *router); 14static void nxt_router_listen_sockets_sort(nxt_router_t *router, 15 nxt_router_temp_conf_t *tmcf); 16 17static nxt_int_t nxt_router_stub_conf(nxt_task_t *task, 18 nxt_router_temp_conf_t *tmcf); 19static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, 20 nxt_router_temp_conf_t *tmcf); 21static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, 22 nxt_sockaddr_t *sa); 23static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task, 24 nxt_mp_t *mp, uint32_t port); 25 26static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 27 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 28 const nxt_event_interface_t *interface); 29static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, 30 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 31static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, 32 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 33static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, 34 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 35static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, 36 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, 37 nxt_work_handler_t handler); 38static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task, 39 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array); 40 41static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 42 nxt_router_temp_conf_t *tmcf); 43static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 44 nxt_event_engine_t *engine); 45 46static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); 47static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); 48 49static void nxt_router_thread_start(void *data); 50static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 51 void *data); 52static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 53 void *data); 54static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 55 void *data); 56static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 57 void *data); 58static void nxt_router_listen_socket_release(nxt_task_t *task, 59 nxt_socket_conf_joint_t *joint); 60static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 61 void *data); 62static void nxt_router_conf_release(nxt_task_t *task, 63 nxt_socket_conf_joint_t *joint); 64 65static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 66static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 67 void *data); 68static void nxt_router_process_http_request(nxt_task_t *task, 69 nxt_conn_t *c, nxt_app_parse_ctx_t *ap); 70static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 71static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 72static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 73static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 74static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 75static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 76 77 78nxt_int_t 79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) 80{ 81 nxt_int_t ret; 82 nxt_router_t *router; 83 nxt_router_temp_conf_t *tmcf; 84 const nxt_event_interface_t *interface; 85 86 ret = nxt_app_http_init(task, rt); 87 if (nxt_slow_path(ret != NXT_OK)) { 88 return ret; 89 } 90 91 router = nxt_zalloc(sizeof(nxt_router_t)); 92 if (nxt_slow_path(router == NULL)) { 93 return NXT_ERROR; 94 } 95 96 nxt_queue_init(&router->engines); 97 nxt_queue_init(&router->sockets); 98 99 /**/ 100 101 tmcf = nxt_router_temp_conf(task, router); 102 if (nxt_slow_path(tmcf == NULL)) { 103 return NXT_ERROR; 104 } 105 106 ret = nxt_router_stub_conf(task, tmcf); 107 if (nxt_slow_path(ret != NXT_OK)) { 108 return ret; 109 } 110 111 nxt_router_listen_sockets_sort(router, tmcf); 112 113 ret = nxt_router_listen_sockets_stub_create(task, tmcf); 114 if (nxt_slow_path(ret != NXT_OK)) { 115 return ret; 116 } 117 118 interface = nxt_service_get(rt->services, "engine", NULL); 119 120 ret = nxt_router_engines_create(task, router, tmcf, interface); 121 if (nxt_slow_path(ret != NXT_OK)) { 122 return ret; 123 } 124 125 ret = nxt_router_threads_create(task, rt, tmcf); 126 if (nxt_slow_path(ret != NXT_OK)) { 127 return ret; 128 } 129 130 nxt_router_engines_post(tmcf); 131 132 nxt_queue_add(&router->sockets, &tmcf->updating); 133 nxt_queue_add(&router->sockets, &tmcf->creating); 134 135 return NXT_OK; 136} 137 138 139static nxt_router_temp_conf_t * 140nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) 141{ 142 nxt_mp_t *mp, *tmp; 143 nxt_router_conf_t *rtcf; 144 nxt_router_temp_conf_t *tmcf; 145 146 mp = nxt_mp_create(1024, 128, 256, 32); 147 if (nxt_slow_path(mp == NULL)) { 148 return NULL; 149 } 150 151 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 152 if (nxt_slow_path(rtcf == NULL)) { 153 goto fail; 154 } 155 156 rtcf->mem_pool = mp; 157 rtcf->router = router; 158 rtcf->count = 1; 159 160 tmp = nxt_mp_create(1024, 128, 256, 32); 161 if (nxt_slow_path(tmp == NULL)) { 162 goto fail; 163 } 164 165 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 166 if (nxt_slow_path(tmcf == NULL)) { 167 goto temp_fail; 168 } 169 170 tmcf->mem_pool = tmp; 171 tmcf->conf = rtcf; 172 173 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 174 sizeof(nxt_router_engine_conf_t)); 175 if (nxt_slow_path(tmcf->engines == NULL)) { 176 goto temp_fail; 177 } 178 179 nxt_queue_init(&tmcf->deleting); 180 nxt_queue_init(&tmcf->keeping); 181 nxt_queue_init(&tmcf->updating); 182 nxt_queue_init(&tmcf->pending); 183 nxt_queue_init(&tmcf->creating); 184 185 return tmcf; 186 187temp_fail: 188 189 nxt_mp_destroy(tmp); 190 191fail: 192 193 nxt_mp_destroy(mp); 194 195 return NULL; 196} 197 198 199static nxt_int_t 200nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 201{ 202 nxt_mp_t *mp; 203 nxt_sockaddr_t *sa; 204 nxt_socket_conf_t *skcf; 205 206 tmcf->conf->threads = 1; 207 208 mp = tmcf->conf->mem_pool; 209 210 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000); 211 skcf = nxt_router_socket_conf(task, mp, sa); 212 213 skcf->listen.handler = nxt_router_conn_init; 214 skcf->header_buffer_size = 2048; 215 skcf->large_header_buffer_size = 8192; 216 skcf->header_read_timeout = 5000; 217 218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); 219 220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001); 221 skcf = nxt_router_socket_conf(task, mp, sa); 222 223 skcf->listen.handler = nxt_stream_connection_init; 224 skcf->header_read_timeout = 5000; 225 226 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); 227 228 return NXT_OK; 229} 230 231 232static nxt_socket_conf_t * 233nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 234{ 235 nxt_socket_conf_t *conf; 236 237 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 238 if (nxt_slow_path(conf == NULL)) { 239 return NULL; 240 } 241 242 conf->listen.sockaddr = sa; 243 conf->listen.socklen = sa->socklen; 244 conf->listen.address_length = sa->length; 245 246 conf->listen.socket = -1; 247 conf->listen.backlog = NXT_LISTEN_BACKLOG; 248 conf->listen.flags = NXT_NONBLOCK; 249 conf->listen.read_after_accept = 1; 250 251 return conf; 252} 253 254 255static nxt_sockaddr_t * 256nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port) 257{ 258 nxt_sockaddr_t *sa; 259 struct sockaddr_in sin; 260 261 nxt_memzero(&sin, sizeof(struct sockaddr_in)); 262 263 sin.sin_family = AF_INET; 264 sin.sin_port = htons(port); 265 266 sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin, 267 sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN); 268 if (nxt_slow_path(sa == NULL)) { 269 return NULL; 270 } 271 272 sa->type = SOCK_STREAM; 273 274 nxt_sockaddr_text(sa); 275 276 return sa; 277} 278 279 280static void 281nxt_router_listen_sockets_sort(nxt_router_t *router, 282 nxt_router_temp_conf_t *tmcf) 283{ 284 nxt_queue_link_t *nqlk, *oqlk, *next; 285 nxt_socket_conf_t *nskcf, *oskcf; 286 287 for (nqlk = nxt_queue_first(&tmcf->pending); 288 nqlk != nxt_queue_tail(&tmcf->pending); 289 nqlk = next) 290 { 291 next = nxt_queue_next(nqlk); 292 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); 293 294 for (oqlk = nxt_queue_first(&router->sockets); 295 oqlk != nxt_queue_tail(&router->sockets); 296 oqlk = nxt_queue_next(oqlk)) 297 { 298 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); 299 300 if (nxt_sockaddr_cmp(nskcf->listen.sockaddr, 301 oskcf->listen.sockaddr)) 302 { 303 nxt_queue_remove(oqlk); 304 nxt_queue_insert_tail(&tmcf->keeping, oqlk); 305 306 nxt_queue_remove(nqlk); 307 nxt_queue_insert_tail(&tmcf->updating, nqlk); 308 309 break; 310 } 311 } 312 } 313 314 nxt_queue_add(&tmcf->deleting, &router->sockets); 315} 316 317 318static nxt_int_t 319nxt_router_listen_sockets_stub_create(nxt_task_t *task, 320 nxt_router_temp_conf_t *tmcf) 321{ 322 nxt_queue_link_t *qlk, *nqlk; 323 nxt_socket_conf_t *skcf; 324 325 for (qlk = nxt_queue_first(&tmcf->pending); 326 qlk != nxt_queue_tail(&tmcf->pending); 327 qlk = nqlk) 328 { 329 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 330 331 if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) { 332 return NXT_ERROR; 333 } 334 335 nqlk = nxt_queue_next(qlk); 336 nxt_queue_remove(qlk); 337 nxt_queue_insert_tail(&tmcf->creating, qlk); 338 } 339 340 return NXT_OK; 341} 342 343 344static nxt_int_t 345nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 346 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 347{ 348 nxt_mp_t *mp; 349 nxt_int_t ret; 350 nxt_uint_t n, threads; 351 nxt_queue_link_t *qlk; 352 nxt_router_engine_conf_t *recf; 353 354 mp = tmcf->conf->mem_pool; 355 threads = tmcf->conf->threads; 356 357 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 358 sizeof(nxt_router_engine_conf_t)); 359 if (nxt_slow_path(tmcf->engines == NULL)) { 360 return NXT_ERROR; 361 } 362 363 n = 0; 364 365 for (qlk = nxt_queue_first(&router->engines); 366 qlk != nxt_queue_tail(&router->engines); 367 qlk = nxt_queue_next(qlk)) 368 { 369 recf = nxt_array_zero_add(tmcf->engines); 370 if (nxt_slow_path(recf == NULL)) { 371 return NXT_ERROR; 372 } 373 374 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link); 375 // STUB 376 recf->task = recf->engine->task; 377 378 if (n < threads) { 379 ret = nxt_router_engine_conf_update(task, mp, tmcf, recf); 380 381 } else { 382 ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf); 383 } 384 385 if (nxt_slow_path(ret != NXT_OK)) { 386 return ret; 387 } 388 389 n++; 390 } 391 392 tmcf->new_threads = n; 393 394 while (n < threads) { 395 recf = nxt_array_zero_add(tmcf->engines); 396 if (nxt_slow_path(recf == NULL)) { 397 return NXT_ERROR; 398 } 399 400 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 401 if (nxt_slow_path(recf->engine == NULL)) { 402 return NXT_ERROR; 403 } 404 // STUB 405 recf->task = recf->engine->task; 406 407 ret = nxt_router_engine_conf_create(task, mp, tmcf, recf); 408 if (nxt_slow_path(ret != NXT_OK)) { 409 return ret; 410 } 411 412 n++; 413 } 414 415 return NXT_OK; 416} 417 418 419static nxt_int_t 420nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, 421 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) 422{ 423 nxt_int_t ret; 424 425 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 426 if (nxt_slow_path(recf->creating == NULL)) { 427 return NXT_ERROR; 428 } 429 430 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, 431 recf->creating, nxt_router_listen_socket_create); 432 if (nxt_slow_path(ret != NXT_OK)) { 433 return ret; 434 } 435 436 return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, 437 recf->creating, nxt_router_listen_socket_create); 438} 439 440 441static nxt_int_t 442nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, 443 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) 444{ 445 nxt_int_t ret; 446 447 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 448 if (nxt_slow_path(recf->creating == NULL)) { 449 return NXT_ERROR; 450 } 451 452 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, 453 recf->creating, nxt_router_listen_socket_create); 454 if (nxt_slow_path(ret != NXT_OK)) { 455 return ret; 456 } 457 458 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 459 if (nxt_slow_path(recf->updating == NULL)) { 460 return NXT_ERROR; 461 } 462 463 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, 464 recf->updating, nxt_router_listen_socket_update); 465 if (nxt_slow_path(ret != NXT_OK)) { 466 return ret; 467 } 468 469 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 470 if (nxt_slow_path(recf->deleting == NULL)) { 471 return NXT_ERROR; 472 } 473 474 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, 475 recf->deleting); 476} 477 478 479static nxt_int_t 480nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, 481 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) 482{ 483 nxt_int_t ret; 484 485 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 486 if (nxt_slow_path(recf->deleting == NULL)) { 487 return NXT_ERROR; 488 } 489 490 ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating, 491 recf->deleting); 492 if (nxt_slow_path(ret != NXT_OK)) { 493 return ret; 494 } 495 496 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, 497 recf->deleting); 498} 499 500 501static nxt_int_t 502nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, 503 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, 504 nxt_work_handler_t handler) 505{ 506 nxt_work_t *work; 507 nxt_queue_link_t *qlk; 508 nxt_socket_conf_joint_t *joint; 509 510 for (qlk = nxt_queue_first(sockets); 511 qlk != nxt_queue_tail(sockets); 512 qlk = nxt_queue_next(qlk)) 513 { 514 work = nxt_array_add(array); 515 if (nxt_slow_path(work == NULL)) { 516 return NXT_ERROR; 517 } 518 519 work->next = NULL; 520 work->handler = handler; 521 work->task = &recf->task; 522 work->obj = recf->engine; 523 524 joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t)); 525 if (nxt_slow_path(joint == NULL)) { 526 return NXT_ERROR; 527 } 528 529 work->data = joint; 530 531 joint->count = 1; 532 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 533 joint->engine = recf->engine; 534 } 535 536 return NXT_OK; 537} 538 539 540static nxt_int_t 541nxt_router_engine_joints_delete(nxt_task_t *task, 542 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array) 543{ 544 nxt_work_t *work; 545 nxt_queue_link_t *qlk; 546 547 for (qlk = nxt_queue_first(sockets); 548 qlk != nxt_queue_tail(sockets); 549 qlk = nxt_queue_next(qlk)) 550 { 551 work = nxt_array_add(array); 552 if (nxt_slow_path(work == NULL)) { 553 return NXT_ERROR; 554 } 555 556 work->next = NULL; 557 work->handler = nxt_router_listen_socket_delete; 558 work->task = &recf->task; 559 work->obj = recf->engine; 560 work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 561 } 562 563 return NXT_OK; 564} 565 566 567static nxt_int_t 568nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 569 nxt_router_temp_conf_t *tmcf) 570{ 571 nxt_int_t ret; 572 nxt_uint_t i, threads; 573 nxt_router_engine_conf_t *recf; 574 575 recf = tmcf->engines->elts; 576 threads = tmcf->conf->threads; 577 578 for (i = tmcf->new_threads; i < threads; i++) { 579 ret = nxt_router_thread_create(task, rt, recf[i].engine); 580 if (nxt_slow_path(ret != NXT_OK)) { 581 return ret; 582 } 583 } 584 585 return NXT_OK; 586} 587 588 589static nxt_int_t 590nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 591 nxt_event_engine_t *engine) 592{ 593 nxt_mp_t *mp; 594 nxt_int_t ret; 595 nxt_port_t *port; 596 nxt_process_t *process; 597 nxt_thread_link_t *link; 598 nxt_thread_handle_t handle; 599 600 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 601 602 if (nxt_slow_path(link == NULL)) { 603 return NXT_ERROR; 604 } 605 606 link->start = nxt_router_thread_start; 607 link->engine = engine; 608 link->work.handler = nxt_router_thread_exit_handler; 609 link->work.task = task; 610 link->work.data = link; 611 612 nxt_queue_insert_tail(&rt->engines, &engine->link); 613 614 615 process = nxt_runtime_process_find(rt, nxt_pid); 616 if (nxt_slow_path(process == NULL)) { 617 return NXT_ERROR; 618 } 619 620 port = nxt_process_port_new(process); 621 if (nxt_slow_path(port == NULL)) { 622 return NXT_ERROR; 623 } 624 625 ret = nxt_port_socket_init(task, port, 0); 626 if (nxt_slow_path(ret != NXT_OK)) { 627 return ret; 628 } 629 630 mp = nxt_mp_create(1024, 128, 256, 32); 631 if (nxt_slow_path(mp == NULL)) { 632 return NXT_ERROR; 633 } 634 635 port->mem_pool = mp; 636 port->engine = 0; 637 port->type = NXT_PROCESS_ROUTER; 638 639 engine->port = port; 640 641 nxt_runtime_port_add(rt, port); 642 643 644 ret = nxt_thread_create(&handle, link); 645 646 if (nxt_slow_path(ret != NXT_OK)) { 647 nxt_queue_remove(&engine->link); 648 } 649 650 return ret; 651} 652 653 654static void 655nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) 656{ 657 nxt_uint_t n; 658 nxt_router_engine_conf_t *recf; 659 660 recf = tmcf->engines->elts; 661 662 for (n = tmcf->engines->nelts; n != 0; n--) { 663 nxt_router_engine_post(recf); 664 recf++; 665 } 666} 667 668 669static void 670nxt_router_engine_post(nxt_router_engine_conf_t *recf) 671{ 672 nxt_uint_t n; 673 nxt_work_t *work; 674 675 work = recf->creating->elts; 676 677 for (n = recf->creating->nelts; n != 0; n--) { 678 nxt_event_engine_post(recf->engine, work); 679 work++; 680 } 681} 682 683 684static void 685nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 686 687nxt_port_handler_t nxt_router_process_port_handlers[] = { 688 NULL, 689 nxt_port_new_port_handler, 690 nxt_port_change_log_file_handler, 691 nxt_port_mmap_handler, 692 nxt_router_data_handler, 693}; 694 695 696static void 697nxt_router_thread_start(void *data) 698{ 699 nxt_task_t *task; 700 nxt_thread_t *thread; 701 nxt_thread_link_t *link; 702 nxt_event_engine_t *engine; 703 704 link = data; 705 engine = link->engine; 706 task = &engine->task; 707 708 thread = nxt_thread(); 709 710 /* STUB */ 711 thread->runtime = engine->task.thread->runtime; 712 713 engine->task.thread = thread; 714 engine->task.log = thread->log; 715 thread->engine = engine; 716 thread->task = &engine->task; 717 thread->fiber = &engine->fibers->fiber; 718 719 engine->port->socket.task = task; 720 nxt_port_create(task, engine->port, nxt_router_process_port_handlers); 721 722 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 723 724 nxt_event_engine_start(engine); 725} 726 727 728static void 729nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 730{ 731 nxt_listen_event_t *listen; 732 nxt_listen_socket_t *ls; 733 nxt_socket_conf_joint_t *joint; 734 735 joint = data; 736 737 ls = &joint->socket_conf->listen; 738 739 listen = nxt_listen_event(task, ls); 740 if (nxt_slow_path(listen == NULL)) { 741 nxt_router_listen_socket_release(task, joint); 742 return; 743 } 744 745 listen->socket.data = joint; 746} 747 748 749nxt_inline nxt_listen_event_t * 750nxt_router_listen_event(nxt_queue_t *listen_connections, 751 nxt_socket_conf_t *skcf) 752{ 753 nxt_socket_t socket; 754 nxt_queue_link_t *link; 755 nxt_listen_event_t *listen; 756 757 socket = skcf->listen.socket; 758 759 for (link = nxt_queue_first(listen_connections); 760 link != nxt_queue_tail(listen_connections); 761 link = nxt_queue_next(link)) 762 { 763 listen = nxt_queue_link_data(link, nxt_listen_event_t, link); 764 765 if (socket == listen->socket.fd) { 766 return listen; 767 } 768 } 769 770 return NULL; 771} 772 773 774static void 775nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 776{ 777 nxt_event_engine_t *engine; 778 nxt_listen_event_t *listen; 779 nxt_socket_conf_joint_t *joint, *old; 780 781 engine = obj; 782 joint = data; 783 784 listen = nxt_router_listen_event(&engine->listen_connections, 785 joint->socket_conf); 786 787 old = listen->socket.data; 788 listen->socket.data = joint; 789 790 nxt_router_conf_release(task, old); 791} 792 793 794static void 795nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 796{ 797 nxt_socket_conf_t *skcf; 798 nxt_listen_event_t *listen; 799 nxt_event_engine_t *engine; 800 801 engine = obj; 802 skcf = data; 803 804 listen = nxt_router_listen_event(&engine->listen_connections, skcf); 805 806 nxt_fd_event_delete(engine, &listen->socket); 807 808 listen->timer.handler = nxt_router_listen_socket_close; 809 listen->timer.work_queue = &engine->fast_work_queue; 810 811 nxt_timer_add(engine, &listen->timer, 0); 812} 813 814 815static void 816nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 817{ 818 nxt_timer_t *timer; 819 nxt_listen_event_t *listen; 820 nxt_socket_conf_joint_t *joint; 821 822 timer = obj; 823 listen = nxt_timer_data(timer, nxt_listen_event_t, timer); 824 joint = listen->socket.data; 825 826 nxt_queue_remove(&listen->link); 827 nxt_free(listen); 828 829 nxt_router_listen_socket_release(task, joint); 830} 831 832 833static void 834nxt_router_listen_socket_release(nxt_task_t *task, 835 nxt_socket_conf_joint_t *joint) 836{ 837 nxt_socket_t s; 838 nxt_listen_socket_t *ls; 839 nxt_thread_spinlock_t *lock; 840 841 s = -1; 842 ls = &joint->socket_conf->listen; 843 lock = &joint->socket_conf->router_conf->router->lock; 844 845 nxt_thread_spin_lock(lock); 846 847 if (--ls->count == 0) { 848 s = ls->socket; 849 ls->socket = -1; 850 } 851 852 nxt_thread_spin_unlock(lock); 853 854 if (s != -1) { 855 nxt_socket_close(task, s); 856 } 857 858 nxt_router_conf_release(task, joint); 859} 860 861 862static void 863nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 864{ 865 nxt_socket_conf_t *skcf; 866 nxt_router_conf_t *rtcf; 867 nxt_thread_spinlock_t *lock; 868 869 nxt_debug(task, "conf joint count: %D", joint->count); 870 871 if (--joint->count != 0) { 872 return; 873 } 874 875 nxt_queue_remove(&joint->link); 876 877 skcf = joint->socket_conf; 878 rtcf = skcf->router_conf; 879 lock = &rtcf->router->lock; 880 881 nxt_thread_spin_lock(lock); 882 883 if (--skcf->count != 0) { 884 rtcf = NULL; 885 886 } else { 887 nxt_queue_remove(&skcf->link); 888 889 if (--rtcf->count != 0) { 890 rtcf = NULL; 891 } 892 } 893 894 nxt_thread_spin_unlock(lock); 895 896 if (rtcf != NULL) { 897 nxt_mp_destroy(rtcf->mem_pool); 898 } 899 900 if (nxt_queue_is_empty(&joint->engine->joints)) { 901 nxt_thread_exit(task->thread); 902 } 903} 904 905 906static void 907nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 908{ 909 nxt_thread_link_t *link; 910 nxt_event_engine_t *engine; 911 nxt_thread_handle_t handle; 912 913 handle = (nxt_thread_handle_t) obj; 914 link = data; 915 916 nxt_thread_wait(handle); 917 918 engine = link->engine; 919 920 nxt_queue_remove(&engine->link); 921 922 nxt_mp_destroy(engine->mem_pool); 923 924 nxt_event_engine_free(engine); 925 926 nxt_free(link); 927 928 // TODO: free port 929} 930 931 932static const nxt_conn_state_t nxt_router_conn_read_state 933 nxt_aligned(64) = 934{ 935 .ready_handler = nxt_router_conn_http_header_parse, 936 .close_handler = nxt_router_conn_close, 937 .error_handler = nxt_router_conn_error, 938 939 .timer_handler = nxt_router_conn_timeout, 940 .timer_value = nxt_router_conn_timeout_value, 941 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), 942}; 943 944 945static void 946nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) 947{ 948 size_t size; 949 nxt_conn_t *c; 950 nxt_event_engine_t *engine; 951 nxt_socket_conf_joint_t *joint; 952 953 c = obj; 954 joint = data; 955 956 nxt_debug(task, "router conn init"); 957 958 joint->count++; 959 960 size = joint->socket_conf->header_buffer_size; 961 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); 962 963 c->socket.data = NULL; 964 965 engine = task->thread->engine; 966 c->read_work_queue = &engine->fast_work_queue; 967 c->write_work_queue = &engine->fast_work_queue; 968 969 c->read_state = &nxt_router_conn_read_state; 970 971 nxt_conn_read(engine, c); 972} 973 974 975static const nxt_conn_state_t nxt_router_conn_write_state 976 nxt_aligned(64) = 977{ 978 .ready_handler = nxt_router_conn_ready, 979 .close_handler = nxt_router_conn_close, 980 .error_handler = nxt_router_conn_error, 981}; 982 983 984static void 985nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 986{ 987 size_t dump_size; 988 nxt_buf_t *b, *i, *last; 989 nxt_conn_t *c; 990 nxt_req_conn_link_t *rc; 991 nxt_event_engine_t *engine; 992 993 b = msg->buf; 994 engine = task->thread->engine; 995 996 rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); 997 if (nxt_slow_path(rc == NULL)) { 998 999 nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); 1000 1001 /* Mark buffers as read. */ 1002 for (i = b; i != NULL; i = i->next) { 1003 i->mem.pos = i->mem.free; 1004 } 1005 1006 return; 1007 } 1008 1009 c = rc->conn; 1010 1011 dump_size = nxt_buf_used_size(b); 1012 1013 if (dump_size > 300) { 1014 dump_size = 300; 1015 } 1016 1017 nxt_debug(task, "%srouter data (%z): %*s", 1018 msg->port_msg.last ? "last " : "", msg->size, dump_size, 1019 b->mem.pos); 1020 1021 if (msg->size == 0) { 1022 b = NULL; 1023 } 1024 1025 if (msg->port_msg.last != 0) { 1026 nxt_debug(task, "router data create last buf"); 1027 1028 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); 1029 if (nxt_slow_path(last == NULL)) { 1030 /* TODO pogorevaTb */ 1031 } 1032 1033 nxt_buf_chain_add(&b, last); 1034 } 1035 1036 if (b == NULL) { 1037 return; 1038 } 1039 1040 if (c->write == NULL) { 1041 c->write = b; 1042 c->write_state = &nxt_router_conn_write_state; 1043 1044 nxt_conn_write(task->thread->engine, c); 1045 } else { 1046 nxt_debug(task, "router data attach out bufs to existing chain"); 1047 1048 nxt_buf_chain_add(&c->write, b); 1049 } 1050} 1051 1052 1053nxt_inline nxt_port_t * 1054nxt_router_app_port(nxt_task_t *task) 1055{ 1056 nxt_port_t *port; 1057 nxt_runtime_t *rt; 1058 1059 rt = task->thread->runtime; 1060 1061 nxt_runtime_port_each(rt, port) { 1062 1063 if (nxt_pid == port->pid) { 1064 continue; 1065 } 1066 1067 if (port->type == NXT_PROCESS_WORKER) { 1068 return port; 1069 } 1070 1071 } nxt_runtime_port_loop; 1072 1073 return NULL; 1074} 1075 1076 1077static void 1078nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) 1079{ 1080 size_t size, preread; 1081 nxt_int_t ret; 1082 nxt_buf_t *b; 1083 nxt_conn_t *c; 1084 nxt_app_parse_ctx_t *ap; 1085 nxt_socket_conf_joint_t *joint; 1086 nxt_app_request_header_t *h; 1087 1088 c = obj; 1089 ap = data; 1090 b = c->read; 1091 1092 nxt_debug(task, "router conn http header parse"); 1093 1094 if (ap == NULL) { 1095 ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); 1096 if (nxt_slow_path(ap == NULL)) { 1097 nxt_router_conn_close(task, c, data); 1098 return; 1099 } 1100 1101 ret = nxt_app_http_req_init(task, ap); 1102 if (nxt_slow_path(ret != NXT_OK)) { 1103 nxt_router_conn_close(task, c, data); 1104 return; 1105 } 1106 1107 c->socket.data = ap;
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_application.h> 10 11 12static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, 13 nxt_router_t *router); 14static void nxt_router_listen_sockets_sort(nxt_router_t *router, 15 nxt_router_temp_conf_t *tmcf); 16 17static nxt_int_t nxt_router_stub_conf(nxt_task_t *task, 18 nxt_router_temp_conf_t *tmcf); 19static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, 20 nxt_router_temp_conf_t *tmcf); 21static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, 22 nxt_sockaddr_t *sa); 23static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task, 24 nxt_mp_t *mp, uint32_t port); 25 26static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 27 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 28 const nxt_event_interface_t *interface); 29static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, 30 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 31static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, 32 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 33static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, 34 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 35static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, 36 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, 37 nxt_work_handler_t handler); 38static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task, 39 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array); 40 41static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 42 nxt_router_temp_conf_t *tmcf); 43static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 44 nxt_event_engine_t *engine); 45 46static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); 47static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); 48 49static void nxt_router_thread_start(void *data); 50static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 51 void *data); 52static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 53 void *data); 54static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 55 void *data); 56static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 57 void *data); 58static void nxt_router_listen_socket_release(nxt_task_t *task, 59 nxt_socket_conf_joint_t *joint); 60static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 61 void *data); 62static void nxt_router_conf_release(nxt_task_t *task, 63 nxt_socket_conf_joint_t *joint); 64 65static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 66static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 67 void *data); 68static void nxt_router_process_http_request(nxt_task_t *task, 69 nxt_conn_t *c, nxt_app_parse_ctx_t *ap); 70static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 71static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 72static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 73static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 74static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 75static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 76 77 78nxt_int_t 79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) 80{ 81 nxt_int_t ret; 82 nxt_router_t *router; 83 nxt_router_temp_conf_t *tmcf; 84 const nxt_event_interface_t *interface; 85 86 ret = nxt_app_http_init(task, rt); 87 if (nxt_slow_path(ret != NXT_OK)) { 88 return ret; 89 } 90 91 router = nxt_zalloc(sizeof(nxt_router_t)); 92 if (nxt_slow_path(router == NULL)) { 93 return NXT_ERROR; 94 } 95 96 nxt_queue_init(&router->engines); 97 nxt_queue_init(&router->sockets); 98 99 /**/ 100 101 tmcf = nxt_router_temp_conf(task, router); 102 if (nxt_slow_path(tmcf == NULL)) { 103 return NXT_ERROR; 104 } 105 106 ret = nxt_router_stub_conf(task, tmcf); 107 if (nxt_slow_path(ret != NXT_OK)) { 108 return ret; 109 } 110 111 nxt_router_listen_sockets_sort(router, tmcf); 112 113 ret = nxt_router_listen_sockets_stub_create(task, tmcf); 114 if (nxt_slow_path(ret != NXT_OK)) { 115 return ret; 116 } 117 118 interface = nxt_service_get(rt->services, "engine", NULL); 119 120 ret = nxt_router_engines_create(task, router, tmcf, interface); 121 if (nxt_slow_path(ret != NXT_OK)) { 122 return ret; 123 } 124 125 ret = nxt_router_threads_create(task, rt, tmcf); 126 if (nxt_slow_path(ret != NXT_OK)) { 127 return ret; 128 } 129 130 nxt_router_engines_post(tmcf); 131 132 nxt_queue_add(&router->sockets, &tmcf->updating); 133 nxt_queue_add(&router->sockets, &tmcf->creating); 134 135 return NXT_OK; 136} 137 138 139static nxt_router_temp_conf_t * 140nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) 141{ 142 nxt_mp_t *mp, *tmp; 143 nxt_router_conf_t *rtcf; 144 nxt_router_temp_conf_t *tmcf; 145 146 mp = nxt_mp_create(1024, 128, 256, 32); 147 if (nxt_slow_path(mp == NULL)) { 148 return NULL; 149 } 150 151 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 152 if (nxt_slow_path(rtcf == NULL)) { 153 goto fail; 154 } 155 156 rtcf->mem_pool = mp; 157 rtcf->router = router; 158 rtcf->count = 1; 159 160 tmp = nxt_mp_create(1024, 128, 256, 32); 161 if (nxt_slow_path(tmp == NULL)) { 162 goto fail; 163 } 164 165 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 166 if (nxt_slow_path(tmcf == NULL)) { 167 goto temp_fail; 168 } 169 170 tmcf->mem_pool = tmp; 171 tmcf->conf = rtcf; 172 173 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 174 sizeof(nxt_router_engine_conf_t)); 175 if (nxt_slow_path(tmcf->engines == NULL)) { 176 goto temp_fail; 177 } 178 179 nxt_queue_init(&tmcf->deleting); 180 nxt_queue_init(&tmcf->keeping); 181 nxt_queue_init(&tmcf->updating); 182 nxt_queue_init(&tmcf->pending); 183 nxt_queue_init(&tmcf->creating); 184 185 return tmcf; 186 187temp_fail: 188 189 nxt_mp_destroy(tmp); 190 191fail: 192 193 nxt_mp_destroy(mp); 194 195 return NULL; 196} 197 198 199static nxt_int_t 200nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 201{ 202 nxt_mp_t *mp; 203 nxt_sockaddr_t *sa; 204 nxt_socket_conf_t *skcf; 205 206 tmcf->conf->threads = 1; 207 208 mp = tmcf->conf->mem_pool; 209 210 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000); 211 skcf = nxt_router_socket_conf(task, mp, sa); 212 213 skcf->listen.handler = nxt_router_conn_init; 214 skcf->header_buffer_size = 2048; 215 skcf->large_header_buffer_size = 8192; 216 skcf->header_read_timeout = 5000; 217 218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); 219 220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001); 221 skcf = nxt_router_socket_conf(task, mp, sa); 222 223 skcf->listen.handler = nxt_stream_connection_init; 224 skcf->header_read_timeout = 5000; 225 226 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); 227 228 return NXT_OK; 229} 230 231 232static nxt_socket_conf_t * 233nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 234{ 235 nxt_socket_conf_t *conf; 236 237 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 238 if (nxt_slow_path(conf == NULL)) { 239 return NULL; 240 } 241 242 conf->listen.sockaddr = sa; 243 conf->listen.socklen = sa->socklen; 244 conf->listen.address_length = sa->length; 245 246 conf->listen.socket = -1; 247 conf->listen.backlog = NXT_LISTEN_BACKLOG; 248 conf->listen.flags = NXT_NONBLOCK; 249 conf->listen.read_after_accept = 1; 250 251 return conf; 252} 253 254 255static nxt_sockaddr_t * 256nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port) 257{ 258 nxt_sockaddr_t *sa; 259 struct sockaddr_in sin; 260 261 nxt_memzero(&sin, sizeof(struct sockaddr_in)); 262 263 sin.sin_family = AF_INET; 264 sin.sin_port = htons(port); 265 266 sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin, 267 sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN); 268 if (nxt_slow_path(sa == NULL)) { 269 return NULL; 270 } 271 272 sa->type = SOCK_STREAM; 273 274 nxt_sockaddr_text(sa); 275 276 return sa; 277} 278 279 280static void 281nxt_router_listen_sockets_sort(nxt_router_t *router, 282 nxt_router_temp_conf_t *tmcf) 283{ 284 nxt_queue_link_t *nqlk, *oqlk, *next; 285 nxt_socket_conf_t *nskcf, *oskcf; 286 287 for (nqlk = nxt_queue_first(&tmcf->pending); 288 nqlk != nxt_queue_tail(&tmcf->pending); 289 nqlk = next) 290 { 291 next = nxt_queue_next(nqlk); 292 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); 293 294 for (oqlk = nxt_queue_first(&router->sockets); 295 oqlk != nxt_queue_tail(&router->sockets); 296 oqlk = nxt_queue_next(oqlk)) 297 { 298 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); 299 300 if (nxt_sockaddr_cmp(nskcf->listen.sockaddr, 301 oskcf->listen.sockaddr)) 302 { 303 nxt_queue_remove(oqlk); 304 nxt_queue_insert_tail(&tmcf->keeping, oqlk); 305 306 nxt_queue_remove(nqlk); 307 nxt_queue_insert_tail(&tmcf->updating, nqlk); 308 309 break; 310 } 311 } 312 } 313 314 nxt_queue_add(&tmcf->deleting, &router->sockets); 315} 316 317 318static nxt_int_t 319nxt_router_listen_sockets_stub_create(nxt_task_t *task, 320 nxt_router_temp_conf_t *tmcf) 321{ 322 nxt_queue_link_t *qlk, *nqlk; 323 nxt_socket_conf_t *skcf; 324 325 for (qlk = nxt_queue_first(&tmcf->pending); 326 qlk != nxt_queue_tail(&tmcf->pending); 327 qlk = nqlk) 328 { 329 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 330 331 if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) { 332 return NXT_ERROR; 333 } 334 335 nqlk = nxt_queue_next(qlk); 336 nxt_queue_remove(qlk); 337 nxt_queue_insert_tail(&tmcf->creating, qlk); 338 } 339 340 return NXT_OK; 341} 342 343 344static nxt_int_t 345nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 346 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 347{ 348 nxt_mp_t *mp; 349 nxt_int_t ret; 350 nxt_uint_t n, threads; 351 nxt_queue_link_t *qlk; 352 nxt_router_engine_conf_t *recf; 353 354 mp = tmcf->conf->mem_pool; 355 threads = tmcf->conf->threads; 356 357 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 358 sizeof(nxt_router_engine_conf_t)); 359 if (nxt_slow_path(tmcf->engines == NULL)) { 360 return NXT_ERROR; 361 } 362 363 n = 0; 364 365 for (qlk = nxt_queue_first(&router->engines); 366 qlk != nxt_queue_tail(&router->engines); 367 qlk = nxt_queue_next(qlk)) 368 { 369 recf = nxt_array_zero_add(tmcf->engines); 370 if (nxt_slow_path(recf == NULL)) { 371 return NXT_ERROR; 372 } 373 374 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link); 375 // STUB 376 recf->task = recf->engine->task; 377 378 if (n < threads) { 379 ret = nxt_router_engine_conf_update(task, mp, tmcf, recf); 380 381 } else { 382 ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf); 383 } 384 385 if (nxt_slow_path(ret != NXT_OK)) { 386 return ret; 387 } 388 389 n++; 390 } 391 392 tmcf->new_threads = n; 393 394 while (n < threads) { 395 recf = nxt_array_zero_add(tmcf->engines); 396 if (nxt_slow_path(recf == NULL)) { 397 return NXT_ERROR; 398 } 399 400 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 401 if (nxt_slow_path(recf->engine == NULL)) { 402 return NXT_ERROR; 403 } 404 // STUB 405 recf->task = recf->engine->task; 406 407 ret = nxt_router_engine_conf_create(task, mp, tmcf, recf); 408 if (nxt_slow_path(ret != NXT_OK)) { 409 return ret; 410 } 411 412 n++; 413 } 414 415 return NXT_OK; 416} 417 418 419static nxt_int_t 420nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, 421 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) 422{ 423 nxt_int_t ret; 424 425 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 426 if (nxt_slow_path(recf->creating == NULL)) { 427 return NXT_ERROR; 428 } 429 430 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, 431 recf->creating, nxt_router_listen_socket_create); 432 if (nxt_slow_path(ret != NXT_OK)) { 433 return ret; 434 } 435 436 return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, 437 recf->creating, nxt_router_listen_socket_create); 438} 439 440 441static nxt_int_t 442nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, 443 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) 444{ 445 nxt_int_t ret; 446 447 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 448 if (nxt_slow_path(recf->creating == NULL)) { 449 return NXT_ERROR; 450 } 451 452 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, 453 recf->creating, nxt_router_listen_socket_create); 454 if (nxt_slow_path(ret != NXT_OK)) { 455 return ret; 456 } 457 458 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 459 if (nxt_slow_path(recf->updating == NULL)) { 460 return NXT_ERROR; 461 } 462 463 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, 464 recf->updating, nxt_router_listen_socket_update); 465 if (nxt_slow_path(ret != NXT_OK)) { 466 return ret; 467 } 468 469 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 470 if (nxt_slow_path(recf->deleting == NULL)) { 471 return NXT_ERROR; 472 } 473 474 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, 475 recf->deleting); 476} 477 478 479static nxt_int_t 480nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, 481 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) 482{ 483 nxt_int_t ret; 484 485 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 486 if (nxt_slow_path(recf->deleting == NULL)) { 487 return NXT_ERROR; 488 } 489 490 ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating, 491 recf->deleting); 492 if (nxt_slow_path(ret != NXT_OK)) { 493 return ret; 494 } 495 496 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, 497 recf->deleting); 498} 499 500 501static nxt_int_t 502nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, 503 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, 504 nxt_work_handler_t handler) 505{ 506 nxt_work_t *work; 507 nxt_queue_link_t *qlk; 508 nxt_socket_conf_joint_t *joint; 509 510 for (qlk = nxt_queue_first(sockets); 511 qlk != nxt_queue_tail(sockets); 512 qlk = nxt_queue_next(qlk)) 513 { 514 work = nxt_array_add(array); 515 if (nxt_slow_path(work == NULL)) { 516 return NXT_ERROR; 517 } 518 519 work->next = NULL; 520 work->handler = handler; 521 work->task = &recf->task; 522 work->obj = recf->engine; 523 524 joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t)); 525 if (nxt_slow_path(joint == NULL)) { 526 return NXT_ERROR; 527 } 528 529 work->data = joint; 530 531 joint->count = 1; 532 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 533 joint->engine = recf->engine; 534 } 535 536 return NXT_OK; 537} 538 539 540static nxt_int_t 541nxt_router_engine_joints_delete(nxt_task_t *task, 542 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array) 543{ 544 nxt_work_t *work; 545 nxt_queue_link_t *qlk; 546 547 for (qlk = nxt_queue_first(sockets); 548 qlk != nxt_queue_tail(sockets); 549 qlk = nxt_queue_next(qlk)) 550 { 551 work = nxt_array_add(array); 552 if (nxt_slow_path(work == NULL)) { 553 return NXT_ERROR; 554 } 555 556 work->next = NULL; 557 work->handler = nxt_router_listen_socket_delete; 558 work->task = &recf->task; 559 work->obj = recf->engine; 560 work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 561 } 562 563 return NXT_OK; 564} 565 566 567static nxt_int_t 568nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 569 nxt_router_temp_conf_t *tmcf) 570{ 571 nxt_int_t ret; 572 nxt_uint_t i, threads; 573 nxt_router_engine_conf_t *recf; 574 575 recf = tmcf->engines->elts; 576 threads = tmcf->conf->threads; 577 578 for (i = tmcf->new_threads; i < threads; i++) { 579 ret = nxt_router_thread_create(task, rt, recf[i].engine); 580 if (nxt_slow_path(ret != NXT_OK)) { 581 return ret; 582 } 583 } 584 585 return NXT_OK; 586} 587 588 589static nxt_int_t 590nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 591 nxt_event_engine_t *engine) 592{ 593 nxt_mp_t *mp; 594 nxt_int_t ret; 595 nxt_port_t *port; 596 nxt_process_t *process; 597 nxt_thread_link_t *link; 598 nxt_thread_handle_t handle; 599 600 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 601 602 if (nxt_slow_path(link == NULL)) { 603 return NXT_ERROR; 604 } 605 606 link->start = nxt_router_thread_start; 607 link->engine = engine; 608 link->work.handler = nxt_router_thread_exit_handler; 609 link->work.task = task; 610 link->work.data = link; 611 612 nxt_queue_insert_tail(&rt->engines, &engine->link); 613 614 615 process = nxt_runtime_process_find(rt, nxt_pid); 616 if (nxt_slow_path(process == NULL)) { 617 return NXT_ERROR; 618 } 619 620 port = nxt_process_port_new(process); 621 if (nxt_slow_path(port == NULL)) { 622 return NXT_ERROR; 623 } 624 625 ret = nxt_port_socket_init(task, port, 0); 626 if (nxt_slow_path(ret != NXT_OK)) { 627 return ret; 628 } 629 630 mp = nxt_mp_create(1024, 128, 256, 32); 631 if (nxt_slow_path(mp == NULL)) { 632 return NXT_ERROR; 633 } 634 635 port->mem_pool = mp; 636 port->engine = 0; 637 port->type = NXT_PROCESS_ROUTER; 638 639 engine->port = port; 640 641 nxt_runtime_port_add(rt, port); 642 643 644 ret = nxt_thread_create(&handle, link); 645 646 if (nxt_slow_path(ret != NXT_OK)) { 647 nxt_queue_remove(&engine->link); 648 } 649 650 return ret; 651} 652 653 654static void 655nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) 656{ 657 nxt_uint_t n; 658 nxt_router_engine_conf_t *recf; 659 660 recf = tmcf->engines->elts; 661 662 for (n = tmcf->engines->nelts; n != 0; n--) { 663 nxt_router_engine_post(recf); 664 recf++; 665 } 666} 667 668 669static void 670nxt_router_engine_post(nxt_router_engine_conf_t *recf) 671{ 672 nxt_uint_t n; 673 nxt_work_t *work; 674 675 work = recf->creating->elts; 676 677 for (n = recf->creating->nelts; n != 0; n--) { 678 nxt_event_engine_post(recf->engine, work); 679 work++; 680 } 681} 682 683 684static void 685nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 686 687nxt_port_handler_t nxt_router_process_port_handlers[] = { 688 NULL, 689 nxt_port_new_port_handler, 690 nxt_port_change_log_file_handler, 691 nxt_port_mmap_handler, 692 nxt_router_data_handler, 693}; 694 695 696static void 697nxt_router_thread_start(void *data) 698{ 699 nxt_task_t *task; 700 nxt_thread_t *thread; 701 nxt_thread_link_t *link; 702 nxt_event_engine_t *engine; 703 704 link = data; 705 engine = link->engine; 706 task = &engine->task; 707 708 thread = nxt_thread(); 709 710 /* STUB */ 711 thread->runtime = engine->task.thread->runtime; 712 713 engine->task.thread = thread; 714 engine->task.log = thread->log; 715 thread->engine = engine; 716 thread->task = &engine->task; 717 thread->fiber = &engine->fibers->fiber; 718 719 engine->port->socket.task = task; 720 nxt_port_create(task, engine->port, nxt_router_process_port_handlers); 721 722 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 723 724 nxt_event_engine_start(engine); 725} 726 727 728static void 729nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 730{ 731 nxt_listen_event_t *listen; 732 nxt_listen_socket_t *ls; 733 nxt_socket_conf_joint_t *joint; 734 735 joint = data; 736 737 ls = &joint->socket_conf->listen; 738 739 listen = nxt_listen_event(task, ls); 740 if (nxt_slow_path(listen == NULL)) { 741 nxt_router_listen_socket_release(task, joint); 742 return; 743 } 744 745 listen->socket.data = joint; 746} 747 748 749nxt_inline nxt_listen_event_t * 750nxt_router_listen_event(nxt_queue_t *listen_connections, 751 nxt_socket_conf_t *skcf) 752{ 753 nxt_socket_t socket; 754 nxt_queue_link_t *link; 755 nxt_listen_event_t *listen; 756 757 socket = skcf->listen.socket; 758 759 for (link = nxt_queue_first(listen_connections); 760 link != nxt_queue_tail(listen_connections); 761 link = nxt_queue_next(link)) 762 { 763 listen = nxt_queue_link_data(link, nxt_listen_event_t, link); 764 765 if (socket == listen->socket.fd) { 766 return listen; 767 } 768 } 769 770 return NULL; 771} 772 773 774static void 775nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 776{ 777 nxt_event_engine_t *engine; 778 nxt_listen_event_t *listen; 779 nxt_socket_conf_joint_t *joint, *old; 780 781 engine = obj; 782 joint = data; 783 784 listen = nxt_router_listen_event(&engine->listen_connections, 785 joint->socket_conf); 786 787 old = listen->socket.data; 788 listen->socket.data = joint; 789 790 nxt_router_conf_release(task, old); 791} 792 793 794static void 795nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 796{ 797 nxt_socket_conf_t *skcf; 798 nxt_listen_event_t *listen; 799 nxt_event_engine_t *engine; 800 801 engine = obj; 802 skcf = data; 803 804 listen = nxt_router_listen_event(&engine->listen_connections, skcf); 805 806 nxt_fd_event_delete(engine, &listen->socket); 807 808 listen->timer.handler = nxt_router_listen_socket_close; 809 listen->timer.work_queue = &engine->fast_work_queue; 810 811 nxt_timer_add(engine, &listen->timer, 0); 812} 813 814 815static void 816nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 817{ 818 nxt_timer_t *timer; 819 nxt_listen_event_t *listen; 820 nxt_socket_conf_joint_t *joint; 821 822 timer = obj; 823 listen = nxt_timer_data(timer, nxt_listen_event_t, timer); 824 joint = listen->socket.data; 825 826 nxt_queue_remove(&listen->link); 827 nxt_free(listen); 828 829 nxt_router_listen_socket_release(task, joint); 830} 831 832 833static void 834nxt_router_listen_socket_release(nxt_task_t *task, 835 nxt_socket_conf_joint_t *joint) 836{ 837 nxt_socket_t s; 838 nxt_listen_socket_t *ls; 839 nxt_thread_spinlock_t *lock; 840 841 s = -1; 842 ls = &joint->socket_conf->listen; 843 lock = &joint->socket_conf->router_conf->router->lock; 844 845 nxt_thread_spin_lock(lock); 846 847 if (--ls->count == 0) { 848 s = ls->socket; 849 ls->socket = -1; 850 } 851 852 nxt_thread_spin_unlock(lock); 853 854 if (s != -1) { 855 nxt_socket_close(task, s); 856 } 857 858 nxt_router_conf_release(task, joint); 859} 860 861 862static void 863nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 864{ 865 nxt_socket_conf_t *skcf; 866 nxt_router_conf_t *rtcf; 867 nxt_thread_spinlock_t *lock; 868 869 nxt_debug(task, "conf joint count: %D", joint->count); 870 871 if (--joint->count != 0) { 872 return; 873 } 874 875 nxt_queue_remove(&joint->link); 876 877 skcf = joint->socket_conf; 878 rtcf = skcf->router_conf; 879 lock = &rtcf->router->lock; 880 881 nxt_thread_spin_lock(lock); 882 883 if (--skcf->count != 0) { 884 rtcf = NULL; 885 886 } else { 887 nxt_queue_remove(&skcf->link); 888 889 if (--rtcf->count != 0) { 890 rtcf = NULL; 891 } 892 } 893 894 nxt_thread_spin_unlock(lock); 895 896 if (rtcf != NULL) { 897 nxt_mp_destroy(rtcf->mem_pool); 898 } 899 900 if (nxt_queue_is_empty(&joint->engine->joints)) { 901 nxt_thread_exit(task->thread); 902 } 903} 904 905 906static void 907nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 908{ 909 nxt_thread_link_t *link; 910 nxt_event_engine_t *engine; 911 nxt_thread_handle_t handle; 912 913 handle = (nxt_thread_handle_t) obj; 914 link = data; 915 916 nxt_thread_wait(handle); 917 918 engine = link->engine; 919 920 nxt_queue_remove(&engine->link); 921 922 nxt_mp_destroy(engine->mem_pool); 923 924 nxt_event_engine_free(engine); 925 926 nxt_free(link); 927 928 // TODO: free port 929} 930 931 932static const nxt_conn_state_t nxt_router_conn_read_state 933 nxt_aligned(64) = 934{ 935 .ready_handler = nxt_router_conn_http_header_parse, 936 .close_handler = nxt_router_conn_close, 937 .error_handler = nxt_router_conn_error, 938 939 .timer_handler = nxt_router_conn_timeout, 940 .timer_value = nxt_router_conn_timeout_value, 941 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), 942}; 943 944 945static void 946nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) 947{ 948 size_t size; 949 nxt_conn_t *c; 950 nxt_event_engine_t *engine; 951 nxt_socket_conf_joint_t *joint; 952 953 c = obj; 954 joint = data; 955 956 nxt_debug(task, "router conn init"); 957 958 joint->count++; 959 960 size = joint->socket_conf->header_buffer_size; 961 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); 962 963 c->socket.data = NULL; 964 965 engine = task->thread->engine; 966 c->read_work_queue = &engine->fast_work_queue; 967 c->write_work_queue = &engine->fast_work_queue; 968 969 c->read_state = &nxt_router_conn_read_state; 970 971 nxt_conn_read(engine, c); 972} 973 974 975static const nxt_conn_state_t nxt_router_conn_write_state 976 nxt_aligned(64) = 977{ 978 .ready_handler = nxt_router_conn_ready, 979 .close_handler = nxt_router_conn_close, 980 .error_handler = nxt_router_conn_error, 981}; 982 983 984static void 985nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 986{ 987 size_t dump_size; 988 nxt_buf_t *b, *i, *last; 989 nxt_conn_t *c; 990 nxt_req_conn_link_t *rc; 991 nxt_event_engine_t *engine; 992 993 b = msg->buf; 994 engine = task->thread->engine; 995 996 rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); 997 if (nxt_slow_path(rc == NULL)) { 998 999 nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); 1000 1001 /* Mark buffers as read. */ 1002 for (i = b; i != NULL; i = i->next) { 1003 i->mem.pos = i->mem.free; 1004 } 1005 1006 return; 1007 } 1008 1009 c = rc->conn; 1010 1011 dump_size = nxt_buf_used_size(b); 1012 1013 if (dump_size > 300) { 1014 dump_size = 300; 1015 } 1016 1017 nxt_debug(task, "%srouter data (%z): %*s", 1018 msg->port_msg.last ? "last " : "", msg->size, dump_size, 1019 b->mem.pos); 1020 1021 if (msg->size == 0) { 1022 b = NULL; 1023 } 1024 1025 if (msg->port_msg.last != 0) { 1026 nxt_debug(task, "router data create last buf"); 1027 1028 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); 1029 if (nxt_slow_path(last == NULL)) { 1030 /* TODO pogorevaTb */ 1031 } 1032 1033 nxt_buf_chain_add(&b, last); 1034 } 1035 1036 if (b == NULL) { 1037 return; 1038 } 1039 1040 if (c->write == NULL) { 1041 c->write = b; 1042 c->write_state = &nxt_router_conn_write_state; 1043 1044 nxt_conn_write(task->thread->engine, c); 1045 } else { 1046 nxt_debug(task, "router data attach out bufs to existing chain"); 1047 1048 nxt_buf_chain_add(&c->write, b); 1049 } 1050} 1051 1052 1053nxt_inline nxt_port_t * 1054nxt_router_app_port(nxt_task_t *task) 1055{ 1056 nxt_port_t *port; 1057 nxt_runtime_t *rt; 1058 1059 rt = task->thread->runtime; 1060 1061 nxt_runtime_port_each(rt, port) { 1062 1063 if (nxt_pid == port->pid) { 1064 continue; 1065 } 1066 1067 if (port->type == NXT_PROCESS_WORKER) { 1068 return port; 1069 } 1070 1071 } nxt_runtime_port_loop; 1072 1073 return NULL; 1074} 1075 1076 1077static void 1078nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) 1079{ 1080 size_t size, preread; 1081 nxt_int_t ret; 1082 nxt_buf_t *b; 1083 nxt_conn_t *c; 1084 nxt_app_parse_ctx_t *ap; 1085 nxt_socket_conf_joint_t *joint; 1086 nxt_app_request_header_t *h; 1087 1088 c = obj; 1089 ap = data; 1090 b = c->read; 1091 1092 nxt_debug(task, "router conn http header parse"); 1093 1094 if (ap == NULL) { 1095 ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); 1096 if (nxt_slow_path(ap == NULL)) { 1097 nxt_router_conn_close(task, c, data); 1098 return; 1099 } 1100 1101 ret = nxt_app_http_req_init(task, ap); 1102 if (nxt_slow_path(ret != NXT_OK)) { 1103 nxt_router_conn_close(task, c, data); 1104 return; 1105 } 1106 1107 c->socket.data = ap;
|
1108 } 1109 1110 h = &ap->r.header; 1111 1112 ret = nxt_app_http_req_parse(task, ap, b); 1113 1114 nxt_debug(task, "http parse request: %d", ret); 1115 1116 switch (nxt_expect(NXT_DONE, ret)) { 1117 1118 case NXT_DONE: 1119 preread = nxt_buf_mem_used_size(&b->mem); 1120 1121 nxt_debug(task, "router request header parsing complete, " 1122 "content length: %O, preread: %uz", 1123 h->parsed_content_length, preread); 1124 1125 nxt_router_process_http_request(task, c, ap); 1126 return; 1127 1128 case NXT_ERROR: 1129 nxt_router_conn_close(task, c, data); 1130 return; 1131 1132 default: /* NXT_AGAIN */ 1133 1134 if (h->done == 0) { 1135 1136 if (c->read->mem.free == c->read->mem.end) { 1137 joint = c->listen->socket.data; 1138 size = joint->socket_conf->large_header_buffer_size; 1139 1140 if (size > (size_t) nxt_buf_mem_size(&b->mem)) { 1141 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1142 if (nxt_slow_path(b == NULL)) { 1143 nxt_router_conn_close(task, c, data); 1144 return; 1145 } 1146 1147 size = c->read->mem.free - c->read->mem.pos; 1148 nxt_memcpy(b->mem.pos, c->read->mem.pos, size); 1149 1150 b->mem.free += size; 1151 c->read = b; 1152 } else { 1153 // TODO 500 Too long request headers 1154 nxt_log_alert(task->log, "Too long request headers"); 1155 } 1156 } 1157 } 1158 1159 if (ap->r.body.done == 0) { 1160 1161 preread = nxt_buf_mem_used_size(&b->mem); 1162 1163 if (h->parsed_content_length - preread > 1164 (size_t) nxt_buf_mem_free_size(&b->mem)) { 1165 1166 b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); 1167 if (nxt_slow_path(b == NULL)) { 1168 // TODO 500 Failed to allocate buffer for request body 1169 nxt_log_alert(task->log, "Failed to allocate buffer for " 1170 "request body"); 1171 } 1172 1173 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, 1174 preread); 1175 1176 c->read = b; 1177 } 1178 1179 nxt_debug(task, "router request body read again, rest: %uz", 1180 h->parsed_content_length - preread); 1181 1182 } 1183 1184 } 1185 1186 nxt_conn_read(task->thread->engine, c); 1187} 1188 1189 1190static void 1191nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 1192 nxt_app_parse_ctx_t *ap) 1193{ 1194 nxt_port_t *port, *c_port; 1195 nxt_req_id_t req_id; 1196 nxt_app_wmsg_t wmsg; 1197 nxt_event_engine_t *engine; 1198 nxt_req_conn_link_t *rc; 1199 1200 if (nxt_slow_path(nxt_app == NULL)) { 1201 // 500 Application not found 1202 nxt_log_alert(task->log, "application is NULL"); 1203 } 1204 1205 port = nxt_router_app_port(task); 1206 1207 if (nxt_slow_path(port == NULL)) { 1208 // 500 Application port not found 1209 nxt_log_alert(task->log, "application port not found"); 1210 } 1211 1212 engine = task->thread->engine; 1213 1214 do { 1215 req_id = nxt_random(&nxt_random_data); 1216 } while (nxt_event_engine_request_find(engine, req_id) != NULL); 1217 1218 rc = nxt_conn_request_add(c, req_id); 1219 if (nxt_slow_path(rc == NULL)) { 1220 // 500 Failed to allocate req->conn link 1221 nxt_log_alert(task->log, "failed to allocate req->conn link"); 1222 } 1223 1224 nxt_event_engine_request_add(engine, rc); 1225 1226 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 1227 req_id, c, engine); 1228 1229 c_port = nxt_process_connected_port_find(port->process, 1230 engine->port->pid, 1231 engine->port->id); 1232 if (nxt_slow_path(c_port != engine->port)) { 1233 (void) nxt_port_send_port(task, port, engine->port); 1234 nxt_process_connected_port_add(port->process, engine->port); 1235 } 1236 1237 wmsg.port = port; 1238 wmsg.write = NULL; 1239 wmsg.buf = &wmsg.write; 1240 wmsg.stream = req_id; 1241 1242 (void)nxt_app->prepare_msg(task, &ap->r, &wmsg); 1243 1244 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 1245 nxt_buf_used_size(wmsg.write), 1246 wmsg.port->socket.fd); 1247 1248 (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, 1249 -1, req_id, engine->port->id, wmsg.write); 1250} 1251 1252 1253static const nxt_conn_state_t nxt_router_conn_close_state 1254 nxt_aligned(64) = 1255{ 1256 .ready_handler = nxt_router_conn_free, 1257}; 1258 1259 1260static void 1261nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) 1262{ 1263 nxt_buf_t *b; 1264 nxt_bool_t last; 1265 nxt_conn_t *c; 1266 nxt_work_queue_t *wq; 1267 1268 nxt_debug(task, "router conn ready %p", obj); 1269 1270 c = obj; 1271 b = c->write; 1272 1273 wq = &task->thread->engine->fast_work_queue; 1274 1275 last = 0; 1276 1277 while (b != NULL) { 1278 if (!nxt_buf_is_sync(b)) { 1279 if (nxt_buf_used_size(b) > 0) { 1280 break; 1281 } 1282 } 1283 1284 if (nxt_buf_is_last(b)) { 1285 last = 1; 1286 } 1287 1288 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1289 1290 b = b->next; 1291 } 1292 1293 c->write = b; 1294 1295 if (b != NULL) { 1296 nxt_debug(task, "router conn %p has more data to write", obj); 1297 1298 nxt_conn_write(task->thread->engine, c); 1299 } else { 1300 nxt_debug(task, "router conn %p no more data to write, last = %d", obj, 1301 last); 1302 1303 if (last != 0) { 1304 nxt_debug(task, "enqueue router conn close %p (ready handler)", c); 1305 1306 nxt_work_queue_add(wq, nxt_router_conn_close, task, c, 1307 c->socket.data); 1308 } 1309 } 1310} 1311 1312 1313static void 1314nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) 1315{ 1316 nxt_conn_t *c; 1317 1318 c = obj; 1319 1320 nxt_debug(task, "router conn close"); 1321 1322 c->write_state = &nxt_router_conn_close_state; 1323 1324 nxt_conn_close(task->thread->engine, c); 1325} 1326 1327 1328static void 1329nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) 1330{ 1331 nxt_conn_t *c; 1332 nxt_req_conn_link_t *rc; 1333 nxt_socket_conf_joint_t *joint; 1334 1335 c = obj; 1336 1337 nxt_debug(task, "router conn close done"); 1338 1339 joint = c->listen->socket.data; 1340 nxt_router_conf_release(task, joint); 1341 1342 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 1343 1344 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); 1345 1346 nxt_event_engine_request_remove(task->thread->engine, rc); 1347 1348 } nxt_queue_loop; 1349 1350 nxt_mp_destroy(c->mem_pool); 1351} 1352 1353 1354static void 1355nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) 1356{ 1357 nxt_conn_t *c; 1358 1359 c = obj; 1360 1361 nxt_debug(task, "router conn error"); 1362 1363 c->write_state = &nxt_router_conn_close_state; 1364 1365 nxt_conn_close(task->thread->engine, c); 1366} 1367 1368 1369static void 1370nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) 1371{ 1372 nxt_conn_t *c; 1373 nxt_timer_t *timer; 1374 1375 timer = obj; 1376 1377 nxt_debug(task, "router conn timeout"); 1378 1379 c = nxt_read_timer_conn(timer); 1380 1381 c->write_state = &nxt_router_conn_close_state; 1382 1383 nxt_conn_close(task->thread->engine, c); 1384} 1385 1386 1387static nxt_msec_t 1388nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 1389{ 1390 nxt_socket_conf_joint_t *joint; 1391 1392 joint = c->listen->socket.data; 1393 1394 return nxt_value_at(nxt_msec_t, joint->socket_conf, data); 1395}
| 1111 } 1112 1113 h = &ap->r.header; 1114 1115 ret = nxt_app_http_req_parse(task, ap, b); 1116 1117 nxt_debug(task, "http parse request: %d", ret); 1118 1119 switch (nxt_expect(NXT_DONE, ret)) { 1120 1121 case NXT_DONE: 1122 preread = nxt_buf_mem_used_size(&b->mem); 1123 1124 nxt_debug(task, "router request header parsing complete, " 1125 "content length: %O, preread: %uz", 1126 h->parsed_content_length, preread); 1127 1128 nxt_router_process_http_request(task, c, ap); 1129 return; 1130 1131 case NXT_ERROR: 1132 nxt_router_conn_close(task, c, data); 1133 return; 1134 1135 default: /* NXT_AGAIN */ 1136 1137 if (h->done == 0) { 1138 1139 if (c->read->mem.free == c->read->mem.end) { 1140 joint = c->listen->socket.data; 1141 size = joint->socket_conf->large_header_buffer_size; 1142 1143 if (size > (size_t) nxt_buf_mem_size(&b->mem)) { 1144 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1145 if (nxt_slow_path(b == NULL)) { 1146 nxt_router_conn_close(task, c, data); 1147 return; 1148 } 1149 1150 size = c->read->mem.free - c->read->mem.pos; 1151 nxt_memcpy(b->mem.pos, c->read->mem.pos, size); 1152 1153 b->mem.free += size; 1154 c->read = b; 1155 } else { 1156 // TODO 500 Too long request headers 1157 nxt_log_alert(task->log, "Too long request headers"); 1158 } 1159 } 1160 } 1161 1162 if (ap->r.body.done == 0) { 1163 1164 preread = nxt_buf_mem_used_size(&b->mem); 1165 1166 if (h->parsed_content_length - preread > 1167 (size_t) nxt_buf_mem_free_size(&b->mem)) { 1168 1169 b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); 1170 if (nxt_slow_path(b == NULL)) { 1171 // TODO 500 Failed to allocate buffer for request body 1172 nxt_log_alert(task->log, "Failed to allocate buffer for " 1173 "request body"); 1174 } 1175 1176 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, 1177 preread); 1178 1179 c->read = b; 1180 } 1181 1182 nxt_debug(task, "router request body read again, rest: %uz", 1183 h->parsed_content_length - preread); 1184 1185 } 1186 1187 } 1188 1189 nxt_conn_read(task->thread->engine, c); 1190} 1191 1192 1193static void 1194nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 1195 nxt_app_parse_ctx_t *ap) 1196{ 1197 nxt_port_t *port, *c_port; 1198 nxt_req_id_t req_id; 1199 nxt_app_wmsg_t wmsg; 1200 nxt_event_engine_t *engine; 1201 nxt_req_conn_link_t *rc; 1202 1203 if (nxt_slow_path(nxt_app == NULL)) { 1204 // 500 Application not found 1205 nxt_log_alert(task->log, "application is NULL"); 1206 } 1207 1208 port = nxt_router_app_port(task); 1209 1210 if (nxt_slow_path(port == NULL)) { 1211 // 500 Application port not found 1212 nxt_log_alert(task->log, "application port not found"); 1213 } 1214 1215 engine = task->thread->engine; 1216 1217 do { 1218 req_id = nxt_random(&nxt_random_data); 1219 } while (nxt_event_engine_request_find(engine, req_id) != NULL); 1220 1221 rc = nxt_conn_request_add(c, req_id); 1222 if (nxt_slow_path(rc == NULL)) { 1223 // 500 Failed to allocate req->conn link 1224 nxt_log_alert(task->log, "failed to allocate req->conn link"); 1225 } 1226 1227 nxt_event_engine_request_add(engine, rc); 1228 1229 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 1230 req_id, c, engine); 1231 1232 c_port = nxt_process_connected_port_find(port->process, 1233 engine->port->pid, 1234 engine->port->id); 1235 if (nxt_slow_path(c_port != engine->port)) { 1236 (void) nxt_port_send_port(task, port, engine->port); 1237 nxt_process_connected_port_add(port->process, engine->port); 1238 } 1239 1240 wmsg.port = port; 1241 wmsg.write = NULL; 1242 wmsg.buf = &wmsg.write; 1243 wmsg.stream = req_id; 1244 1245 (void)nxt_app->prepare_msg(task, &ap->r, &wmsg); 1246 1247 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 1248 nxt_buf_used_size(wmsg.write), 1249 wmsg.port->socket.fd); 1250 1251 (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, 1252 -1, req_id, engine->port->id, wmsg.write); 1253} 1254 1255 1256static const nxt_conn_state_t nxt_router_conn_close_state 1257 nxt_aligned(64) = 1258{ 1259 .ready_handler = nxt_router_conn_free, 1260}; 1261 1262 1263static void 1264nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) 1265{ 1266 nxt_buf_t *b; 1267 nxt_bool_t last; 1268 nxt_conn_t *c; 1269 nxt_work_queue_t *wq; 1270 1271 nxt_debug(task, "router conn ready %p", obj); 1272 1273 c = obj; 1274 b = c->write; 1275 1276 wq = &task->thread->engine->fast_work_queue; 1277 1278 last = 0; 1279 1280 while (b != NULL) { 1281 if (!nxt_buf_is_sync(b)) { 1282 if (nxt_buf_used_size(b) > 0) { 1283 break; 1284 } 1285 } 1286 1287 if (nxt_buf_is_last(b)) { 1288 last = 1; 1289 } 1290 1291 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1292 1293 b = b->next; 1294 } 1295 1296 c->write = b; 1297 1298 if (b != NULL) { 1299 nxt_debug(task, "router conn %p has more data to write", obj); 1300 1301 nxt_conn_write(task->thread->engine, c); 1302 } else { 1303 nxt_debug(task, "router conn %p no more data to write, last = %d", obj, 1304 last); 1305 1306 if (last != 0) { 1307 nxt_debug(task, "enqueue router conn close %p (ready handler)", c); 1308 1309 nxt_work_queue_add(wq, nxt_router_conn_close, task, c, 1310 c->socket.data); 1311 } 1312 } 1313} 1314 1315 1316static void 1317nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) 1318{ 1319 nxt_conn_t *c; 1320 1321 c = obj; 1322 1323 nxt_debug(task, "router conn close"); 1324 1325 c->write_state = &nxt_router_conn_close_state; 1326 1327 nxt_conn_close(task->thread->engine, c); 1328} 1329 1330 1331static void 1332nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) 1333{ 1334 nxt_conn_t *c; 1335 nxt_req_conn_link_t *rc; 1336 nxt_socket_conf_joint_t *joint; 1337 1338 c = obj; 1339 1340 nxt_debug(task, "router conn close done"); 1341 1342 joint = c->listen->socket.data; 1343 nxt_router_conf_release(task, joint); 1344 1345 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 1346 1347 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); 1348 1349 nxt_event_engine_request_remove(task->thread->engine, rc); 1350 1351 } nxt_queue_loop; 1352 1353 nxt_mp_destroy(c->mem_pool); 1354} 1355 1356 1357static void 1358nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) 1359{ 1360 nxt_conn_t *c; 1361 1362 c = obj; 1363 1364 nxt_debug(task, "router conn error"); 1365 1366 c->write_state = &nxt_router_conn_close_state; 1367 1368 nxt_conn_close(task->thread->engine, c); 1369} 1370 1371 1372static void 1373nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) 1374{ 1375 nxt_conn_t *c; 1376 nxt_timer_t *timer; 1377 1378 timer = obj; 1379 1380 nxt_debug(task, "router conn timeout"); 1381 1382 c = nxt_read_timer_conn(timer); 1383 1384 c->write_state = &nxt_router_conn_close_state; 1385 1386 nxt_conn_close(task->thread->engine, c); 1387} 1388 1389 1390static nxt_msec_t 1391nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 1392{ 1393 nxt_socket_conf_joint_t *joint; 1394 1395 joint = c->listen->socket.data; 1396 1397 return nxt_value_at(nxt_msec_t, joint->socket_conf, data); 1398}
|