1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 11 12 typedef struct { 13 nxt_str_t type; 14 uint32_t workers; 15 } nxt_router_app_conf_t; 16 17 18 typedef struct { 19 nxt_str_t application; 20 } nxt_router_listener_conf_t; 21 22 23 typedef struct nxt_req_app_link_s nxt_req_app_link_t; 24 typedef struct nxt_start_worker_s nxt_start_worker_t; 25 26 struct nxt_start_worker_s { 27 nxt_app_t *app; 28 nxt_req_app_link_t *ra; 29 30 nxt_work_t work; 31 }; 32 33 34 struct nxt_req_app_link_s { 35 nxt_req_id_t req_id; 36 nxt_port_t *app_port; 37 nxt_port_t *reply_port; 38 nxt_app_parse_ctx_t *ap; 39 nxt_req_conn_link_t *rc; 40 41 nxt_queue_link_t link; /* for nxt_app_t.requests */ 42 43 nxt_mp_t *mem_pool; 44 nxt_work_t work; 45 }; 46 47 48 typedef struct { 49 nxt_socket_conf_t *socket_conf; 50 nxt_router_temp_conf_t *temp_conf; 51 } nxt_socket_rpc_t; 52 53 54 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 55 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 56 static void nxt_router_conf_ready(nxt_task_t *task, 57 nxt_router_temp_conf_t *tmcf); 58 static void nxt_router_conf_error(nxt_task_t *task, 59 nxt_router_temp_conf_t *tmcf); 60 static void nxt_router_conf_send(nxt_task_t *task, 61 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 62 static void nxt_router_listen_sockets_sort(nxt_router_t *router, 63 nxt_router_temp_conf_t *tmcf); 64 65 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 66 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 67 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 68 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, 69 nxt_str_t *name); 70 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 71 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 72 static void 
nxt_router_listen_socket_ready(nxt_task_t *task, 73 nxt_port_recv_msg_t *msg, void *data); 74 static void nxt_router_listen_socket_error(nxt_task_t *task, 75 nxt_port_recv_msg_t *msg, void *data); 76 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, 77 nxt_sockaddr_t *sa); 78 79 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 80 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 81 const nxt_event_interface_t *interface); 82 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 83 nxt_router_engine_conf_t *recf); 84 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 85 nxt_router_engine_conf_t *recf); 86 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 87 nxt_router_engine_conf_t *recf); 88 static void nxt_router_engine_socket_count(nxt_queue_t *sockets); 89 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 90 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 91 nxt_work_handler_t handler); 92 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 93 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 94 95 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 96 nxt_router_temp_conf_t *tmcf); 97 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 98 nxt_event_engine_t *engine); 99 static void nxt_router_apps_sort(nxt_router_t *router, 100 nxt_router_temp_conf_t *tmcf); 101 102 static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); 103 static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); 104 static void nxt_router_app_data_handler(nxt_task_t *task, 105 nxt_port_recv_msg_t *msg); 106 107 static void nxt_router_thread_start(void *data); 108 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 109 void *data); 110 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 111 
void *data); 112 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 113 void *data); 114 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 115 void *data); 116 static void nxt_router_listen_socket_release(nxt_task_t *task, 117 nxt_socket_conf_joint_t *joint); 118 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 119 void *data); 120 static void nxt_router_conf_release(nxt_task_t *task, 121 nxt_socket_conf_joint_t *joint); 122 123 static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, 124 void *data); 125 static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); 126 static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); 127 static void nxt_router_app_release_port(nxt_task_t *task, void *obj, 128 void *data); 129 130 static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 131 static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 132 void *data); 133 static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, 134 void *data); 135 static void nxt_router_process_http_request(nxt_task_t *task, 136 nxt_conn_t *c, nxt_app_parse_ctx_t *ap); 137 static void nxt_router_process_http_request_mp(nxt_task_t *task, 138 nxt_req_app_link_t *ra, nxt_port_t *port); 139 static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 140 nxt_app_wmsg_t *wmsg); 141 static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 142 nxt_app_wmsg_t *wmsg); 143 static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 144 nxt_app_wmsg_t *wmsg); 145 static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 146 static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 147 static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 148 static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 149 static void 
nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 150 static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 151 152 static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 153 const char* fmt, ...); 154 155 static nxt_router_t *nxt_router; 156 157 158 static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { 159 nxt_python_prepare_msg, 160 nxt_php_prepare_msg, 161 nxt_go_prepare_msg, 162 }; 163 164 165 nxt_int_t 166 nxt_router_start(nxt_task_t *task, void *data) 167 { 168 nxt_int_t ret; 169 nxt_router_t *router; 170 nxt_runtime_t *rt; 171 172 rt = task->thread->runtime; 173 174 ret = nxt_app_http_init(task, rt); 175 if (nxt_slow_path(ret != NXT_OK)) { 176 return ret; 177 } 178 179 router = nxt_zalloc(sizeof(nxt_router_t)); 180 if (nxt_slow_path(router == NULL)) { 181 return NXT_ERROR; 182 } 183 184 nxt_queue_init(&router->engines); 185 nxt_queue_init(&router->sockets); 186 nxt_queue_init(&router->apps); 187 188 nxt_router = router; 189 190 return NXT_OK; 191 } 192 193 194 static nxt_start_worker_t * 195 nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 196 { 197 nxt_port_t *main_port; 198 nxt_runtime_t *rt; 199 nxt_start_worker_t *sw; 200 201 sw = nxt_zalloc(sizeof(nxt_start_worker_t)); 202 203 if (nxt_slow_path(sw == NULL)) { 204 return NULL; 205 } 206 207 sw->app = app; 208 sw->ra = ra; 209 210 nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw, 211 ra->req_id, &app->name, app); 212 213 rt = task->thread->runtime; 214 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 215 216 sw->work.handler = nxt_router_send_sw_request; 217 sw->work.task = &main_port->engine->task; 218 sw->work.obj = sw; 219 sw->work.data = task->thread->engine; 220 sw->work.next = NULL; 221 222 if (task->thread->engine != main_port->engine) { 223 nxt_debug(task, "sw %p post send to main engine %p", sw, 224 main_port->engine); 225 226 nxt_event_engine_post(main_port->engine, &sw->work); 227 228 } 
else { 229 nxt_router_send_sw_request(task, sw, sw->work.data); 230 } 231 232 return sw; 233 } 234 235 236 nxt_inline void 237 nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) 238 { 239 nxt_debug(task, "sw %p release", sw); 240 241 nxt_free(sw); 242 } 243 244 245 static nxt_req_app_link_t * 246 nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) 247 { 248 nxt_mp_t *mp; 249 nxt_req_app_link_t *ra; 250 251 mp = rc->conn->mem_pool; 252 253 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); 254 255 if (nxt_slow_path(ra == NULL)) { 256 return NULL; 257 } 258 259 nxt_debug(task, "ra #%uxD create", rc->req_id); 260 261 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 262 263 ra->req_id = rc->req_id; 264 ra->app_port = NULL; 265 ra->rc = rc; 266 267 ra->mem_pool = mp; 268 269 ra->work.handler = NULL; 270 ra->work.task = &task->thread->engine->task; 271 ra->work.obj = ra; 272 ra->work.data = task->thread->engine; 273 274 return ra; 275 } 276 277 278 static void 279 nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) 280 { 281 nxt_req_app_link_t *ra; 282 nxt_event_engine_t *engine; 283 284 ra = obj; 285 engine = data; 286 287 if (task->thread->engine != engine) { 288 ra->work.handler = nxt_router_ra_release; 289 ra->work.task = &engine->task; 290 ra->work.next = NULL; 291 292 nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine); 293 294 nxt_event_engine_post(engine, &ra->work); 295 296 return; 297 } 298 299 nxt_debug(task, "ra #%uxD release", ra->req_id); 300 301 if (ra->app_port != NULL) { 302 303 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); 304 305 #if 0 306 /* Uncomment to hold app port until complete response received. 
*/ 307 if (ra->rc->conn != NULL) { 308 ra->rc->app_port = ra->app_port; 309 310 } else { 311 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); 312 } 313 #endif 314 } 315 316 nxt_mp_release(ra->mem_pool, ra); 317 } 318 319 320 void 321 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 322 { 323 nxt_port_new_port_handler(task, msg); 324 325 if (msg->port_msg.stream == 0) { 326 return; 327 } 328 329 if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { 330 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 331 } 332 333 nxt_port_rpc_handler(task, msg); 334 } 335 336 337 void 338 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 339 { 340 size_t dump_size; 341 nxt_int_t ret; 342 nxt_buf_t *b; 343 nxt_router_temp_conf_t *tmcf; 344 345 b = msg->buf; 346 347 dump_size = nxt_buf_used_size(b); 348 349 if (dump_size > 300) { 350 dump_size = 300; 351 } 352 353 nxt_debug(task, "router conf data (%z): %*s", 354 msg->size, dump_size, b->mem.pos); 355 356 tmcf = nxt_router_temp_conf(task); 357 if (nxt_slow_path(tmcf == NULL)) { 358 return; 359 } 360 361 tmcf->conf->router = nxt_router; 362 tmcf->stream = msg->port_msg.stream; 363 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 364 msg->port_msg.pid, 365 msg->port_msg.reply_port); 366 367 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 368 369 if (nxt_fast_path(ret == NXT_OK)) { 370 nxt_router_conf_apply(task, tmcf, NULL); 371 372 } else { 373 nxt_router_conf_error(task, tmcf); 374 } 375 } 376 377 378 void 379 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 380 { 381 nxt_port_remove_pid_handler(task, msg); 382 383 if (msg->port_msg.stream == 0) { 384 return; 385 } 386 387 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 388 389 nxt_port_rpc_handler(task, msg); 390 } 391 392 393 static nxt_router_temp_conf_t * 394 nxt_router_temp_conf(nxt_task_t *task) 395 { 396 nxt_mp_t *mp, *tmp; 397 nxt_router_conf_t 
*rtcf; 398 nxt_router_temp_conf_t *tmcf; 399 400 mp = nxt_mp_create(1024, 128, 256, 32); 401 if (nxt_slow_path(mp == NULL)) { 402 return NULL; 403 } 404 405 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 406 if (nxt_slow_path(rtcf == NULL)) { 407 goto fail; 408 } 409 410 rtcf->mem_pool = mp; 411 412 tmp = nxt_mp_create(1024, 128, 256, 32); 413 if (nxt_slow_path(tmp == NULL)) { 414 goto fail; 415 } 416 417 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 418 if (nxt_slow_path(tmcf == NULL)) { 419 goto temp_fail; 420 } 421 422 tmcf->mem_pool = tmp; 423 tmcf->conf = rtcf; 424 tmcf->count = 1; 425 tmcf->engine = task->thread->engine; 426 427 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 428 sizeof(nxt_router_engine_conf_t)); 429 if (nxt_slow_path(tmcf->engines == NULL)) { 430 goto temp_fail; 431 } 432 433 nxt_queue_init(&tmcf->deleting); 434 nxt_queue_init(&tmcf->keeping); 435 nxt_queue_init(&tmcf->updating); 436 nxt_queue_init(&tmcf->pending); 437 nxt_queue_init(&tmcf->creating); 438 nxt_queue_init(&tmcf->apps); 439 nxt_queue_init(&tmcf->previous); 440 441 return tmcf; 442 443 temp_fail: 444 445 nxt_mp_destroy(tmp); 446 447 fail: 448 449 nxt_mp_destroy(mp); 450 451 return NULL; 452 } 453 454 455 static void 456 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 457 { 458 nxt_int_t ret; 459 nxt_router_t *router; 460 nxt_runtime_t *rt; 461 nxt_queue_link_t *qlk; 462 nxt_socket_conf_t *skcf; 463 nxt_router_temp_conf_t *tmcf; 464 const nxt_event_interface_t *interface; 465 466 tmcf = obj; 467 468 qlk = nxt_queue_first(&tmcf->pending); 469 470 if (qlk != nxt_queue_tail(&tmcf->pending)) { 471 nxt_queue_remove(qlk); 472 nxt_queue_insert_tail(&tmcf->creating, qlk); 473 474 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 475 476 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 477 478 return; 479 } 480 481 rt = task->thread->runtime; 482 483 interface = nxt_service_get(rt->services, "engine", NULL); 484 485 router = 
tmcf->conf->router; 486 487 ret = nxt_router_engines_create(task, router, tmcf, interface); 488 if (nxt_slow_path(ret != NXT_OK)) { 489 goto fail; 490 } 491 492 ret = nxt_router_threads_create(task, rt, tmcf); 493 if (nxt_slow_path(ret != NXT_OK)) { 494 goto fail; 495 } 496 497 nxt_router_apps_sort(router, tmcf); 498 499 nxt_router_engines_post(tmcf); 500 501 nxt_queue_add(&router->sockets, &tmcf->updating); 502 nxt_queue_add(&router->sockets, &tmcf->creating); 503 504 nxt_router_conf_ready(task, tmcf); 505 506 return; 507 508 fail: 509 510 nxt_router_conf_error(task, tmcf); 511 512 return; 513 } 514 515 516 static void 517 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 518 { 519 nxt_joint_job_t *job; 520 521 job = obj; 522 523 nxt_router_conf_ready(task, job->tmcf); 524 } 525 526 527 static void 528 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 529 { 530 nxt_debug(task, "temp conf count:%D", tmcf->count); 531 532 if (--tmcf->count == 0) { 533 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 534 } 535 } 536 537 538 static void 539 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 540 { 541 nxt_socket_t s; 542 nxt_router_t *router; 543 nxt_queue_link_t *qlk; 544 nxt_socket_conf_t *skcf; 545 546 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf"); 547 548 for (qlk = nxt_queue_first(&tmcf->creating); 549 qlk != nxt_queue_tail(&tmcf->creating); 550 qlk = nxt_queue_next(qlk)) 551 { 552 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 553 s = skcf->listen.socket; 554 555 if (s != -1) { 556 nxt_socket_close(task, s); 557 } 558 559 nxt_free(skcf->socket); 560 } 561 562 router = tmcf->conf->router; 563 564 nxt_queue_add(&router->sockets, &tmcf->keeping); 565 nxt_queue_add(&router->sockets, &tmcf->deleting); 566 567 // TODO: new engines and threads 568 569 nxt_mp_destroy(tmcf->conf->mem_pool); 570 571 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 572 } 573 574 575 static void 
576 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 577 nxt_port_msg_type_t type) 578 { 579 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 580 } 581 582 583 static nxt_conf_map_t nxt_router_conf[] = { 584 { 585 nxt_string("listeners_threads"), 586 NXT_CONF_MAP_INT32, 587 offsetof(nxt_router_conf_t, threads), 588 }, 589 }; 590 591 592 static nxt_conf_map_t nxt_router_app_conf[] = { 593 { 594 nxt_string("type"), 595 NXT_CONF_MAP_STR, 596 offsetof(nxt_router_app_conf_t, type), 597 }, 598 599 { 600 nxt_string("workers"), 601 NXT_CONF_MAP_INT32, 602 offsetof(nxt_router_app_conf_t, workers), 603 }, 604 }; 605 606 607 static nxt_conf_map_t nxt_router_listener_conf[] = { 608 { 609 nxt_string("application"), 610 NXT_CONF_MAP_STR, 611 offsetof(nxt_router_listener_conf_t, application), 612 }, 613 }; 614 615 616 static nxt_conf_map_t nxt_router_http_conf[] = { 617 { 618 nxt_string("header_buffer_size"), 619 NXT_CONF_MAP_SIZE, 620 offsetof(nxt_socket_conf_t, header_buffer_size), 621 }, 622 623 { 624 nxt_string("large_header_buffer_size"), 625 NXT_CONF_MAP_SIZE, 626 offsetof(nxt_socket_conf_t, large_header_buffer_size), 627 }, 628 629 { 630 nxt_string("large_header_buffers"), 631 NXT_CONF_MAP_SIZE, 632 offsetof(nxt_socket_conf_t, large_header_buffers), 633 }, 634 635 { 636 nxt_string("body_buffer_size"), 637 NXT_CONF_MAP_SIZE, 638 offsetof(nxt_socket_conf_t, body_buffer_size), 639 }, 640 641 { 642 nxt_string("max_body_size"), 643 NXT_CONF_MAP_SIZE, 644 offsetof(nxt_socket_conf_t, max_body_size), 645 }, 646 647 { 648 nxt_string("header_read_timeout"), 649 NXT_CONF_MAP_MSEC, 650 offsetof(nxt_socket_conf_t, header_read_timeout), 651 }, 652 653 { 654 nxt_string("body_read_timeout"), 655 NXT_CONF_MAP_MSEC, 656 offsetof(nxt_socket_conf_t, body_read_timeout), 657 }, 658 }; 659 660 661 static nxt_int_t 662 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 663 u_char *start, u_char *end) 664 { 665 u_char *p; 666 size_t 
size; 667 nxt_mp_t *mp; 668 uint32_t next; 669 nxt_int_t ret; 670 nxt_str_t name; 671 nxt_app_t *app, *prev; 672 nxt_app_type_t type; 673 nxt_sockaddr_t *sa; 674 nxt_conf_value_t *conf, *http; 675 nxt_conf_value_t *applications, *application; 676 nxt_conf_value_t *listeners, *listener; 677 nxt_socket_conf_t *skcf; 678 nxt_app_lang_module_t *lang; 679 nxt_router_app_conf_t apcf; 680 nxt_router_listener_conf_t lscf; 681 682 static nxt_str_t http_path = nxt_string("/http"); 683 static nxt_str_t applications_path = nxt_string("/applications"); 684 static nxt_str_t listeners_path = nxt_string("/listeners"); 685 686 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 687 if (conf == NULL) { 688 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); 689 return NXT_ERROR; 690 } 691 692 mp = tmcf->conf->mem_pool; 693 694 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 695 nxt_nitems(nxt_router_conf), tmcf->conf); 696 if (ret != NXT_OK) { 697 nxt_log(task, NXT_LOG_CRIT, "root map error"); 698 return NXT_ERROR; 699 } 700 701 if (tmcf->conf->threads == 0) { 702 tmcf->conf->threads = nxt_ncpu; 703 } 704 705 applications = nxt_conf_get_path(conf, &applications_path); 706 if (applications == NULL) { 707 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block"); 708 return NXT_ERROR; 709 } 710 711 next = 0; 712 713 for ( ;; ) { 714 application = nxt_conf_next_object_member(applications, &name, &next); 715 if (application == NULL) { 716 break; 717 } 718 719 nxt_debug(task, "application \"%V\"", &name); 720 721 size = nxt_conf_json_length(application, NULL); 722 723 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 724 if (app == NULL) { 725 goto fail; 726 } 727 728 nxt_memzero(app, sizeof(nxt_app_t)); 729 730 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 731 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 732 733 p = nxt_conf_json_print(app->conf.start, application, NULL); 734 app->conf.length = p - app->conf.start; 735 
736 nxt_assert(app->conf.length <= size); 737 738 nxt_debug(task, "application conf \"%V\"", &app->conf); 739 740 prev = nxt_router_app_find(&tmcf->conf->router->apps, &name); 741 742 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 743 nxt_free(app); 744 745 nxt_queue_remove(&prev->link); 746 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 747 continue; 748 } 749 750 apcf.workers = 1; 751 752 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 753 nxt_nitems(nxt_router_app_conf), &apcf); 754 if (ret != NXT_OK) { 755 nxt_log(task, NXT_LOG_CRIT, "application map error"); 756 goto app_fail; 757 } 758 759 nxt_debug(task, "application type: %V", &apcf.type); 760 nxt_debug(task, "application workers: %D", apcf.workers); 761 762 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 763 764 if (lang == NULL) { 765 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 766 &apcf.type); 767 goto app_fail; 768 } 769 770 nxt_debug(task, "application language module: \"%s\"", lang->file); 771 772 type = nxt_app_parse_type(&lang->type); 773 774 if (type == NXT_APP_UNKNOWN) { 775 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 776 &lang->type); 777 goto app_fail; 778 } 779 780 if (nxt_app_prepare_msg[type] == NULL) { 781 nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"", 782 &lang->type); 783 goto app_fail; 784 } 785 786 ret = nxt_thread_mutex_create(&app->mutex); 787 if (ret != NXT_OK) { 788 goto app_fail; 789 } 790 791 nxt_queue_init(&app->ports); 792 nxt_queue_init(&app->requests); 793 794 app->name.length = name.length; 795 nxt_memcpy(app->name.start, name.start, name.length); 796 797 app->type = type; 798 app->max_workers = apcf.workers; 799 app->live = 1; 800 app->prepare_msg = nxt_app_prepare_msg[type]; 801 802 nxt_queue_insert_tail(&tmcf->apps, &app->link); 803 } 804 805 http = nxt_conf_get_path(conf, &http_path); 806 #if 0 807 if (http == NULL) { 808 nxt_log(task, NXT_LOG_CRIT, "no \"http\" 
block"); 809 return NXT_ERROR; 810 } 811 #endif 812 813 listeners = nxt_conf_get_path(conf, &listeners_path); 814 if (listeners == NULL) { 815 nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block"); 816 return NXT_ERROR; 817 } 818 819 next = 0; 820 821 for ( ;; ) { 822 listener = nxt_conf_next_object_member(listeners, &name, &next); 823 if (listener == NULL) { 824 break; 825 } 826 827 sa = nxt_sockaddr_parse(mp, &name); 828 if (sa == NULL) { 829 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name); 830 goto fail; 831 } 832 833 sa->type = SOCK_STREAM; 834 835 nxt_debug(task, "router listener: \"%*s\"", 836 sa->length, nxt_sockaddr_start(sa)); 837 838 skcf = nxt_router_socket_conf(task, mp, sa); 839 if (skcf == NULL) { 840 goto fail; 841 } 842 843 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 844 nxt_nitems(nxt_router_listener_conf), &lscf); 845 if (ret != NXT_OK) { 846 nxt_log(task, NXT_LOG_CRIT, "listener map error"); 847 goto fail; 848 } 849 850 nxt_debug(task, "application: %V", &lscf.application); 851 852 // STUB, default values if http block is not defined. 
853 skcf->header_buffer_size = 2048; 854 skcf->large_header_buffer_size = 8192; 855 skcf->large_header_buffers = 4; 856 skcf->body_buffer_size = 16 * 1024; 857 skcf->max_body_size = 2 * 1024 * 1024; 858 skcf->header_read_timeout = 5000; 859 skcf->body_read_timeout = 5000; 860 861 if (http != NULL) { 862 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 863 nxt_nitems(nxt_router_http_conf), skcf); 864 if (ret != NXT_OK) { 865 nxt_log(task, NXT_LOG_CRIT, "http map error"); 866 goto fail; 867 } 868 } 869 870 skcf->listen.handler = nxt_router_conn_init; 871 skcf->router_conf = tmcf->conf; 872 skcf->router_conf->count++; 873 skcf->application = nxt_router_listener_application(tmcf, 874 &lscf.application); 875 876 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); 877 } 878 879 nxt_router_listen_sockets_sort(tmcf->conf->router, tmcf); 880 881 return NXT_OK; 882 883 app_fail: 884 885 nxt_free(app); 886 887 fail: 888 889 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 890 891 nxt_queue_remove(&app->link); 892 nxt_thread_mutex_destroy(&app->mutex); 893 nxt_free(app); 894 895 } nxt_queue_loop; 896 897 return NXT_ERROR; 898 } 899 900 901 static nxt_app_t * 902 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 903 { 904 nxt_app_t *app; 905 906 nxt_queue_each(app, queue, nxt_app_t, link) { 907 908 if (nxt_strstr_eq(name, &app->name)) { 909 return app; 910 } 911 912 } nxt_queue_loop; 913 914 return NULL; 915 } 916 917 918 static nxt_app_t * 919 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 920 { 921 nxt_app_t *app; 922 923 app = nxt_router_app_find(&tmcf->apps, name); 924 925 if (app == NULL) { 926 app = nxt_router_app_find(&tmcf->previous, name); 927 } 928 929 return app; 930 } 931 932 933 static nxt_socket_conf_t * 934 nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 935 { 936 nxt_socket_conf_t *skcf; 937 938 skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 939 if (nxt_slow_path(skcf == NULL)) { 
940 return NULL; 941 } 942 943 skcf->sockaddr = sa; 944 945 skcf->listen.sockaddr = sa; 946 skcf->listen.socklen = sa->socklen; 947 skcf->listen.address_length = sa->length; 948 949 skcf->listen.socket = -1; 950 skcf->listen.backlog = NXT_LISTEN_BACKLOG; 951 skcf->listen.flags = NXT_NONBLOCK; 952 skcf->listen.read_after_accept = 1; 953 954 return skcf; 955 } 956 957 958 static void 959 nxt_router_listen_sockets_sort(nxt_router_t *router, 960 nxt_router_temp_conf_t *tmcf) 961 { 962 nxt_queue_link_t *nqlk, *oqlk, *next; 963 nxt_socket_conf_t *nskcf, *oskcf; 964 965 for (nqlk = nxt_queue_first(&tmcf->pending); 966 nqlk != nxt_queue_tail(&tmcf->pending); 967 nqlk = next) 968 { 969 next = nxt_queue_next(nqlk); 970 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); 971 972 for (oqlk = nxt_queue_first(&router->sockets); 973 oqlk != nxt_queue_tail(&router->sockets); 974 oqlk = nxt_queue_next(oqlk)) 975 { 976 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); 977 978 if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) { 979 nskcf->socket = oskcf->socket; 980 nskcf->listen.socket = oskcf->listen.socket; 981 982 nxt_queue_remove(oqlk); 983 nxt_queue_insert_tail(&tmcf->keeping, oqlk); 984 985 nxt_queue_remove(nqlk); 986 nxt_queue_insert_tail(&tmcf->updating, nqlk); 987 988 break; 989 } 990 } 991 } 992 993 nxt_queue_add(&tmcf->deleting, &router->sockets); 994 nxt_queue_init(&router->sockets); 995 } 996 997 998 static void 999 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1000 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1001 { 1002 uint32_t stream; 1003 nxt_buf_t *b; 1004 nxt_port_t *main_port, *router_port; 1005 nxt_runtime_t *rt; 1006 nxt_socket_rpc_t *rpc; 1007 1008 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1009 if (rpc == NULL) { 1010 goto fail; 1011 } 1012 1013 rpc->socket_conf = skcf; 1014 rpc->temp_conf = tmcf; 1015 1016 b = nxt_buf_mem_alloc(tmcf->mem_pool, skcf->sockaddr->sockaddr_size, 0); 1017 if (b == NULL) 
{ 1018 goto fail; 1019 } 1020 1021 b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr, 1022 skcf->sockaddr->sockaddr_size); 1023 1024 rt = task->thread->runtime; 1025 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1026 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1027 1028 stream = nxt_port_rpc_register_handler(task, router_port, 1029 nxt_router_listen_socket_ready, 1030 nxt_router_listen_socket_error, 1031 main_port->pid, rpc); 1032 if (stream == 0) { 1033 goto fail; 1034 } 1035 1036 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1037 stream, router_port->id, b); 1038 1039 return; 1040 1041 fail: 1042 1043 nxt_router_conf_error(task, tmcf); 1044 } 1045 1046 1047 static void 1048 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1049 void *data) 1050 { 1051 nxt_int_t ret; 1052 nxt_socket_t s; 1053 nxt_socket_rpc_t *rpc; 1054 nxt_router_socket_t *rtsk; 1055 1056 rpc = data; 1057 1058 s = msg->fd; 1059 1060 ret = nxt_socket_nonblocking(task, s); 1061 if (nxt_slow_path(ret != NXT_OK)) { 1062 goto fail; 1063 } 1064 1065 nxt_socket_defer_accept(task, s, rpc->socket_conf->sockaddr); 1066 1067 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1068 if (nxt_slow_path(ret != NXT_OK)) { 1069 goto fail; 1070 } 1071 1072 rtsk = nxt_malloc(sizeof(nxt_router_socket_t)); 1073 if (nxt_slow_path(rtsk == NULL)) { 1074 goto fail; 1075 } 1076 1077 rtsk->count = 0; 1078 rtsk->fd = s; 1079 1080 rpc->socket_conf->listen.socket = s; 1081 rpc->socket_conf->socket = rtsk; 1082 1083 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1084 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1085 1086 return; 1087 1088 fail: 1089 1090 nxt_socket_close(task, s); 1091 1092 nxt_router_conf_error(task, rpc->temp_conf); 1093 } 1094 1095 1096 static void 1097 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1098 void *data) 1099 { 1100 u_char *p; 1101 size_t size; 1102 uint8_t error; 1103 nxt_buf_t *in, *out; 
1104 nxt_sockaddr_t *sa; 1105 nxt_socket_rpc_t *rpc; 1106 nxt_router_temp_conf_t *tmcf; 1107 1108 static nxt_str_t socket_errors[] = { 1109 nxt_string("ListenerSystem"), 1110 nxt_string("ListenerNoIPv6"), 1111 nxt_string("ListenerPort"), 1112 nxt_string("ListenerInUse"), 1113 nxt_string("ListenerNoAddress"), 1114 nxt_string("ListenerNoAccess"), 1115 nxt_string("ListenerPath"), 1116 }; 1117 1118 rpc = data; 1119 sa = rpc->socket_conf->sockaddr; 1120 1121 in = msg->buf; 1122 p = in->mem.pos; 1123 1124 error = *p++; 1125 1126 size = sizeof("listen socket error: ") - 1 1127 + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1 1128 + sa->length + socket_errors[error].length + (in->mem.free - p); 1129 1130 tmcf = rpc->temp_conf; 1131 1132 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1133 if (nxt_slow_path(out == NULL)) { 1134 return; 1135 } 1136 1137 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 1138 "listen socket error: " 1139 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 1140 sa->length, nxt_sockaddr_start(sa), 1141 &socket_errors[error], in->mem.free - p, p); 1142 1143 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 1144 1145 nxt_router_conf_error(task, tmcf); 1146 } 1147 1148 1149 static nxt_int_t 1150 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 1151 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 1152 { 1153 nxt_int_t ret; 1154 nxt_uint_t n, threads; 1155 nxt_queue_link_t *qlk; 1156 nxt_router_engine_conf_t *recf; 1157 1158 threads = tmcf->conf->threads; 1159 1160 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 1161 sizeof(nxt_router_engine_conf_t)); 1162 if (nxt_slow_path(tmcf->engines == NULL)) { 1163 return NXT_ERROR; 1164 } 1165 1166 n = 0; 1167 1168 for (qlk = nxt_queue_first(&router->engines); 1169 qlk != nxt_queue_tail(&router->engines); 1170 qlk = nxt_queue_next(qlk)) 1171 { 1172 recf = nxt_array_zero_add(tmcf->engines); 1173 if (nxt_slow_path(recf == 
NULL)) { 1174 return NXT_ERROR; 1175 } 1176 1177 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 1178 1179 if (n < threads) { 1180 ret = nxt_router_engine_conf_update(tmcf, recf); 1181 1182 } else { 1183 ret = nxt_router_engine_conf_delete(tmcf, recf); 1184 } 1185 1186 if (nxt_slow_path(ret != NXT_OK)) { 1187 return ret; 1188 } 1189 1190 n++; 1191 } 1192 1193 tmcf->new_threads = n; 1194 1195 while (n < threads) { 1196 recf = nxt_array_zero_add(tmcf->engines); 1197 if (nxt_slow_path(recf == NULL)) { 1198 return NXT_ERROR; 1199 } 1200 1201 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 1202 if (nxt_slow_path(recf->engine == NULL)) { 1203 return NXT_ERROR; 1204 } 1205 1206 ret = nxt_router_engine_conf_create(tmcf, recf); 1207 if (nxt_slow_path(ret != NXT_OK)) { 1208 return ret; 1209 } 1210 1211 nxt_queue_insert_tail(&router->engines, &recf->engine->link0); 1212 1213 n++; 1214 } 1215 1216 return NXT_OK; 1217 } 1218 1219 1220 static nxt_int_t 1221 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 1222 nxt_router_engine_conf_t *recf) 1223 { 1224 nxt_int_t ret; 1225 nxt_thread_spinlock_t *lock; 1226 1227 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1228 nxt_router_listen_socket_create); 1229 if (nxt_slow_path(ret != NXT_OK)) { 1230 return ret; 1231 } 1232 1233 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1234 nxt_router_listen_socket_create); 1235 if (nxt_slow_path(ret != NXT_OK)) { 1236 return ret; 1237 } 1238 1239 lock = &tmcf->conf->router->lock; 1240 1241 nxt_thread_spin_lock(lock); 1242 1243 nxt_router_engine_socket_count(&tmcf->creating); 1244 nxt_router_engine_socket_count(&tmcf->updating); 1245 1246 nxt_thread_spin_unlock(lock); 1247 1248 return ret; 1249 } 1250 1251 1252 static nxt_int_t 1253 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 1254 nxt_router_engine_conf_t *recf) 1255 { 1256 nxt_int_t ret; 1257 nxt_thread_spinlock_t *lock; 1258 1259 ret = 
nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1260 nxt_router_listen_socket_create); 1261 if (nxt_slow_path(ret != NXT_OK)) { 1262 return ret; 1263 } 1264 1265 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1266 nxt_router_listen_socket_update); 1267 if (nxt_slow_path(ret != NXT_OK)) { 1268 return ret; 1269 } 1270 1271 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 1272 if (nxt_slow_path(ret != NXT_OK)) { 1273 return ret; 1274 } 1275 1276 lock = &tmcf->conf->router->lock; 1277 1278 nxt_thread_spin_lock(lock); 1279 1280 nxt_router_engine_socket_count(&tmcf->creating); 1281 1282 nxt_thread_spin_unlock(lock); 1283 1284 return ret; 1285 } 1286 1287 1288 static nxt_int_t 1289 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 1290 nxt_router_engine_conf_t *recf) 1291 { 1292 nxt_int_t ret; 1293 1294 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 1295 if (nxt_slow_path(ret != NXT_OK)) { 1296 return ret; 1297 } 1298 1299 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 1300 } 1301 1302 1303 static nxt_int_t 1304 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 1305 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 1306 nxt_work_handler_t handler) 1307 { 1308 nxt_joint_job_t *job; 1309 nxt_queue_link_t *qlk; 1310 nxt_socket_conf_t *skcf; 1311 nxt_socket_conf_joint_t *joint; 1312 1313 for (qlk = nxt_queue_first(sockets); 1314 qlk != nxt_queue_tail(sockets); 1315 qlk = nxt_queue_next(qlk)) 1316 { 1317 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1318 if (nxt_slow_path(job == NULL)) { 1319 return NXT_ERROR; 1320 } 1321 1322 job->work.next = recf->jobs; 1323 recf->jobs = &job->work; 1324 1325 job->task = tmcf->engine->task; 1326 job->work.handler = handler; 1327 job->work.task = &job->task; 1328 job->work.obj = job; 1329 job->tmcf = tmcf; 1330 1331 tmcf->count++; 1332 1333 joint = nxt_mp_alloc(tmcf->conf->mem_pool, 1334 
sizeof(nxt_socket_conf_joint_t)); 1335 if (nxt_slow_path(joint == NULL)) { 1336 return NXT_ERROR; 1337 } 1338 1339 job->work.data = joint; 1340 1341 joint->count = 1; 1342 1343 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1344 skcf->count++; 1345 joint->socket_conf = skcf; 1346 1347 joint->engine = recf->engine; 1348 } 1349 1350 return NXT_OK; 1351 } 1352 1353 1354 static void 1355 nxt_router_engine_socket_count(nxt_queue_t *sockets) 1356 { 1357 nxt_queue_link_t *qlk; 1358 nxt_socket_conf_t *skcf; 1359 1360 for (qlk = nxt_queue_first(sockets); 1361 qlk != nxt_queue_tail(sockets); 1362 qlk = nxt_queue_next(qlk)) 1363 { 1364 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1365 skcf->socket->count++; 1366 } 1367 } 1368 1369 1370 static nxt_int_t 1371 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 1372 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 1373 { 1374 nxt_joint_job_t *job; 1375 nxt_queue_link_t *qlk; 1376 1377 for (qlk = nxt_queue_first(sockets); 1378 qlk != nxt_queue_tail(sockets); 1379 qlk = nxt_queue_next(qlk)) 1380 { 1381 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 1382 if (nxt_slow_path(job == NULL)) { 1383 return NXT_ERROR; 1384 } 1385 1386 job->work.next = recf->jobs; 1387 recf->jobs = &job->work; 1388 1389 job->task = tmcf->engine->task; 1390 job->work.handler = nxt_router_listen_socket_delete; 1391 job->work.task = &job->task; 1392 job->work.obj = job; 1393 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1394 job->tmcf = tmcf; 1395 1396 tmcf->count++; 1397 } 1398 1399 return NXT_OK; 1400 } 1401 1402 1403 static nxt_int_t 1404 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 1405 nxt_router_temp_conf_t *tmcf) 1406 { 1407 nxt_int_t ret; 1408 nxt_uint_t i, threads; 1409 nxt_router_engine_conf_t *recf; 1410 1411 recf = tmcf->engines->elts; 1412 threads = tmcf->conf->threads; 1413 1414 for (i = tmcf->new_threads; i < threads; i++) { 1415 ret = 
nxt_router_thread_create(task, rt, recf[i].engine); 1416 if (nxt_slow_path(ret != NXT_OK)) { 1417 return ret; 1418 } 1419 } 1420 1421 return NXT_OK; 1422 } 1423 1424 1425 static nxt_int_t 1426 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 1427 nxt_event_engine_t *engine) 1428 { 1429 nxt_int_t ret; 1430 nxt_thread_link_t *link; 1431 nxt_thread_handle_t handle; 1432 1433 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 1434 1435 if (nxt_slow_path(link == NULL)) { 1436 return NXT_ERROR; 1437 } 1438 1439 link->start = nxt_router_thread_start; 1440 link->engine = engine; 1441 link->work.handler = nxt_router_thread_exit_handler; 1442 link->work.task = task; 1443 link->work.data = link; 1444 1445 nxt_queue_insert_tail(&rt->engines, &engine->link); 1446 1447 ret = nxt_thread_create(&handle, link); 1448 1449 if (nxt_slow_path(ret != NXT_OK)) { 1450 nxt_queue_remove(&engine->link); 1451 } 1452 1453 return ret; 1454 } 1455 1456 1457 static void 1458 nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 1459 { 1460 nxt_app_t *app; 1461 nxt_port_t *port; 1462 1463 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 1464 1465 nxt_queue_remove(&app->link); 1466 1467 nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app); 1468 1469 app->live = 0; 1470 1471 if (nxt_router_app_free(NULL, app) != 0) { 1472 continue; 1473 } 1474 1475 if (!nxt_queue_is_empty(&app->requests)) { 1476 1477 nxt_thread_log_debug("app '%V' %p pending requests found", 1478 &app->name, app); 1479 continue; 1480 } 1481 1482 do { 1483 port = nxt_router_app_get_port(app, 0); 1484 if (port == NULL) { 1485 break; 1486 } 1487 1488 nxt_thread_log_debug("port %p send quit", port); 1489 1490 nxt_port_socket_write(&port->engine->task, port, 1491 NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 1492 } while (1); 1493 1494 } nxt_queue_loop; 1495 1496 nxt_queue_add(&router->apps, &tmcf->previous); 1497 nxt_queue_add(&router->apps, &tmcf->apps); 1498 } 1499 1500 1501 static void 1502 
nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) 1503 { 1504 nxt_uint_t n; 1505 nxt_router_engine_conf_t *recf; 1506 1507 recf = tmcf->engines->elts; 1508 1509 for (n = tmcf->engines->nelts; n != 0; n--) { 1510 nxt_router_engine_post(recf); 1511 recf++; 1512 } 1513 } 1514 1515 1516 static void 1517 nxt_router_engine_post(nxt_router_engine_conf_t *recf) 1518 { 1519 nxt_work_t *work, *next; 1520 1521 for (work = recf->jobs; work != NULL; work = next) { 1522 next = work->next; 1523 work->next = NULL; 1524 1525 nxt_event_engine_post(recf->engine, work); 1526 } 1527 } 1528 1529 1530 static nxt_port_handler_t nxt_router_app_port_handlers[] = { 1531 NULL, /* NXT_PORT_MSG_QUIT */ 1532 NULL, /* NXT_PORT_MSG_NEW_PORT */ 1533 NULL, /* NXT_PORT_MSG_CHANGE_FILE */ 1534 /* TODO: remove mmap_handler from app ports */ 1535 nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ 1536 nxt_router_app_data_handler, 1537 NULL, /* NXT_PORT_MSG_REMOVE_PID */ 1538 NULL, /* NXT_PORT_MSG_READY */ 1539 NULL, /* NXT_PORT_MSG_START_WORKER */ 1540 nxt_port_rpc_handler, 1541 nxt_port_rpc_handler, 1542 }; 1543 1544 1545 static void 1546 nxt_router_thread_start(void *data) 1547 { 1548 nxt_int_t ret; 1549 nxt_port_t *port; 1550 nxt_task_t *task; 1551 nxt_thread_t *thread; 1552 nxt_thread_link_t *link; 1553 nxt_event_engine_t *engine; 1554 1555 link = data; 1556 engine = link->engine; 1557 task = &engine->task; 1558 1559 thread = nxt_thread(); 1560 1561 nxt_event_engine_thread_adopt(engine); 1562 1563 /* STUB */ 1564 thread->runtime = engine->task.thread->runtime; 1565 1566 engine->task.thread = thread; 1567 engine->task.log = thread->log; 1568 thread->engine = engine; 1569 thread->task = &engine->task; 1570 thread->fiber = &engine->fibers->fiber; 1571 1572 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 1573 1574 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 1575 NXT_PROCESS_ROUTER); 1576 if (nxt_slow_path(port == NULL)) { 1577 return; 1578 } 1579 1580 ret = 
nxt_port_socket_init(task, port, 0); 1581 if (nxt_slow_path(ret != NXT_OK)) { 1582 nxt_mp_release(port->mem_pool, port); 1583 return; 1584 } 1585 1586 engine->port = port; 1587 1588 nxt_port_enable(task, port, nxt_router_app_port_handlers); 1589 1590 nxt_event_engine_start(engine); 1591 } 1592 1593 1594 static void 1595 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 1596 { 1597 nxt_joint_job_t *job; 1598 nxt_listen_event_t *listen; 1599 nxt_listen_socket_t *ls; 1600 nxt_socket_conf_joint_t *joint; 1601 1602 job = obj; 1603 joint = data; 1604 1605 ls = &joint->socket_conf->listen; 1606 1607 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 1608 1609 listen = nxt_listen_event(task, ls); 1610 if (nxt_slow_path(listen == NULL)) { 1611 nxt_router_listen_socket_release(task, joint); 1612 return; 1613 } 1614 1615 listen->socket.data = joint; 1616 1617 job->work.next = NULL; 1618 job->work.handler = nxt_router_conf_wait; 1619 1620 nxt_event_engine_post(job->tmcf->engine, &job->work); 1621 } 1622 1623 1624 nxt_inline nxt_listen_event_t * 1625 nxt_router_listen_event(nxt_queue_t *listen_connections, 1626 nxt_socket_conf_t *skcf) 1627 { 1628 nxt_socket_t fd; 1629 nxt_queue_link_t *qlk; 1630 nxt_listen_event_t *listen; 1631 1632 fd = skcf->socket->fd; 1633 1634 for (qlk = nxt_queue_first(listen_connections); 1635 qlk != nxt_queue_tail(listen_connections); 1636 qlk = nxt_queue_next(qlk)) 1637 { 1638 listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 1639 1640 if (fd == listen->socket.fd) { 1641 return listen; 1642 } 1643 } 1644 1645 return NULL; 1646 } 1647 1648 1649 static void 1650 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 1651 { 1652 nxt_joint_job_t *job; 1653 nxt_event_engine_t *engine; 1654 nxt_listen_event_t *listen; 1655 nxt_socket_conf_joint_t *joint, *old; 1656 1657 job = obj; 1658 joint = data; 1659 1660 engine = task->thread->engine; 1661 1662 nxt_queue_insert_tail(&engine->joints, 
&joint->link); 1663 1664 listen = nxt_router_listen_event(&engine->listen_connections, 1665 joint->socket_conf); 1666 1667 old = listen->socket.data; 1668 listen->socket.data = joint; 1669 listen->listen = &joint->socket_conf->listen; 1670 1671 job->work.next = NULL; 1672 job->work.handler = nxt_router_conf_wait; 1673 1674 nxt_event_engine_post(job->tmcf->engine, &job->work); 1675 1676 /* 1677 * The task is allocated from configuration temporary 1678 * memory pool so it can be freed after engine post operation. 1679 */ 1680 1681 nxt_router_conf_release(&engine->task, old); 1682 } 1683 1684 1685 static void 1686 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 1687 { 1688 nxt_joint_job_t *job; 1689 nxt_socket_conf_t *skcf; 1690 nxt_listen_event_t *listen; 1691 nxt_event_engine_t *engine; 1692 1693 job = obj; 1694 skcf = data; 1695 1696 engine = task->thread->engine; 1697 1698 listen = nxt_router_listen_event(&engine->listen_connections, skcf); 1699 1700 nxt_fd_event_delete(engine, &listen->socket); 1701 1702 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 1703 listen->socket.fd); 1704 1705 listen->timer.handler = nxt_router_listen_socket_close; 1706 listen->timer.work_queue = &engine->fast_work_queue; 1707 1708 nxt_timer_add(engine, &listen->timer, 0); 1709 1710 job->work.next = NULL; 1711 job->work.handler = nxt_router_conf_wait; 1712 1713 nxt_event_engine_post(job->tmcf->engine, &job->work); 1714 } 1715 1716 1717 static void 1718 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 1719 { 1720 nxt_timer_t *timer; 1721 nxt_listen_event_t *listen; 1722 nxt_socket_conf_joint_t *joint; 1723 1724 timer = obj; 1725 listen = nxt_timer_data(timer, nxt_listen_event_t, timer); 1726 joint = listen->socket.data; 1727 1728 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 1729 listen->socket.fd); 1730 1731 nxt_queue_remove(&listen->link); 1732 1733 /* 'task' refers to listen->task and we cannot 
use after nxt_free() */ 1734 task = &task->thread->engine->task; 1735 1736 nxt_free(listen); 1737 1738 nxt_router_listen_socket_release(task, joint); 1739 } 1740 1741 1742 static void 1743 nxt_router_listen_socket_release(nxt_task_t *task, 1744 nxt_socket_conf_joint_t *joint) 1745 { 1746 nxt_socket_conf_t *skcf; 1747 nxt_router_socket_t *rtsk; 1748 nxt_thread_spinlock_t *lock; 1749 1750 skcf = joint->socket_conf; 1751 rtsk = skcf->socket; 1752 lock = &skcf->router_conf->router->lock; 1753 1754 nxt_thread_spin_lock(lock); 1755 1756 nxt_debug(task, "engine %p: listen socket release: rtsk->count %D", 1757 task->thread->engine, rtsk->count); 1758 1759 if (--rtsk->count != 0) { 1760 rtsk = NULL; 1761 } 1762 1763 nxt_thread_spin_unlock(lock); 1764 1765 if (rtsk != NULL) { 1766 nxt_socket_close(task, rtsk->fd); 1767 nxt_free(rtsk); 1768 skcf->socket = NULL; 1769 } 1770 1771 nxt_router_conf_release(task, joint); 1772 } 1773 1774 1775 static void 1776 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 1777 { 1778 nxt_bool_t exit; 1779 nxt_socket_conf_t *skcf; 1780 nxt_router_conf_t *rtcf; 1781 nxt_thread_spinlock_t *lock; 1782 1783 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 1784 1785 if (--joint->count != 0) { 1786 return; 1787 } 1788 1789 nxt_queue_remove(&joint->link); 1790 1791 skcf = joint->socket_conf; 1792 rtcf = skcf->router_conf; 1793 lock = &rtcf->router->lock; 1794 1795 nxt_thread_spin_lock(lock); 1796 1797 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 1798 rtcf, rtcf->count); 1799 1800 if (--skcf->count != 0) { 1801 rtcf = NULL; 1802 1803 } else { 1804 nxt_queue_remove(&skcf->link); 1805 1806 if (--rtcf->count != 0) { 1807 rtcf = NULL; 1808 } 1809 } 1810 1811 nxt_thread_spin_unlock(lock); 1812 1813 /* TODO remove engine->port */ 1814 /* TODO excude from connected ports */ 1815 1816 /* The joint content can be used before memory pool destruction. 
*/ 1817 exit = nxt_queue_is_empty(&joint->engine->joints); 1818 1819 if (rtcf != NULL) { 1820 nxt_debug(task, "old router conf is destroyed"); 1821 1822 nxt_mp_thread_adopt(rtcf->mem_pool); 1823 1824 nxt_mp_destroy(rtcf->mem_pool); 1825 } 1826 1827 if (exit) { 1828 nxt_thread_exit(task->thread); 1829 } 1830 } 1831 1832 1833 static void 1834 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 1835 { 1836 nxt_port_t *port; 1837 nxt_thread_link_t *link; 1838 nxt_event_engine_t *engine; 1839 nxt_thread_handle_t handle; 1840 1841 handle = (nxt_thread_handle_t) obj; 1842 link = data; 1843 1844 nxt_thread_wait(handle); 1845 1846 engine = link->engine; 1847 1848 nxt_queue_remove(&engine->link); 1849 1850 port = engine->port; 1851 1852 // TODO notify all apps 1853 1854 nxt_mp_thread_adopt(port->mem_pool); 1855 nxt_port_release(port); 1856 1857 nxt_mp_thread_adopt(engine->mem_pool); 1858 nxt_mp_destroy(engine->mem_pool); 1859 1860 nxt_event_engine_free(engine); 1861 1862 nxt_free(link); 1863 } 1864 1865 1866 static const nxt_conn_state_t nxt_router_conn_read_header_state 1867 nxt_aligned(64) = 1868 { 1869 .ready_handler = nxt_router_conn_http_header_parse, 1870 .close_handler = nxt_router_conn_close, 1871 .error_handler = nxt_router_conn_error, 1872 1873 .timer_handler = nxt_router_conn_timeout, 1874 .timer_value = nxt_router_conn_timeout_value, 1875 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), 1876 }; 1877 1878 1879 static const nxt_conn_state_t nxt_router_conn_read_body_state 1880 nxt_aligned(64) = 1881 { 1882 .ready_handler = nxt_router_conn_http_body_read, 1883 .close_handler = nxt_router_conn_close, 1884 .error_handler = nxt_router_conn_error, 1885 1886 .timer_handler = nxt_router_conn_timeout, 1887 .timer_value = nxt_router_conn_timeout_value, 1888 .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), 1889 .timer_autoreset = 1, 1890 }; 1891 1892 1893 static void 1894 nxt_router_conn_init(nxt_task_t *task, void *obj, void 
/*
 * Port message handler for data sent back by an application (installed in
 * nxt_router_app_port_handlers).
 *
 * The message stream id is looked up among the requests of the current
 * engine; messages for unknown requests are dropped.  The message buffer is
 * queued for writing to the client connection.  When the message is marked
 * as the last one, a sync "last" buffer is appended, the application port
 * held by the request is released and the request is detached from the
 * connection.
 */
static void
nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    size_t               dump_size;
    nxt_buf_t            *b, *last;
    nxt_conn_t           *c;
    nxt_req_conn_link_t  *rc;
    nxt_event_engine_t   *engine;

    b = msg->buf;
    engine = task->thread->engine;

    rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
    if (nxt_slow_path(rc == NULL)) {
        nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);

        return;
    }

    /* Captured now: rc->conn is cleared below for the last message. */
    c = rc->conn;

    dump_size = nxt_buf_used_size(b);

    /* Log at most the first 300 bytes of the payload. */
    if (dump_size > 300) {
        dump_size = 300;
    }

    nxt_debug(task, "%srouter app data (%z): %*s",
              msg->port_msg.last ? "last " : "", msg->size, dump_size,
              b->mem.pos);

    /* An empty message contributes no data buffer to the output chain. */
    if (msg->size == 0) {
        b = NULL;
    }

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
        if (nxt_slow_path(last == NULL)) {
            /* TODO: allocation failure is not handled yet. */
        }

        /* With b == NULL the chain consists of the "last" buffer only. */
        nxt_buf_chain_add(&b, last);

        if (rc->app_port != NULL) {
            nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);

            rc->app_port = NULL;
        }

        rc->conn = NULL;
    }

    /* Nothing to write: empty message that was not the last one. */
    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (c->write == NULL) {
        /* No output pending: start a write with this chain. */
        c->write = b;
        c->write_state = &nxt_router_conn_write_state;

        nxt_conn_write(task->thread->engine, c);

    } else {
        nxt_debug(task, "router data attach out bufs to existing chain");

        nxt_buf_chain_add(&c->write, b);
    }
}
*) b->mem.free; 2044 2045 b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); 2046 b->mem.free[0] = '\0'; 2047 2048 nxt_log_alert(task->log, "error %d: %s", code, msg); 2049 2050 last = nxt_buf_mem_ts_alloc(task, mp, 0); 2051 2052 if (nxt_slow_path(last == NULL)) { 2053 nxt_mp_release(mp, b); 2054 return NULL; 2055 } 2056 2057 nxt_buf_set_sync(last); 2058 nxt_buf_set_last(last); 2059 2060 nxt_buf_chain_add(&b, last); 2061 2062 return b; 2063 } 2064 2065 2066 2067 static void 2068 nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 2069 const char* fmt, ...) 2070 { 2071 va_list args; 2072 nxt_buf_t *b; 2073 2074 va_start(args, fmt); 2075 b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); 2076 va_end(args); 2077 2078 if (c->socket.data != NULL) { 2079 nxt_mp_free(c->mem_pool, c->socket.data); 2080 c->socket.data = NULL; 2081 } 2082 2083 if (c->socket.fd == -1) { 2084 nxt_mp_release(c->mem_pool, b->next); 2085 nxt_mp_release(c->mem_pool, b); 2086 return; 2087 } 2088 2089 if (c->write == NULL) { 2090 c->write = b; 2091 c->write_state = &nxt_router_conn_write_state; 2092 2093 nxt_conn_write(task->thread->engine, c); 2094 2095 } else { 2096 nxt_debug(task, "router data attach out bufs to existing chain"); 2097 2098 nxt_buf_chain_add(&c->write, b); 2099 } 2100 } 2101 2102 2103 static void 2104 nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) 2105 { 2106 nxt_start_worker_t *sw; 2107 2108 sw = data; 2109 2110 nxt_assert(sw != NULL); 2111 nxt_assert(sw->app->pending_workers != 0); 2112 2113 msg->new_port->app = sw->app; 2114 2115 sw->app->pending_workers--; 2116 sw->app->workers++; 2117 2118 nxt_debug(task, "sw %p got port %p", sw, msg->new_port); 2119 2120 nxt_router_app_release_port(task, msg->new_port, sw->app); 2121 2122 nxt_router_sw_release(task, sw); 2123 } 2124 2125 2126 static void 2127 nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) 2128 { 2129 nxt_start_worker_t *sw; 2130 
2131 sw = data; 2132 2133 nxt_assert(sw != NULL); 2134 nxt_assert(sw->app->pending_workers != 0); 2135 2136 sw->app->pending_workers--; 2137 2138 nxt_debug(task, "sw %p error, failed to start app '%V'", 2139 sw, &sw->app->name); 2140 2141 nxt_router_sw_release(task, sw); 2142 } 2143 2144 2145 static void 2146 nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) 2147 { 2148 size_t size; 2149 uint32_t stream; 2150 nxt_buf_t *b; 2151 nxt_app_t *app; 2152 nxt_port_t *main_port, *router_port, *app_port; 2153 nxt_runtime_t *rt; 2154 nxt_start_worker_t *sw; 2155 nxt_req_app_link_t *ra; 2156 2157 sw = obj; 2158 app = sw->app; 2159 2160 if (nxt_queue_is_empty(&app->requests)) { 2161 ra = sw->ra; 2162 app_port = nxt_router_app_get_port(app, ra->req_id); 2163 2164 if (app_port != NULL) { 2165 nxt_debug(task, "app '%V' %p process request #%uxD", 2166 &app->name, app, ra->req_id); 2167 2168 ra->app_port = app_port; 2169 2170 nxt_router_process_http_request_mp(task, ra, app_port); 2171 2172 nxt_router_ra_release(task, ra, ra->work.data); 2173 nxt_router_sw_release(task, sw); 2174 2175 return; 2176 } 2177 } 2178 2179 nxt_queue_insert_tail(&app->requests, &sw->ra->link); 2180 2181 if (app->workers + app->pending_workers >= app->max_workers) { 2182 nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, " 2183 "max_workers (%uD) reached", &app->name, app, 2184 app->workers, app->pending_workers, app->max_workers); 2185 2186 nxt_router_sw_release(task, sw); 2187 2188 return; 2189 } 2190 2191 app->pending_workers++; 2192 2193 nxt_debug(task, "sw %p send", sw); 2194 2195 rt = task->thread->runtime; 2196 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2197 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2198 2199 size = app->name.length + 1 + app->conf.length; 2200 2201 b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0); 2202 2203 nxt_buf_cpystr(b, &app->name); 2204 *b->mem.free++ = '\0'; 2205 nxt_buf_cpystr(b, &app->conf); 2206 2207 stream = 
nxt_port_rpc_register_handler(task, router_port, 2208 nxt_router_sw_ready, 2209 nxt_router_sw_error, 2210 main_port->pid, sw); 2211 2212 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 2213 stream, router_port->id, b); 2214 } 2215 2216 2217 static nxt_bool_t 2218 nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) 2219 { 2220 nxt_queue_link_t *lnk; 2221 nxt_req_app_link_t *ra; 2222 2223 nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, 2224 app->live, app->workers, app->pending_workers, 2225 nxt_queue_is_empty(&app->requests)); 2226 2227 if (app->live == 0 2228 && app->workers == 0 2229 && app->pending_workers == 0 2230 && nxt_queue_is_empty(&app->requests)) 2231 { 2232 nxt_thread_mutex_destroy(&app->mutex); 2233 nxt_free(app); 2234 2235 return 1; 2236 } 2237 2238 if (app->live == 1 2239 && nxt_queue_is_empty(&app->requests) == 0 2240 && app->workers + app->pending_workers < app->max_workers) 2241 { 2242 lnk = nxt_queue_first(&app->requests); 2243 nxt_queue_remove(lnk); 2244 2245 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2246 2247 nxt_router_sw_create(task, app, ra); 2248 } 2249 2250 return 0; 2251 } 2252 2253 2254 static nxt_port_t * 2255 nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) 2256 { 2257 nxt_port_t *port; 2258 nxt_queue_link_t *lnk; 2259 2260 port = NULL; 2261 2262 nxt_thread_mutex_lock(&app->mutex); 2263 2264 if (!nxt_queue_is_empty(&app->ports)) { 2265 lnk = nxt_queue_first(&app->ports); 2266 nxt_queue_remove(lnk); 2267 2268 lnk->next = NULL; 2269 2270 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2271 2272 port->app_req_id = req_id; 2273 } 2274 2275 nxt_thread_mutex_unlock(&app->mutex); 2276 2277 return port; 2278 } 2279 2280 2281 static void 2282 nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) 2283 { 2284 nxt_app_t *app; 2285 nxt_port_t *port; 2286 nxt_work_t *work; 2287 nxt_queue_link_t *lnk; 2288 nxt_req_app_link_t *ra; 2289 2290 port = obj; 2291 
app = data; 2292 2293 nxt_assert(app != NULL); 2294 nxt_assert(app == port->app); 2295 nxt_assert(port->app_link.next == NULL); 2296 2297 2298 if (task->thread->engine != port->engine) { 2299 work = &port->work; 2300 2301 nxt_debug(task, "post release port to engine %p", port->engine); 2302 2303 work->next = NULL; 2304 work->handler = nxt_router_app_release_port; 2305 work->task = &port->engine->task; 2306 work->obj = port; 2307 work->data = app; 2308 2309 nxt_event_engine_post(port->engine, work); 2310 2311 return; 2312 } 2313 2314 if (!nxt_queue_is_empty(&app->requests)) { 2315 lnk = nxt_queue_first(&app->requests); 2316 nxt_queue_remove(lnk); 2317 2318 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2319 2320 nxt_debug(task, "app '%V' %p process next request #%uxD", 2321 &app->name, app, ra->req_id); 2322 2323 ra->app_port = port; 2324 port->app_req_id = ra->req_id; 2325 2326 nxt_router_process_http_request_mp(task, ra, port); 2327 2328 nxt_router_ra_release(task, ra, ra->work.data); 2329 2330 return; 2331 } 2332 2333 port->app_req_id = 0; 2334 2335 if (port->pair[1] == -1) { 2336 nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", 2337 &app->name, app, port->pid); 2338 2339 app->workers--; 2340 nxt_router_app_free(task, app); 2341 2342 port->app = NULL; 2343 2344 nxt_port_release(port); 2345 2346 return; 2347 } 2348 2349 if (!app->live) { 2350 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", 2351 &app->name, app); 2352 2353 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 2354 -1, 0, 0, NULL); 2355 2356 return; 2357 } 2358 2359 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 2360 &app->name, app); 2361 2362 nxt_thread_mutex_lock(&app->mutex); 2363 2364 nxt_queue_insert_head(&app->ports, &port->app_link); 2365 2366 nxt_thread_mutex_unlock(&app->mutex); 2367 } 2368 2369 2370 nxt_bool_t 2371 nxt_router_app_remove_port(nxt_port_t *port) 2372 { 2373 nxt_app_t *app; 2374 nxt_bool_t busy; 2375 2376 app = 
/*
 * Read-ready handler of nxt_router_conn_read_header_state: parses the HTTP
 * request header from the connection read buffer.
 *
 *   obj   - the connection (nxt_conn_t);
 *   data  - the request parse context, or NULL when none exists yet.
 *
 * Depending on the parser result the handler dispatches the request,
 * switches the connection to the body read state, replies with an error,
 * or keeps reading header data (growing the buffer if necessary).
 */
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
    size_t                    size;
    nxt_int_t                 ret;
    nxt_buf_t                 *buf;
    nxt_conn_t                *c;
    nxt_sockaddr_t            *local;
    nxt_app_parse_ctx_t       *ap;
    nxt_app_request_body_t    *b;
    nxt_socket_conf_joint_t   *joint;
    nxt_app_request_header_t  *h;

    c = obj;
    ap = data;
    buf = c->read;
    joint = c->listen->socket.data;

    nxt_debug(task, "router conn http header parse");

    /* No context yet: create one and remember it in c->socket.data. */
    if (ap == NULL) {
        ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
        if (nxt_slow_path(ap == NULL)) {
            nxt_router_conn_close(task, c, data);
            return;
        }

        ret = nxt_app_http_req_init(task, ap);
        if (nxt_slow_path(ret != NXT_OK)) {
            nxt_router_conn_close(task, c, data);
            return;
        }

        c->socket.data = ap;

        /* The address strings point into the sockaddr objects (no copy). */
        ap->r.remote.start = nxt_sockaddr_address(c->remote);
        ap->r.remote.length = c->remote->address_length;

        local = joint->socket_conf->sockaddr;
        ap->r.local.start = nxt_sockaddr_address(local);
        ap->r.local.length = local->address_length;

        ap->r.header.buf = buf;
    }

    h = &ap->r.header;
    b = &ap->r.body;

    ret = nxt_app_http_req_header_parse(task, ap, buf);

    nxt_debug(task, "http parse request header: %d", ret);

    switch (nxt_expect(NXT_DONE, ret)) {

    case NXT_DONE:
        nxt_debug(task, "router request header parsing complete, "
                  "content length: %O, preread: %uz",
                  h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));

        /* The body is already complete: dispatch the request. */
        if (b->done) {
            nxt_router_process_http_request(task, c, ap);

            return;
        }

        /* A max_body_size of 0 disables the limit. */
        if (joint->socket_conf->max_body_size > 0
            && (size_t) h->parsed_content_length
               > joint->socket_conf->max_body_size)
        {
            nxt_router_gen_error(task, c, 413, "Content-Length too big");
            return;
        }

        /* The header buffer is full: chain a new buffer for the body. */
        if (nxt_buf_mem_free_size(&buf->mem) == 0) {
            size = nxt_min(joint->socket_conf->body_buffer_size,
                           (size_t) h->parsed_content_length);

            buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
            if (nxt_slow_path(buf->next == NULL)) {
                nxt_router_gen_error(task, c, 500, "Failed to allocate "
                                     "buffer for request body");
                return;
            }

            c->read = buf->next;

            b->preread_size += nxt_buf_mem_used_size(&buf->mem);
        }

        if (b->buf == NULL) {
            b->buf = c->read;
        }

        /* Subsequent reads are handled by the body read state. */
        c->read_state = &nxt_router_conn_read_body_state;
        break;

    case NXT_ERROR:
        nxt_router_gen_error(task, c, 400, "Request header parse error");
        return;

    default:  /* NXT_AGAIN */

        /* The read buffer is full but the header is still incomplete. */
        if (c->read->mem.free == c->read->mem.end) {
            size = joint->socket_conf->large_header_buffer_size;

            /*
             * Give up when a large buffer would not be bigger than the
             * data already held, or when the limit on the number of
             * large header buffers has been reached.
             */
            if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem)
                || ap->r.header.bufs
                   >= joint->socket_conf->large_header_buffers)
            {
                nxt_router_gen_error(task, c, 413,
                                     "Too long request headers");
                return;
            }

            buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
            if (nxt_slow_path(buf->next == NULL)) {
                nxt_router_gen_error(task, c, 500,
                                     "Failed to allocate large header "
                                     "buffer");
                return;
            }

            ap->r.header.bufs++;

            /* Carry the bytes between pos and free over to the new buffer. */
            size = c->read->mem.free - c->read->mem.pos;

            c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
        }

    }

    nxt_conn_read(task->thread->engine, c);
}
(nxt_expect(NXT_DONE, ret)) { 2621 2622 case NXT_DONE: 2623 nxt_router_process_http_request(task, c, ap); 2624 return; 2625 2626 case NXT_ERROR: 2627 nxt_router_gen_error(task, c, 500, "Read body error"); 2628 return; 2629 2630 default: /* NXT_AGAIN */ 2631 2632 if (nxt_buf_mem_free_size(&buf->mem) == 0) { 2633 joint = c->listen->socket.data; 2634 2635 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 2636 2637 size = nxt_min(joint->socket_conf->body_buffer_size, 2638 (size_t) h->parsed_content_length - b->preread_size); 2639 2640 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); 2641 if (nxt_slow_path(buf->next == NULL)) { 2642 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2643 "buffer for request body"); 2644 return; 2645 } 2646 2647 c->read = buf->next; 2648 } 2649 2650 nxt_debug(task, "router request body read again, rest: %uz", 2651 h->parsed_content_length - b->preread_size); 2652 } 2653 2654 nxt_conn_read(task->thread->engine, c); 2655 } 2656 2657 2658 static void 2659 nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 2660 nxt_app_parse_ctx_t *ap) 2661 { 2662 nxt_mp_t *port_mp; 2663 nxt_int_t res; 2664 nxt_port_t *port; 2665 nxt_req_id_t req_id; 2666 nxt_event_engine_t *engine; 2667 nxt_req_app_link_t *ra; 2668 nxt_req_conn_link_t *rc; 2669 2670 engine = task->thread->engine; 2671 2672 do { 2673 req_id = nxt_random(&task->thread->random); 2674 } while (nxt_event_engine_request_find(engine, req_id) != NULL); 2675 2676 rc = nxt_conn_request_add(c, req_id); 2677 2678 if (nxt_slow_path(rc == NULL)) { 2679 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2680 "req->conn link"); 2681 2682 return; 2683 } 2684 2685 nxt_event_engine_request_add(engine, rc); 2686 2687 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 2688 req_id, c, engine); 2689 2690 c->socket.data = NULL; 2691 2692 ra = nxt_router_ra_create(task, rc); 2693 2694 ra->ap = ap; 2695 ra->reply_port = engine->port; 2696 2697 res = 
nxt_router_app_port(task, ra); 2698 2699 if (res != NXT_OK) { 2700 return; 2701 } 2702 2703 port = ra->app_port; 2704 2705 if (nxt_slow_path(port == NULL)) { 2706 nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); 2707 return; 2708 } 2709 2710 port_mp = port->mem_pool; 2711 port->mem_pool = c->mem_pool; 2712 2713 nxt_router_process_http_request_mp(task, ra, port); 2714 2715 port->mem_pool = port_mp; 2716 2717 2718 nxt_router_ra_release(task, ra, ra->work.data); 2719 } 2720 2721 2722 static void 2723 nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, 2724 nxt_port_t *port) 2725 { 2726 nxt_int_t res; 2727 nxt_port_t *c_port, *reply_port; 2728 nxt_conn_t *c; 2729 nxt_app_wmsg_t wmsg; 2730 nxt_app_parse_ctx_t *ap; 2731 2732 reply_port = ra->reply_port; 2733 ap = ra->ap; 2734 c = ra->rc->conn; 2735 2736 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 2737 reply_port->id); 2738 if (nxt_slow_path(c_port != reply_port)) { 2739 res = nxt_port_send_port(task, port, reply_port, 0); 2740 2741 if (nxt_slow_path(res != NXT_OK)) { 2742 nxt_router_gen_error(task, c, 500, 2743 "Failed to send reply port to application"); 2744 return; 2745 } 2746 2747 nxt_process_connected_port_add(port->process, reply_port); 2748 } 2749 2750 wmsg.port = port; 2751 wmsg.write = NULL; 2752 wmsg.buf = &wmsg.write; 2753 wmsg.stream = ra->req_id; 2754 2755 res = port->app->prepare_msg(task, &ap->r, &wmsg); 2756 2757 if (nxt_slow_path(res != NXT_OK)) { 2758 nxt_router_gen_error(task, c, 500, 2759 "Failed to prepare message for application"); 2760 return; 2761 } 2762 2763 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 2764 nxt_buf_used_size(wmsg.write), 2765 wmsg.port->socket.fd); 2766 2767 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, 2768 -1, ra->req_id, reply_port->id, wmsg.write); 2769 2770 if (nxt_slow_path(res != NXT_OK)) { 2771 nxt_router_gen_error(task, c, 500, 2772 "Failed to send 
message to application"); 2773 return; 2774 } 2775 } 2776 2777 2778 static nxt_int_t 2779 nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 2780 nxt_app_wmsg_t *wmsg) 2781 { 2782 nxt_int_t rc; 2783 nxt_buf_t *b; 2784 nxt_http_field_t *field; 2785 nxt_app_request_header_t *h; 2786 2787 static const nxt_str_t prefix = nxt_string("HTTP_"); 2788 static const nxt_str_t eof = nxt_null_string; 2789 2790 h = &r->header; 2791 2792 #define RC(S) \ 2793 do { \ 2794 rc = (S); \ 2795 if (nxt_slow_path(rc != NXT_OK)) { \ 2796 goto fail; \ 2797 } \ 2798 } while(0) 2799 2800 #define NXT_WRITE(N) \ 2801 RC(nxt_app_msg_write_str(task, wmsg, N)) 2802 2803 /* TODO error handle, async mmap buffer assignment */ 2804 2805 NXT_WRITE(&h->method); 2806 NXT_WRITE(&h->target); 2807 2808 if (h->path.start == h->target.start) { 2809 NXT_WRITE(&eof); 2810 2811 } else { 2812 NXT_WRITE(&h->path); 2813 } 2814 2815 if (h->query.start != NULL) { 2816 RC(nxt_app_msg_write_size(task, wmsg, 2817 h->query.start - h->target.start + 1)); 2818 } else { 2819 RC(nxt_app_msg_write_size(task, wmsg, 0)); 2820 } 2821 2822 NXT_WRITE(&h->version); 2823 2824 NXT_WRITE(&r->remote); 2825 NXT_WRITE(&r->local); 2826 2827 NXT_WRITE(&h->host); 2828 NXT_WRITE(&h->content_type); 2829 NXT_WRITE(&h->content_length); 2830 2831 nxt_list_each(field, h->fields) { 2832 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, 2833 &prefix, &field->name)); 2834 NXT_WRITE(&field->value); 2835 2836 } nxt_list_loop; 2837 2838 /* end-of-headers mark */ 2839 NXT_WRITE(&eof); 2840 2841 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 2842 2843 for(b = r->body.buf; b != NULL; b = b->next) { 2844 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 2845 nxt_buf_mem_used_size(&b->mem))); 2846 } 2847 2848 #undef NXT_WRITE 2849 #undef RC 2850 2851 return NXT_OK; 2852 2853 fail: 2854 2855 return NXT_ERROR; 2856 } 2857 2858 2859 static nxt_int_t 2860 nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 2861 nxt_app_wmsg_t 
*wmsg) 2862 { 2863 nxt_int_t rc; 2864 nxt_buf_t *b; 2865 nxt_http_field_t *field; 2866 nxt_app_request_header_t *h; 2867 2868 static const nxt_str_t prefix = nxt_string("HTTP_"); 2869 static const nxt_str_t eof = nxt_null_string; 2870 2871 h = &r->header; 2872 2873 #define RC(S) \ 2874 do { \ 2875 rc = (S); \ 2876 if (nxt_slow_path(rc != NXT_OK)) { \ 2877 goto fail; \ 2878 } \ 2879 } while(0) 2880 2881 #define NXT_WRITE(N) \ 2882 RC(nxt_app_msg_write_str(task, wmsg, N)) 2883 2884 /* TODO error handle, async mmap buffer assignment */ 2885 2886 NXT_WRITE(&h->method); 2887 NXT_WRITE(&h->target); 2888 2889 if (h->path.start == h->target.start) { 2890 NXT_WRITE(&eof); 2891 2892 } else { 2893 NXT_WRITE(&h->path); 2894 } 2895 2896 if (h->query.start != NULL) { 2897 RC(nxt_app_msg_write_size(task, wmsg, 2898 h->query.start - h->target.start + 1)); 2899 } else { 2900 RC(nxt_app_msg_write_size(task, wmsg, 0)); 2901 } 2902 2903 NXT_WRITE(&h->version); 2904 2905 // PHP_SELF 2906 // SCRIPT_NAME 2907 // SCRIPT_FILENAME 2908 // DOCUMENT_ROOT 2909 2910 NXT_WRITE(&r->remote); 2911 NXT_WRITE(&r->local); 2912 2913 NXT_WRITE(&h->host); 2914 NXT_WRITE(&h->cookie); 2915 NXT_WRITE(&h->content_type); 2916 NXT_WRITE(&h->content_length); 2917 2918 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 2919 2920 nxt_list_each(field, h->fields) { 2921 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, 2922 &prefix, &field->name)); 2923 NXT_WRITE(&field->value); 2924 2925 } nxt_list_loop; 2926 2927 /* end-of-headers mark */ 2928 NXT_WRITE(&eof); 2929 2930 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 2931 2932 for(b = r->body.buf; b != NULL; b = b->next) { 2933 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 2934 nxt_buf_mem_used_size(&b->mem))); 2935 } 2936 2937 #undef NXT_WRITE 2938 #undef RC 2939 2940 return NXT_OK; 2941 2942 fail: 2943 2944 return NXT_ERROR; 2945 } 2946 2947 2948 static nxt_int_t 2949 nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 
nxt_app_wmsg_t *wmsg) 2950 { 2951 nxt_int_t rc; 2952 nxt_buf_t *b; 2953 nxt_http_field_t *field; 2954 nxt_app_request_header_t *h; 2955 2956 static const nxt_str_t eof = nxt_null_string; 2957 2958 h = &r->header; 2959 2960 #define RC(S) \ 2961 do { \ 2962 rc = (S); \ 2963 if (nxt_slow_path(rc != NXT_OK)) { \ 2964 goto fail; \ 2965 } \ 2966 } while(0) 2967 2968 #define NXT_WRITE(N) \ 2969 RC(nxt_app_msg_write_str(task, wmsg, N)) 2970 2971 /* TODO error handle, async mmap buffer assignment */ 2972 2973 NXT_WRITE(&h->method); 2974 NXT_WRITE(&h->target); 2975 2976 if (h->path.start == h->target.start) { 2977 NXT_WRITE(&eof); 2978 2979 } else { 2980 NXT_WRITE(&h->path); 2981 } 2982 2983 if (h->query.start != NULL) { 2984 RC(nxt_app_msg_write_size(task, wmsg, 2985 h->query.start - h->target.start + 1)); 2986 } else { 2987 RC(nxt_app_msg_write_size(task, wmsg, 0)); 2988 } 2989 2990 NXT_WRITE(&h->version); 2991 NXT_WRITE(&r->remote); 2992 2993 NXT_WRITE(&h->host); 2994 NXT_WRITE(&h->cookie); 2995 NXT_WRITE(&h->content_type); 2996 NXT_WRITE(&h->content_length); 2997 2998 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 2999 3000 nxt_list_each(field, h->fields) { 3001 NXT_WRITE(&field->name); 3002 NXT_WRITE(&field->value); 3003 3004 } nxt_list_loop; 3005 3006 /* end-of-headers mark */ 3007 NXT_WRITE(&eof); 3008 3009 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3010 3011 for(b = r->body.buf; b != NULL; b = b->next) { 3012 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3013 nxt_buf_mem_used_size(&b->mem))); 3014 } 3015 3016 #undef NXT_WRITE 3017 #undef RC 3018 3019 return NXT_OK; 3020 3021 fail: 3022 3023 return NXT_ERROR; 3024 } 3025 3026 3027 static const nxt_conn_state_t nxt_router_conn_close_state 3028 nxt_aligned(64) = 3029 { 3030 .ready_handler = nxt_router_conn_free, 3031 }; 3032 3033 3034 static void 3035 nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) 3036 { 3037 nxt_buf_t *b; 3038 nxt_bool_t last; 3039 nxt_conn_t *c; 
3040 nxt_work_queue_t *wq; 3041 3042 nxt_debug(task, "router conn ready %p", obj); 3043 3044 c = obj; 3045 b = c->write; 3046 3047 wq = &task->thread->engine->fast_work_queue; 3048 3049 last = 0; 3050 3051 while (b != NULL) { 3052 if (!nxt_buf_is_sync(b)) { 3053 if (nxt_buf_used_size(b) > 0) { 3054 break; 3055 } 3056 } 3057 3058 if (nxt_buf_is_last(b)) { 3059 last = 1; 3060 } 3061 3062 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 3063 3064 b = b->next; 3065 } 3066 3067 c->write = b; 3068 3069 if (b != NULL) { 3070 nxt_debug(task, "router conn %p has more data to write", obj); 3071 3072 nxt_conn_write(task->thread->engine, c); 3073 3074 } else { 3075 nxt_debug(task, "router conn %p no more data to write, last = %d", obj, 3076 last); 3077 3078 if (last != 0) { 3079 nxt_debug(task, "enqueue router conn close %p (ready handler)", c); 3080 3081 nxt_work_queue_add(wq, nxt_router_conn_close, task, c, 3082 c->socket.data); 3083 } 3084 } 3085 } 3086 3087 3088 static void 3089 nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) 3090 { 3091 nxt_conn_t *c; 3092 3093 c = obj; 3094 3095 nxt_debug(task, "router conn close"); 3096 3097 c->write_state = &nxt_router_conn_close_state; 3098 3099 nxt_conn_close(task->thread->engine, c); 3100 } 3101 3102 3103 static void 3104 nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) 3105 { 3106 nxt_socket_conf_joint_t *joint; 3107 3108 joint = obj; 3109 3110 nxt_router_conf_release(task, joint); 3111 } 3112 3113 3114 static void 3115 nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) 3116 { 3117 nxt_conn_t *c; 3118 nxt_req_conn_link_t *rc; 3119 nxt_socket_conf_joint_t *joint; 3120 3121 c = obj; 3122 3123 nxt_debug(task, "router conn close done"); 3124 3125 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 3126 3127 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); 3128 3129 if (rc->app_port != NULL) { 3130 nxt_router_app_release_port(task, rc->app_port, 
rc->app_port->app); 3131 3132 rc->app_port = NULL; 3133 } 3134 3135 rc->conn = NULL; 3136 3137 nxt_event_engine_request_remove(task->thread->engine, rc); 3138 3139 } nxt_queue_loop; 3140 3141 nxt_queue_remove(&c->link); 3142 3143 joint = c->listen->socket.data; 3144 3145 task = &task->thread->engine->task; 3146 3147 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, task, joint, NULL); 3148 3149 nxt_mp_release(c->mem_pool, c); 3150 } 3151 3152 3153 static void 3154 nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) 3155 { 3156 nxt_conn_t *c; 3157 3158 c = obj; 3159 3160 nxt_debug(task, "router conn error"); 3161 3162 if (c->socket.fd != -1) { 3163 c->write_state = &nxt_router_conn_close_state; 3164 3165 nxt_conn_close(task->thread->engine, c); 3166 } 3167 } 3168 3169 3170 static void 3171 nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) 3172 { 3173 nxt_conn_t *c; 3174 nxt_timer_t *timer; 3175 3176 timer = obj; 3177 3178 nxt_debug(task, "router conn timeout"); 3179 3180 c = nxt_read_timer_conn(timer); 3181 3182 if (c->read_state == &nxt_router_conn_read_header_state) { 3183 nxt_router_gen_error(task, c, 408, "Read header timeout"); 3184 3185 } else { 3186 nxt_router_gen_error(task, c, 408, "Read body timeout"); 3187 } 3188 } 3189 3190 3191 static nxt_msec_t 3192 nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 3193 { 3194 nxt_socket_conf_joint_t *joint; 3195 3196 joint = c->listen->socket.data; 3197 3198 return nxt_value_at(nxt_msec_t, joint->socket_conf, data); 3199 } 3200