1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #include <nxt_http.h> 11 12 13 typedef struct { 14 nxt_str_t type; 15 uint32_t processes; 16 uint32_t max_processes; 17 uint32_t spare_processes; 18 nxt_msec_t timeout; 19 nxt_msec_t res_timeout; 20 nxt_msec_t idle_timeout; 21 uint32_t requests; 22 nxt_conf_value_t *limits_value; 23 nxt_conf_value_t *processes_value; 24 } nxt_router_app_conf_t; 25 26 27 typedef struct { 28 nxt_str_t application; 29 } nxt_router_listener_conf_t; 30 31 32 typedef struct nxt_msg_info_s { 33 nxt_buf_t *buf; 34 nxt_port_mmap_tracking_t tracking; 35 nxt_work_handler_t completion_handler; 36 } nxt_msg_info_t; 37 38 39 typedef struct nxt_req_app_link_s nxt_req_app_link_t; 40 41 42 typedef struct { 43 uint32_t stream; 44 nxt_app_t *app; 45 nxt_port_t *app_port; 46 nxt_app_parse_ctx_t *ap; 47 nxt_msg_info_t msg_info; 48 nxt_req_app_link_t *ra; 49 50 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 51 } nxt_req_conn_link_t; 52 53 54 struct nxt_req_app_link_s { 55 uint32_t stream; 56 nxt_atomic_t use_count; 57 nxt_port_t *app_port; 58 nxt_port_t *reply_port; 59 nxt_app_parse_ctx_t *ap; 60 nxt_msg_info_t msg_info; 61 nxt_req_conn_link_t *rc; 62 63 nxt_nsec_t res_time; 64 65 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ 66 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ 67 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ 68 69 nxt_mp_t *mem_pool; 70 nxt_work_t work; 71 72 int err_code; 73 const char *err_str; 74 }; 75 76 77 typedef struct { 78 nxt_socket_conf_t *socket_conf; 79 nxt_router_temp_conf_t *temp_conf; 80 } nxt_socket_rpc_t; 81 82 83 typedef struct { 84 nxt_app_t *app; 85 nxt_router_temp_conf_t *temp_conf; 86 } nxt_app_rpc_t; 87 88 89 struct nxt_port_select_state_s { 90 nxt_app_t *app; 91 nxt_req_app_link_t *ra; 92 93 nxt_port_t *failed_port; 94 int failed_port_use_delta; 95 96 uint8_t start_process; /* 1 bit */ 97 nxt_req_app_link_t *shared_ra; 98 nxt_port_t *port; 99 }; 100 101 typedef struct nxt_port_select_state_s nxt_port_select_state_t; 102 103 static void nxt_router_port_select(nxt_task_t *task, 104 nxt_port_select_state_t *state); 105 106 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 107 nxt_port_select_state_t *state); 108 109 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 110 111 nxt_inline void 112 nxt_router_ra_inc_use(nxt_req_app_link_t *ra) 113 { 114 nxt_atomic_fetch_add(&ra->use_count, 1); 115 } 116 117 nxt_inline void 118 nxt_router_ra_dec_use(nxt_req_app_link_t *ra) 119 { 120 #if (NXT_DEBUG) 121 int c; 122 123 c = nxt_atomic_fetch_add(&ra->use_count, -1); 124 125 nxt_assert(c > 1); 126 #else 127 (void) nxt_atomic_fetch_add(&ra->use_count, -1); 128 #endif 129 } 130 131 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); 132 133 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 134 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 135 static void nxt_router_conf_ready(nxt_task_t *task, 136 nxt_router_temp_conf_t *tmcf); 137 static void nxt_router_conf_error(nxt_task_t *task, 138 nxt_router_temp_conf_t *tmcf); 139 static void nxt_router_conf_send(nxt_task_t *task, 140 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 141 142 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 143 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 144 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 145 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, 146 nxt_str_t *name); 147 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 148 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 149 static void nxt_router_listen_socket_ready(nxt_task_t *task, 150 nxt_port_recv_msg_t *msg, void *data); 151 static void nxt_router_listen_socket_error(nxt_task_t *task, 152 nxt_port_recv_msg_t *msg, void *data); 153 static void nxt_router_app_rpc_create(nxt_task_t *task, 154 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 155 static void nxt_router_app_prefork_ready(nxt_task_t *task, 156 nxt_port_recv_msg_t *msg, void *data); 157 static void nxt_router_app_prefork_error(nxt_task_t *task, 158 nxt_port_recv_msg_t *msg, void *data); 159 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 160 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 161 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 162 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 163 164 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 165 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 166 const nxt_event_interface_t *interface); 167 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 168 nxt_router_engine_conf_t *recf); 169 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 170 nxt_router_engine_conf_t *recf); 171 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 172 nxt_router_engine_conf_t *recf); 173 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 174 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 175 nxt_work_handler_t handler); 176 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 177 nxt_router_engine_conf_t *recf); 178 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 179 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 180 181 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 182 nxt_router_temp_conf_t *tmcf); 183 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 184 nxt_event_engine_t *engine); 185 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 186 nxt_router_temp_conf_t *tmcf); 187 188 static void nxt_router_engines_post(nxt_router_t *router, 189 nxt_router_temp_conf_t *tmcf); 190 static void nxt_router_engine_post(nxt_event_engine_t *engine, 191 nxt_work_t *jobs); 192 193 static void nxt_router_thread_start(void *data); 194 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 195 void *data); 196 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 197 void *data); 198 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 199 void *data); 200 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 201 void *data); 202 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 203 void *data); 204 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 205 void *data); 206 static void nxt_router_listen_socket_release(nxt_task_t *task, 207 nxt_socket_conf_t *skcf); 208 static void nxt_router_conf_release(nxt_task_t *task, 209 nxt_socket_conf_joint_t *joint); 210 211 static void nxt_router_app_port_ready(nxt_task_t *task, 212 nxt_port_recv_msg_t *msg, void *data); 213 static void nxt_router_app_port_error(nxt_task_t *task, 214 nxt_port_recv_msg_t *msg, void *data); 215 216 static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app); 217 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 218 uint32_t request_failed, uint32_t got_response); 219 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 220 nxt_req_app_link_t *ra); 221 222 static void nxt_router_app_prepare_request(nxt_task_t *task, 223 nxt_req_app_link_t *ra); 224 static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 225 nxt_app_wmsg_t *wmsg); 226 static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 227 nxt_app_wmsg_t *wmsg); 228 static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 229 nxt_app_wmsg_t *wmsg); 230 static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 231 nxt_app_wmsg_t *wmsg); 232 static nxt_int_t nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 233 nxt_app_wmsg_t *wmsg); 234 235 static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 236 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 237 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 238 void *data); 239 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 240 void *data); 241 static void nxt_router_app_release_handler(nxt_task_t *task, void *obj, 242 void *data); 243 244 static const nxt_http_request_state_t nxt_http_request_send_state; 245 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 246 247 static nxt_router_t *nxt_router; 248 249 250 static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { 251 nxt_python_prepare_msg, 252 nxt_php_prepare_msg, 253 nxt_go_prepare_msg, 254 nxt_perl_prepare_msg, 255 nxt_ruby_prepare_msg, 256 }; 257 258 259 nxt_int_t 260 nxt_router_start(nxt_task_t *task, void *data) 261 { 262 nxt_int_t ret; 263 nxt_router_t *router; 264 nxt_runtime_t *rt; 265 266 rt = task->thread->runtime; 267 268 ret = nxt_http_init(task, rt); 269 if (nxt_slow_path(ret != NXT_OK)) { 270 return ret; 271 } 272 273 router = nxt_zalloc(sizeof(nxt_router_t)); 274 if (nxt_slow_path(router == NULL)) { 275 return NXT_ERROR; 276 } 277 278 nxt_queue_init(&router->engines); 279 nxt_queue_init(&router->sockets); 280 nxt_queue_init(&router->apps); 281 282 nxt_router = router; 283 284 return NXT_OK; 285 } 286 287 288 static void 289 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 290 void *data) 291 { 292 size_t size; 293 uint32_t stream; 294 nxt_mp_t *mp; 295 nxt_app_t *app; 296 nxt_buf_t *b; 297 nxt_port_t *main_port; 298 nxt_runtime_t *rt; 299 300 app = data; 301 302 rt = task->thread->runtime; 303 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 304 305 nxt_debug(task, "app '%V' %p start process", &app->name, app); 306 307 size = app->name.length + 1 + app->conf.length; 308 309 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 310 311 if (nxt_slow_path(b == NULL)) { 312 goto failed; 313 } 314 315 nxt_buf_cpystr(b, &app->name); 316 *b->mem.free++ = '\0'; 317 nxt_buf_cpystr(b, &app->conf); 318 319 stream = nxt_port_rpc_register_handler(task, port, 320 nxt_router_app_port_ready, 321 nxt_router_app_port_error, 322 -1, app); 323 324 if (nxt_slow_path(stream == 0)) { 325 mp = b->data; 326 nxt_mp_free(mp, b); 327 nxt_mp_release(mp); 328 329 goto failed; 330 } 331 332 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 333 stream, port->id, b); 334 335 return; 336 337 failed: 338 339 nxt_thread_mutex_lock(&app->mutex); 340 341 app->pending_processes--; 342 343 nxt_thread_mutex_unlock(&app->mutex); 344 345 nxt_router_app_use(task, app, -1); 346 } 347 348 349 static nxt_int_t 350 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 351 { 352 nxt_int_t res; 353 nxt_port_t *router_port; 354 nxt_runtime_t *rt; 355 356 rt = task->thread->runtime; 357 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 358 359 nxt_router_app_use(task, app, 1); 360 361 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 362 app); 363 364 if (res == NXT_OK) { 365 return res; 366 } 367 368 nxt_thread_mutex_lock(&app->mutex); 369 370 app->pending_processes--; 371 372 nxt_thread_mutex_unlock(&app->mutex); 373 374 nxt_router_app_use(task, app, -1); 375 376 return NXT_ERROR; 377 } 378 379 380 nxt_inline void 381 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, 382 nxt_req_conn_link_t *rc) 383 { 384 nxt_event_engine_t *engine; 385 386 engine = task->thread->engine; 387 388 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 389 390 ra->stream = rc->stream; 391 ra->use_count = 1; 392 ra->rc = rc; 393 rc->ra = ra; 394 ra->reply_port = engine->port; 395 ra->ap = rc->ap; 396 397 ra->work.handler = NULL; 398 ra->work.task = &engine->task; 399 ra->work.obj = ra; 400 ra->work.data = engine; 401 } 402 403 404 nxt_inline nxt_req_app_link_t * 405 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) 406 { 407 nxt_mp_t *mp; 408 nxt_req_app_link_t *ra; 409 410 if (ra_src->mem_pool != NULL) { 411 return ra_src; 412 } 413 414 mp = ra_src->ap->mem_pool; 415 416 ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t)); 417 418 if (nxt_slow_path(ra == NULL)) { 419 420 ra_src->rc->ra = NULL; 421 ra_src->rc = NULL; 422 423 return NULL; 424 } 425 426 nxt_mp_retain(mp); 427 428 nxt_router_ra_init(task, ra, ra_src->rc); 429 430 ra->mem_pool = mp; 431 432 return ra; 433 } 434 435 436 nxt_inline nxt_bool_t 437 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 438 uint32_t stream) 439 { 440 nxt_buf_t *b, *next; 441 nxt_bool_t cancelled; 442 443 if (msg_info->buf == NULL) { 444 return 0; 445 } 446 447 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 448 stream); 449 450 if (cancelled) { 451 nxt_debug(task, "stream #%uD: cancelled by router", stream); 452 } 453 454 for (b = msg_info->buf; b != NULL; b = next) { 455 next = b->next; 456 457 b->completion_handler = msg_info->completion_handler; 458 459 if (b->is_port_mmap_sent) { 460 b->is_port_mmap_sent = cancelled == 0; 461 b->completion_handler(task, b, b->parent); 462 } 463 } 464 465 msg_info->buf = NULL; 466 467 return cancelled; 468 } 469 470 471 static void 472 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); 473 474 475 static void 476 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) 477 { 478 nxt_req_app_link_t *ra; 479 480 ra = obj; 481 482 nxt_router_ra_update_peer(task, ra); 483 484 nxt_router_ra_use(task, ra, -1); 485 } 486 487 488 static void 489 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) 490 { 491 nxt_event_engine_t *engine; 492 nxt_req_conn_link_t *rc; 493 494 engine = ra->work.data; 495 496 if (task->thread->engine != engine) { 497 nxt_router_ra_inc_use(ra); 498 499 ra->work.handler = nxt_router_ra_update_peer_handler; 500 ra->work.task = &engine->task; 501 ra->work.next = NULL; 502 503 nxt_debug(task, "ra stream #%uD post update peer to %p", 504 ra->stream, engine); 505 506 nxt_event_engine_post(engine, &ra->work); 507 508 return; 509 } 510 511 nxt_debug(task, "ra stream #%uD update peer", ra->stream); 512 513 rc = ra->rc; 514 515 if (rc != NULL && ra->app_port != NULL) { 516 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); 517 } 518 519 nxt_router_ra_use(task, ra, -1); 520 } 521 522 523 static void 524 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) 525 { 526 nxt_mp_t *mp; 527 nxt_req_conn_link_t *rc; 528 529 nxt_assert(task->thread->engine == ra->work.data); 530 nxt_assert(ra->use_count == 0); 531 532 nxt_debug(task, "ra stream #%uD release", ra->stream); 533 534 rc = ra->rc; 535 536 if (rc != NULL) { 537 if (nxt_slow_path(ra->err_code != 0)) { 538 nxt_http_request_error(task, rc->ap->request, ra->err_code); 539 540 } else { 541 rc->app_port = ra->app_port; 542 rc->msg_info = ra->msg_info; 543 544 if (rc->app->timeout != 0) { 545 rc->ap->timer.handler = nxt_router_app_timeout; 546 nxt_timer_add(task->thread->engine, &rc->ap->timer, 547 rc->app->timeout); 548 } 549 550 ra->app_port = NULL; 551 ra->msg_info.buf = NULL; 552 } 553 554 rc->ra = NULL; 555 ra->rc = NULL; 556 } 557 558 if (ra->app_port != NULL) { 559 nxt_router_app_port_release(task, ra->app_port, 0, 1); 560 561 ra->app_port = NULL; 562 } 563 564 nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 565 566 mp = ra->mem_pool; 567 568 if (mp != NULL) { 569 nxt_mp_free(mp, ra); 570 nxt_mp_release(mp); 571 } 572 } 573 574 575 static void 576 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) 577 { 578 nxt_req_app_link_t *ra; 579 580 ra = obj; 581 582 nxt_assert(ra->work.data == data); 583 584 nxt_atomic_fetch_add(&ra->use_count, -1); 585 586 nxt_router_ra_release(task, ra); 587 } 588 589 590 static void 591 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) 592 { 593 int c; 594 nxt_event_engine_t *engine; 595 596 c = nxt_atomic_fetch_add(&ra->use_count, i); 597 598 if (i < 0 && c == -i) { 599 engine = ra->work.data; 600 601 if (task->thread->engine == engine) { 602 nxt_router_ra_release(task, ra); 603 604 return; 605 } 606 607 nxt_router_ra_inc_use(ra); 608 609 ra->work.handler = nxt_router_ra_release_handler; 610 ra->work.task = &engine->task; 611 ra->work.next = NULL; 612 613 nxt_debug(task, "ra stream #%uD post release to %p", 614 ra->stream, engine); 615 616 nxt_event_engine_post(engine, &ra->work); 617 } 618 } 619 620 621 nxt_inline void 622 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str) 623 { 624 ra->app_port = NULL; 625 ra->err_code = code; 626 ra->err_str = str; 627 } 628 629 630 nxt_inline void 631 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 632 { 633 nxt_queue_insert_tail(&ra->app_port->pending_requests, 634 &ra->link_port_pending); 635 nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); 636 637 nxt_router_ra_inc_use(ra); 638 639 ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; 640 641 nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); 642 } 643 644 645 nxt_inline nxt_bool_t 646 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 647 { 648 if (lnk->next != NULL) { 649 nxt_queue_remove(lnk); 650 651 lnk->next = NULL; 652 653 return 1; 654 } 655 656 return 0; 657 } 658 659 660 nxt_inline void 661 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) 662 { 663 int ra_use_delta; 664 nxt_req_app_link_t *ra; 665 666 if (rc->app_port != NULL) { 667 nxt_router_app_port_release(task, rc->app_port, 0, 1); 668 669 rc->app_port = NULL; 670 } 671 672 nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); 673 674 ra = rc->ra; 675 676 if (ra != NULL) { 677 rc->ra = NULL; 678 ra->rc = NULL; 679 680 ra_use_delta = 0; 681 682 nxt_thread_mutex_lock(&rc->app->mutex); 683 684 if (ra->link_app_requests.next == NULL 685 && ra->link_port_pending.next == NULL 686 && ra->link_app_pending.next == NULL) 687 { 688 ra = NULL; 689 690 } else { 691 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); 692 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); 693 nxt_queue_chk_remove(&ra->link_app_pending); 694 } 695 696 nxt_thread_mutex_unlock(&rc->app->mutex); 697 698 if (ra != NULL) { 699 nxt_router_ra_use(task, ra, ra_use_delta); 700 } 701 } 702 703 if (rc->app != NULL) { 704 nxt_router_app_use(task, rc->app, -1); 705 706 rc->app = NULL; 707 } 708 709 if (rc->ap != NULL) { 710 nxt_app_http_req_done(task, rc->ap); 711 712 rc->ap = NULL; 713 } 714 } 715 716 717 void 718 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 719 { 720 nxt_port_new_port_handler(task, msg); 721 722 if (msg->port_msg.stream == 0) { 723 return; 724 } 725 726 if (msg->u.new_port == NULL 727 || msg->u.new_port->type != NXT_PROCESS_WORKER) 728 { 729 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 730 } 731 732 nxt_port_rpc_handler(task, msg); 733 } 734 735 736 void 737 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 738 { 739 nxt_int_t ret; 740 nxt_buf_t *b; 741 nxt_router_temp_conf_t *tmcf; 742 743 tmcf = nxt_router_temp_conf(task); 744 if (nxt_slow_path(tmcf == NULL)) { 745 return; 746 } 747 748 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 749 nxt_buf_used_size(msg->buf), 750 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 751 752 tmcf->conf->router = nxt_router; 753 tmcf->stream = msg->port_msg.stream; 754 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 755 msg->port_msg.pid, 756 msg->port_msg.reply_port); 757 758 b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size); 759 if (nxt_slow_path(b == NULL)) { 760 nxt_router_conf_error(task, tmcf); 761 762 return; 763 } 764 765 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 766 767 if (nxt_fast_path(ret == NXT_OK)) { 768 nxt_router_conf_apply(task, tmcf, NULL); 769 770 } else { 771 nxt_router_conf_error(task, tmcf); 772 } 773 } 774 775 776 static void 777 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 778 void *data) 779 { 780 union { 781 nxt_pid_t removed_pid; 782 void *data; 783 } u; 784 785 u.data = data; 786 787 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 788 } 789 790 791 void 792 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 793 { 794 nxt_event_engine_t *engine; 795 796 nxt_port_remove_pid_handler(task, msg); 797 798 if (msg->port_msg.stream == 0) { 799 return; 800 } 801 802 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 803 { 804 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 805 msg->u.data); 806 } 807 nxt_queue_loop; 808 809 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 810 811 nxt_port_rpc_handler(task, msg); 812 } 813 814 815 static nxt_router_temp_conf_t * 816 nxt_router_temp_conf(nxt_task_t *task) 817 { 818 nxt_mp_t *mp, *tmp; 819 nxt_router_conf_t *rtcf; 820 nxt_router_temp_conf_t *tmcf; 821 822 mp = nxt_mp_create(1024, 128, 256, 32); 823 if (nxt_slow_path(mp == NULL)) { 824 return NULL; 825 } 826 827 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 828 if (nxt_slow_path(rtcf == NULL)) { 829 goto fail; 830 } 831 832 rtcf->mem_pool = mp; 833 834 tmp = nxt_mp_create(1024, 128, 256, 32); 835 if (nxt_slow_path(tmp == NULL)) { 836 goto fail; 837 } 838 839 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 840 if (nxt_slow_path(tmcf == NULL)) { 841 goto temp_fail; 842 } 843 844 tmcf->mem_pool = tmp; 845 tmcf->conf = rtcf; 846 tmcf->count = 1; 847 tmcf->engine = task->thread->engine; 848 849 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 850 sizeof(nxt_router_engine_conf_t)); 851 if (nxt_slow_path(tmcf->engines == NULL)) { 852 goto temp_fail; 853 } 854 855 nxt_queue_init(&tmcf->deleting); 856 nxt_queue_init(&tmcf->keeping); 857 nxt_queue_init(&tmcf->updating); 858 nxt_queue_init(&tmcf->pending); 859 nxt_queue_init(&tmcf->creating); 860 861 nxt_queue_init(&tmcf->apps); 862 nxt_queue_init(&tmcf->previous); 863 864 return tmcf; 865 866 temp_fail: 867 868 nxt_mp_destroy(tmp); 869 870 fail: 871 872 nxt_mp_destroy(mp); 873 874 return NULL; 875 } 876 877 878 nxt_inline nxt_bool_t 879 nxt_router_app_can_start(nxt_app_t *app) 880 { 881 return app->processes + app->pending_processes < app->max_processes 882 && app->pending_processes < app->max_pending_processes; 883 } 884 885 886 nxt_inline nxt_bool_t 887 nxt_router_app_need_start(nxt_app_t *app) 888 { 889 return app->idle_processes + app->pending_processes 890 < app->spare_processes; 891 } 892 893 894 static void 895 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 896 { 897 nxt_int_t ret; 898 nxt_app_t *app; 899 nxt_router_t *router; 900 nxt_runtime_t *rt; 901 nxt_queue_link_t *qlk; 902 nxt_socket_conf_t *skcf; 903 nxt_router_temp_conf_t *tmcf; 904 const nxt_event_interface_t *interface; 905 906 tmcf = obj; 907 908 qlk = nxt_queue_first(&tmcf->pending); 909 910 if (qlk != nxt_queue_tail(&tmcf->pending)) { 911 nxt_queue_remove(qlk); 912 nxt_queue_insert_tail(&tmcf->creating, qlk); 913 914 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 915 916 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 917 918 return; 919 } 920 921 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 922 923 if (nxt_router_app_need_start(app)) { 924 nxt_router_app_rpc_create(task, tmcf, app); 925 return; 926 } 927 928 } nxt_queue_loop; 929 930 rt = task->thread->runtime; 931 932 interface = nxt_service_get(rt->services, "engine", NULL); 933 934 router = tmcf->conf->router; 935 936 ret = nxt_router_engines_create(task, router, tmcf, interface); 937 if (nxt_slow_path(ret != NXT_OK)) { 938 goto fail; 939 } 940 941 ret = nxt_router_threads_create(task, rt, tmcf); 942 if (nxt_slow_path(ret != NXT_OK)) { 943 goto fail; 944 } 945 946 nxt_router_apps_sort(task, router, tmcf); 947 948 nxt_router_engines_post(router, tmcf); 949 950 nxt_queue_add(&router->sockets, &tmcf->updating); 951 nxt_queue_add(&router->sockets, &tmcf->creating); 952 953 nxt_router_conf_ready(task, tmcf); 954 955 return; 956 957 fail: 958 959 nxt_router_conf_error(task, tmcf); 960 961 return; 962 } 963 964 965 static void 966 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 967 { 968 nxt_joint_job_t *job; 969 970 job = obj; 971 972 nxt_router_conf_ready(task, job->tmcf); 973 } 974 975 976 static void 977 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 978 { 979 nxt_debug(task, "temp conf count:%D", tmcf->count); 980 981 if (--tmcf->count == 0) { 982 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 983 } 984 } 985 986 987 static void 988 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 989 { 990 nxt_app_t *app; 991 nxt_queue_t new_socket_confs; 992 nxt_socket_t s; 993 nxt_router_t *router; 994 nxt_queue_link_t *qlk; 995 nxt_socket_conf_t *skcf; 996 997 nxt_alert(task, "failed to apply new conf"); 998 999 for (qlk = nxt_queue_first(&tmcf->creating); 1000 qlk != nxt_queue_tail(&tmcf->creating); 1001 qlk = nxt_queue_next(qlk)) 1002 { 1003 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1004 s = skcf->listen->socket; 1005 1006 if (s != -1) { 1007 nxt_socket_close(task, s); 1008 } 1009 1010 nxt_free(skcf->listen); 1011 } 1012 1013 nxt_queue_init(&new_socket_confs); 1014 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1015 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1016 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1017 1018 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1019 1020 if (skcf->application != NULL) { 1021 nxt_router_app_use(task, skcf->application, -1); 1022 skcf->application = NULL; 1023 } 1024 1025 } nxt_queue_loop; 1026 1027 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1028 1029 nxt_router_app_quit(task, app); 1030 1031 } nxt_queue_loop; 1032 1033 router = tmcf->conf->router; 1034 1035 nxt_queue_add(&router->sockets, &tmcf->keeping); 1036 nxt_queue_add(&router->sockets, &tmcf->deleting); 1037 1038 nxt_queue_add(&router->apps, &tmcf->previous); 1039 1040 // TODO: new engines and threads 1041 1042 nxt_mp_destroy(tmcf->conf->mem_pool); 1043 1044 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1045 } 1046 1047 1048 static void 1049 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1050 nxt_port_msg_type_t type) 1051 { 1052 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1053 } 1054 1055 1056 static nxt_conf_map_t nxt_router_conf[] = { 1057 { 1058 nxt_string("listeners_threads"), 1059 NXT_CONF_MAP_INT32, 1060 offsetof(nxt_router_conf_t, threads), 1061 }, 1062 }; 1063 1064 1065 static nxt_conf_map_t nxt_router_app_conf[] = { 1066 { 1067 nxt_string("type"), 1068 NXT_CONF_MAP_STR, 1069 offsetof(nxt_router_app_conf_t, type), 1070 }, 1071 1072 { 1073 nxt_string("limits"), 1074 NXT_CONF_MAP_PTR, 1075 offsetof(nxt_router_app_conf_t, limits_value), 1076 }, 1077 1078 { 1079 nxt_string("processes"), 1080 NXT_CONF_MAP_INT32, 1081 offsetof(nxt_router_app_conf_t, processes), 1082 }, 1083 1084 { 1085 nxt_string("processes"), 1086 NXT_CONF_MAP_PTR, 1087 offsetof(nxt_router_app_conf_t, processes_value), 1088 }, 1089 }; 1090 1091 1092 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1093 { 1094 nxt_string("timeout"), 1095 NXT_CONF_MAP_MSEC, 1096 offsetof(nxt_router_app_conf_t, timeout), 1097 }, 1098 1099 { 1100 nxt_string("reschedule_timeout"), 1101 NXT_CONF_MAP_MSEC, 1102 offsetof(nxt_router_app_conf_t, res_timeout), 1103 }, 1104 1105 { 1106 nxt_string("requests"), 1107 NXT_CONF_MAP_INT32, 1108 offsetof(nxt_router_app_conf_t, requests), 1109 }, 1110 }; 1111 1112 1113 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1114 { 1115 nxt_string("spare"), 1116 NXT_CONF_MAP_INT32, 1117 offsetof(nxt_router_app_conf_t, spare_processes), 1118 }, 1119 1120 { 1121 nxt_string("max"), 1122 NXT_CONF_MAP_INT32, 1123 offsetof(nxt_router_app_conf_t, max_processes), 1124 }, 1125 1126 { 1127 nxt_string("idle_timeout"), 1128 NXT_CONF_MAP_MSEC, 1129 offsetof(nxt_router_app_conf_t, idle_timeout), 1130 }, 1131 }; 1132 1133 1134 static nxt_conf_map_t nxt_router_listener_conf[] = { 1135 { 1136 nxt_string("application"), 1137 NXT_CONF_MAP_STR, 1138 offsetof(nxt_router_listener_conf_t, application), 1139 }, 1140 }; 1141 1142 1143 static nxt_conf_map_t nxt_router_http_conf[] = { 1144 { 1145 nxt_string("header_buffer_size"), 1146 NXT_CONF_MAP_SIZE, 1147 offsetof(nxt_socket_conf_t, header_buffer_size), 1148 }, 1149 1150 { 1151 nxt_string("large_header_buffer_size"), 1152 NXT_CONF_MAP_SIZE, 1153 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1154 }, 1155 1156 { 1157 nxt_string("large_header_buffers"), 1158 NXT_CONF_MAP_SIZE, 1159 offsetof(nxt_socket_conf_t, large_header_buffers), 1160 }, 1161 1162 { 1163 nxt_string("body_buffer_size"), 1164 NXT_CONF_MAP_SIZE, 1165 offsetof(nxt_socket_conf_t, body_buffer_size), 1166 }, 1167 1168 { 1169 nxt_string("max_body_size"), 1170 NXT_CONF_MAP_SIZE, 1171 offsetof(nxt_socket_conf_t, max_body_size), 1172 }, 1173 1174 { 1175 nxt_string("idle_timeout"), 1176 NXT_CONF_MAP_MSEC, 1177 offsetof(nxt_socket_conf_t, idle_timeout), 1178 }, 1179 1180 { 1181 nxt_string("header_read_timeout"), 1182 NXT_CONF_MAP_MSEC, 1183 offsetof(nxt_socket_conf_t, header_read_timeout), 1184 }, 1185 1186 { 1187 nxt_string("body_read_timeout"), 1188 NXT_CONF_MAP_MSEC, 1189 offsetof(nxt_socket_conf_t, body_read_timeout), 1190 }, 1191 1192 { 1193 nxt_string("send_timeout"), 1194 NXT_CONF_MAP_MSEC, 1195 offsetof(nxt_socket_conf_t, send_timeout), 1196 }, 1197 }; 1198 1199 1200 static nxt_int_t 1201 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1202 u_char *start, u_char *end) 1203 { 1204 u_char *p; 1205 size_t size; 1206 nxt_mp_t *mp; 1207 uint32_t next; 1208 nxt_int_t ret; 1209 nxt_str_t name; 1210 nxt_app_t *app, *prev; 1211 nxt_router_t *router; 1212 nxt_conf_value_t *conf, *http; 1213 nxt_conf_value_t *applications, *application; 1214 nxt_conf_value_t *listeners, *listener; 1215 nxt_socket_conf_t *skcf; 1216 nxt_event_engine_t *engine; 1217 nxt_app_lang_module_t *lang; 1218 nxt_router_app_conf_t apcf; 1219 nxt_router_listener_conf_t lscf; 1220 1221 static nxt_str_t http_path = nxt_string("/http"); 1222 static nxt_str_t applications_path = nxt_string("/applications"); 1223 static nxt_str_t listeners_path = nxt_string("/listeners"); 1224 1225 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1226 if (conf == NULL) { 1227 nxt_alert(task, "configuration parsing error"); 1228 return NXT_ERROR; 1229 } 1230 1231 mp = tmcf->conf->mem_pool; 1232 1233 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1234 nxt_nitems(nxt_router_conf), tmcf->conf); 1235 if (ret != NXT_OK) { 1236 nxt_alert(task, "root map error"); 1237 return NXT_ERROR; 1238 } 1239 1240 if (tmcf->conf->threads == 0) { 1241 tmcf->conf->threads = nxt_ncpu; 1242 } 1243 1244 applications = nxt_conf_get_path(conf, &applications_path); 1245 if (applications == NULL) { 1246 nxt_alert(task, "no \"applications\" block"); 1247 return NXT_ERROR; 1248 } 1249 1250 router = tmcf->conf->router; 1251 1252 next = 0; 1253 1254 for ( ;; ) { 1255 application = nxt_conf_next_object_member(applications, &name, &next); 1256 if (application == NULL) { 1257 break; 1258 } 1259 1260 nxt_debug(task, "application \"%V\"", &name); 1261 1262 size = nxt_conf_json_length(application, NULL); 1263 1264 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1265 if (app == NULL) { 1266 goto fail; 1267 } 1268 1269 nxt_memzero(app, sizeof(nxt_app_t)); 1270 1271 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1272 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 1273 1274 p = nxt_conf_json_print(app->conf.start, application, NULL); 1275 app->conf.length = p - app->conf.start; 1276 1277 nxt_assert(app->conf.length <= size); 1278 1279 nxt_debug(task, "application conf \"%V\"", &app->conf); 1280 1281 prev = nxt_router_app_find(&router->apps, &name); 1282 1283 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1284 nxt_free(app); 1285 1286 nxt_queue_remove(&prev->link); 1287 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1288 continue; 1289 } 1290 1291 apcf.processes = 1; 1292 apcf.max_processes = 1; 1293 apcf.spare_processes = 0; 1294 apcf.timeout = 0; 1295 apcf.res_timeout = 1000; 1296 apcf.idle_timeout = 15000; 1297 apcf.requests = 0; 1298 apcf.limits_value = NULL; 1299 apcf.processes_value = NULL; 1300 1301 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1302 nxt_nitems(nxt_router_app_conf), &apcf); 1303 if (ret != NXT_OK) { 1304 nxt_alert(task, "application map error"); 1305 goto app_fail; 1306 } 1307 1308 if (apcf.limits_value != NULL) { 1309 1310 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1311 nxt_alert(task, "application limits is not object"); 1312 goto app_fail; 1313 } 1314 1315 ret = nxt_conf_map_object(mp, apcf.limits_value, 1316 nxt_router_app_limits_conf, 1317 nxt_nitems(nxt_router_app_limits_conf), 1318 &apcf); 1319 if (ret != NXT_OK) { 1320 nxt_alert(task, "application limits map error"); 1321 goto app_fail; 1322 } 1323 } 1324 1325 if (apcf.processes_value != NULL 1326 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1327 { 1328 ret = nxt_conf_map_object(mp, apcf.processes_value, 1329 nxt_router_app_processes_conf, 1330 nxt_nitems(nxt_router_app_processes_conf), 1331 &apcf); 1332 if (ret != NXT_OK) { 1333 nxt_alert(task, "application processes map error"); 1334 goto app_fail; 1335 } 1336 1337 } else { 1338 apcf.max_processes = apcf.processes; 1339 apcf.spare_processes = apcf.processes; 1340 } 1341 1342 nxt_debug(task, "application type: %V", &apcf.type); 1343 nxt_debug(task, "application processes: %D", apcf.processes); 1344 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1345 nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); 1346 nxt_debug(task, "application requests: %D", apcf.requests); 1347 1348 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1349 1350 if (lang == NULL) { 1351 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1352 goto app_fail; 1353 } 1354 1355 nxt_debug(task, "application language module: \"%s\"", lang->file); 1356 1357 ret = nxt_thread_mutex_create(&app->mutex); 1358 if (ret != NXT_OK) { 1359 goto app_fail; 1360 } 1361 1362 nxt_queue_init(&app->ports); 1363 nxt_queue_init(&app->spare_ports); 1364 nxt_queue_init(&app->idle_ports); 1365 nxt_queue_init(&app->requests); 1366 nxt_queue_init(&app->pending); 1367 1368 app->name.length = name.length; 1369 nxt_memcpy(app->name.start, name.start, name.length); 1370 1371 app->type = lang->type; 1372 app->max_processes = apcf.max_processes; 1373 app->spare_processes = apcf.spare_processes; 1374 app->max_pending_processes = apcf.spare_processes 1375 ? apcf.spare_processes : 1; 1376 app->timeout = apcf.timeout; 1377 app->res_timeout = apcf.res_timeout * 1000000; 1378 app->idle_timeout = apcf.idle_timeout; 1379 app->live = 1; 1380 app->max_pending_responses = 2; 1381 app->max_requests = apcf.requests; 1382 app->prepare_msg = nxt_app_prepare_msg[lang->type]; 1383 1384 engine = task->thread->engine; 1385 1386 app->engine = engine; 1387 1388 app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; 1389 app->idle_timer.work_queue = &engine->fast_work_queue; 1390 app->idle_timer.handler = nxt_router_app_idle_timeout; 1391 app->idle_timer.task = &engine->task; 1392 app->idle_timer.log = app->idle_timer.task->log; 1393 1394 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1395 app->adjust_idle_work.task = &engine->task; 1396 app->adjust_idle_work.obj = app; 1397 1398 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1399 1400 nxt_router_app_use(task, app, 1); 1401 } 1402 1403 http = nxt_conf_get_path(conf, &http_path); 1404 #if 0 1405 if (http == NULL) { 1406 nxt_alert(task, "no \"http\" block"); 1407 return NXT_ERROR; 1408 } 1409 #endif 1410 1411 listeners = nxt_conf_get_path(conf, &listeners_path); 1412 if (listeners == NULL) { 1413 nxt_alert(task, "no \"listeners\" block"); 1414 return NXT_ERROR; 1415 } 1416 1417 next = 0; 1418 1419 for ( ;; ) { 1420 listener = nxt_conf_next_object_member(listeners, &name, &next); 1421 if (listener == NULL) { 1422 break; 1423 } 1424 1425 skcf = nxt_router_socket_conf(task, tmcf, &name); 1426 if (skcf == NULL) { 1427 goto fail; 1428 } 1429 1430 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1431 nxt_nitems(nxt_router_listener_conf), &lscf); 1432 if (ret != NXT_OK) { 1433 nxt_alert(task, "listener map error"); 1434 goto fail; 1435 } 1436 1437 nxt_debug(task, "application: %V", &lscf.application); 1438 1439 // STUB, default values if http block is not defined. 1440 skcf->header_buffer_size = 2048; 1441 skcf->large_header_buffer_size = 8192; 1442 skcf->large_header_buffers = 4; 1443 skcf->body_buffer_size = 16 * 1024; 1444 skcf->max_body_size = 2 * 1024 * 1024; 1445 skcf->idle_timeout = 65000; 1446 skcf->header_read_timeout = 5000; 1447 skcf->body_read_timeout = 5000; 1448 skcf->send_timeout = 5000; 1449 1450 if (http != NULL) { 1451 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1452 nxt_nitems(nxt_router_http_conf), skcf); 1453 if (ret != NXT_OK) { 1454 nxt_alert(task, "http map error"); 1455 goto fail; 1456 } 1457 } 1458 1459 skcf->listen->handler = nxt_http_conn_init; 1460 skcf->router_conf = tmcf->conf; 1461 skcf->router_conf->count++; 1462 skcf->application = nxt_router_listener_application(tmcf, 1463 &lscf.application); 1464 nxt_router_app_use(task, skcf->application, 1); 1465 } 1466 1467 nxt_queue_add(&tmcf->deleting, &router->sockets); 1468 nxt_queue_init(&router->sockets); 1469 1470 return NXT_OK; 1471 1472 app_fail: 1473 1474 nxt_free(app); 1475 1476 fail: 1477 1478 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1479 1480 nxt_queue_remove(&app->link); 1481 nxt_thread_mutex_destroy(&app->mutex); 1482 nxt_free(app); 1483 1484 } nxt_queue_loop; 1485 1486 return NXT_ERROR; 1487 } 1488 1489 1490 static nxt_app_t * 1491 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1492 { 1493 nxt_app_t *app; 1494 1495 nxt_queue_each(app, queue, nxt_app_t, link) { 1496 1497 if (nxt_strstr_eq(name, &app->name)) { 1498 return app; 1499 } 1500 1501 } nxt_queue_loop; 1502 1503 return NULL; 1504 } 1505 1506 1507 static nxt_app_t * 1508 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1509 { 1510 nxt_app_t *app; 1511 1512 app = nxt_router_app_find(&tmcf->apps, name); 1513 1514 if (app == NULL) { 1515 app = nxt_router_app_find(&tmcf->previous, name); 1516 } 1517 1518 return app; 1519 } 1520 1521 1522 static nxt_socket_conf_t * 1523 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1524 nxt_str_t *name) 1525 { 1526 size_t size; 1527 nxt_int_t ret; 1528 nxt_bool_t wildcard; 1529 nxt_sockaddr_t *sa; 1530 nxt_socket_conf_t *skcf; 1531 nxt_listen_socket_t *ls; 1532 1533 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1534 if (nxt_slow_path(sa == NULL)) { 1535 nxt_alert(task, "invalid listener \"%V\"", name); 1536 return NULL; 1537 } 1538 1539 sa->type = SOCK_STREAM; 1540 1541 nxt_debug(task, "router listener: \"%*s\"", 1542 (size_t) sa->length, nxt_sockaddr_start(sa)); 1543 1544 skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t)); 1545 if (nxt_slow_path(skcf == NULL)) { 1546 return NULL; 1547 } 1548 1549 size = nxt_sockaddr_size(sa); 1550 1551 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1552 1553 if (ret != NXT_OK) { 1554 1555 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1556 if (nxt_slow_path(ls == NULL)) { 1557 return NULL; 1558 } 1559 1560 skcf->listen = ls; 1561 1562 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1563 nxt_memcpy(ls->sockaddr, sa, size); 1564 1565 nxt_listen_socket_remote_size(ls); 1566 1567 ls->socket = -1; 1568 ls->backlog = NXT_LISTEN_BACKLOG; 1569 ls->flags = NXT_NONBLOCK; 1570 ls->read_after_accept = 1; 1571 } 1572 1573 switch (sa->u.sockaddr.sa_family) { 1574 #if (NXT_HAVE_UNIX_DOMAIN) 1575 case AF_UNIX: 1576 wildcard = 0; 1577 break; 1578 #endif 1579 #if (NXT_INET6) 1580 case AF_INET6: 1581 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1582 break; 1583 #endif 1584 case AF_INET: 1585 default: 1586 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1587 break; 1588 } 1589 1590 if (!wildcard) { 1591 skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size); 1592 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1593 return NULL; 1594 } 1595 1596 nxt_memcpy(skcf->sockaddr, sa, size); 1597 } 1598 1599 return skcf; 1600 } 1601 1602 1603 static nxt_int_t 1604 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1605 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1606 { 1607 nxt_router_t *router; 1608 nxt_queue_link_t *qlk; 1609 nxt_socket_conf_t *skcf; 1610 1611 router = tmcf->conf->router; 1612 1613 for (qlk = nxt_queue_first(&router->sockets); 1614 qlk != nxt_queue_tail(&router->sockets); 1615 qlk = nxt_queue_next(qlk)) 1616 { 1617 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1618 1619 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1620 nskcf->listen = skcf->listen; 1621 1622 nxt_queue_remove(qlk); 1623 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1624 1625 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1626 1627 return NXT_OK; 1628 } 1629 } 1630 1631 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1632 1633 return NXT_DECLINED; 1634 } 1635 1636 1637 static void 1638 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1639 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1640 { 1641 size_t size; 1642 uint32_t stream; 1643 nxt_buf_t *b; 1644 nxt_port_t *main_port, *router_port; 1645 nxt_runtime_t *rt; 1646 nxt_socket_rpc_t *rpc; 1647 1648 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1649 if (rpc == NULL) { 1650 goto fail; 1651 } 1652 1653 rpc->socket_conf = skcf; 1654 rpc->temp_conf = tmcf; 1655 1656 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1657 1658 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1659 if (b == NULL) { 1660 goto fail; 1661 } 1662 1663 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1664 1665 rt = task->thread->runtime; 1666 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1667 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1668 1669 stream = nxt_port_rpc_register_handler(task, router_port, 1670 nxt_router_listen_socket_ready, 1671 nxt_router_listen_socket_error, 1672 main_port->pid, rpc); 1673 if (stream == 0) { 1674 goto fail; 1675 } 1676 1677 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1678 stream, router_port->id, b); 1679 1680 return; 1681 1682 fail: 1683 1684 nxt_router_conf_error(task, tmcf); 1685 } 1686 1687 1688 static void 1689 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1690 void *data) 1691 { 1692 nxt_int_t ret; 1693 nxt_socket_t s; 1694 nxt_socket_rpc_t *rpc; 1695 1696 rpc = data; 1697 1698 s = msg->fd; 1699 1700 ret = nxt_socket_nonblocking(task, s); 1701 if (nxt_slow_path(ret != NXT_OK)) { 1702 goto fail; 1703 } 1704 1705 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 1706 1707 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1708 if (nxt_slow_path(ret != NXT_OK)) { 1709 goto fail; 1710 } 1711 1712 rpc->socket_conf->listen->socket = s; 1713 1714 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1715 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1716 1717 return; 1718 1719 fail: 1720 1721 nxt_socket_close(task, s); 1722 1723 nxt_router_conf_error(task, rpc->temp_conf); 1724 } 1725 1726 1727 static void 1728 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1729 void *data) 1730 { 1731 u_char *p; 1732 size_t size; 1733 uint8_t error; 1734 nxt_buf_t *in, *out; 1735 nxt_sockaddr_t *sa; 1736 nxt_socket_rpc_t *rpc; 1737 nxt_router_temp_conf_t *tmcf; 1738 1739 static nxt_str_t socket_errors[] = { 1740 nxt_string("ListenerSystem"), 1741 nxt_string("ListenerNoIPv6"), 1742 nxt_string("ListenerPort"), 1743 nxt_string("ListenerInUse"), 1744 nxt_string("ListenerNoAddress"), 1745 nxt_string("ListenerNoAccess"), 1746 nxt_string("ListenerPath"), 1747 }; 1748 1749 rpc = data; 1750 sa = rpc->socket_conf->listen->sockaddr; 1751 tmcf = rpc->temp_conf; 1752 1753 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 1754 1755 if (nxt_slow_path(in == NULL)) { 1756 return; 1757 } 1758 1759 p = in->mem.pos; 1760 1761 error = *p++; 1762 1763 size = sizeof("listen socket error: ") - 1 1764 + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1 1765 + sa->length + socket_errors[error].length + (in->mem.free - p); 1766 1767 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1768 if (nxt_slow_path(out == NULL)) { 1769 return; 1770 } 1771 1772 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 1773 "listen socket error: " 1774 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 1775 (size_t) sa->length, nxt_sockaddr_start(sa), 1776 &socket_errors[error], in->mem.free - p, p); 1777 1778 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 1779 1780 nxt_router_conf_error(task, tmcf); 1781 } 1782 1783 1784 static void 1785 nxt_router_app_rpc_create(nxt_task_t *task, 1786 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 1787 { 1788 size_t size; 1789 uint32_t stream; 1790 nxt_buf_t *b; 1791 nxt_port_t *main_port, *router_port; 1792 nxt_runtime_t *rt; 1793 nxt_app_rpc_t *rpc; 1794 1795 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 1796 if (rpc == NULL) { 1797 goto fail; 1798 } 1799 1800 rpc->app = app; 1801 rpc->temp_conf = tmcf; 1802 1803 nxt_debug(task, "app '%V' prefork", &app->name); 1804 1805 size = app->name.length + 1 + app->conf.length; 1806 1807 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1808 if (nxt_slow_path(b == NULL)) { 1809 goto fail; 1810 } 1811 1812 nxt_buf_cpystr(b, &app->name); 1813 *b->mem.free++ = '\0'; 1814 nxt_buf_cpystr(b, &app->conf); 1815 1816 rt = task->thread->runtime; 1817 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1818 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1819 1820 stream = nxt_port_rpc_register_handler(task, router_port, 1821 nxt_router_app_prefork_ready, 1822 nxt_router_app_prefork_error, 1823 -1, rpc); 1824 if (nxt_slow_path(stream == 0)) { 1825 goto fail; 1826 } 1827 1828 app->pending_processes++; 1829 1830 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 1831 stream, router_port->id, b); 1832 1833 return; 1834 1835 fail: 1836 1837 nxt_router_conf_error(task, tmcf); 1838 } 1839 1840 1841 static void 1842 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1843 void *data) 1844 { 1845 nxt_app_t *app; 1846 nxt_port_t *port; 1847 nxt_app_rpc_t *rpc; 1848 nxt_event_engine_t *engine; 1849 1850 rpc = data; 1851 app = rpc->app; 1852 1853 port = msg->u.new_port; 1854 port->app = app; 1855 1856 nxt_router_app_use(task, app, 1); 1857 1858 app->pending_processes--; 1859 app->processes++; 1860 app->idle_processes++; 1861 1862 engine = task->thread->engine; 1863 1864 nxt_queue_insert_tail(&app->ports, &port->app_link); 1865 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 1866 1867 port->idle_start = 0; 1868 1869 nxt_port_inc_use(port); 1870 1871 nxt_work_queue_add(&engine->fast_work_queue, 1872 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1873 } 1874 1875 1876 static void 1877 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1878 void *data) 1879 { 1880 nxt_app_t *app; 1881 nxt_app_rpc_t *rpc; 1882 nxt_router_temp_conf_t *tmcf; 1883 1884 rpc = data; 1885 app = rpc->app; 1886 tmcf = rpc->temp_conf; 1887 1888 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 1889 &app->name); 1890 1891 app->pending_processes--; 1892 1893 nxt_router_conf_error(task, tmcf); 1894 } 1895 1896 1897 static nxt_int_t 1898 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 1899 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 1900 { 1901 nxt_int_t ret; 1902 nxt_uint_t n, threads; 1903 nxt_queue_link_t *qlk; 1904 nxt_router_engine_conf_t *recf; 1905 1906 threads = tmcf->conf->threads; 1907 1908 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 1909 sizeof(nxt_router_engine_conf_t)); 1910 if (nxt_slow_path(tmcf->engines == NULL)) { 1911 return NXT_ERROR; 1912 } 1913 1914 n = 0; 1915 1916 for (qlk = nxt_queue_first(&router->engines); 1917 qlk != nxt_queue_tail(&router->engines); 1918 qlk = nxt_queue_next(qlk)) 1919 { 1920 recf = nxt_array_zero_add(tmcf->engines); 1921 if (nxt_slow_path(recf == NULL)) { 1922 return NXT_ERROR; 1923 } 1924 1925 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 1926 1927 if (n < threads) { 1928 recf->action = NXT_ROUTER_ENGINE_KEEP; 1929 ret = nxt_router_engine_conf_update(tmcf, recf); 1930 1931 } else { 1932 recf->action = NXT_ROUTER_ENGINE_DELETE; 1933 ret = nxt_router_engine_conf_delete(tmcf, recf); 1934 } 1935 1936 if (nxt_slow_path(ret != NXT_OK)) { 1937 return ret; 1938 } 1939 1940 n++; 1941 } 1942 1943 tmcf->new_threads = n; 1944 1945 while (n < threads) { 1946 recf = nxt_array_zero_add(tmcf->engines); 1947 if (nxt_slow_path(recf == NULL)) { 1948 return NXT_ERROR; 1949 } 1950 1951 recf->action = NXT_ROUTER_ENGINE_ADD; 1952 1953 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 1954 if (nxt_slow_path(recf->engine == NULL)) { 1955 return NXT_ERROR; 1956 } 1957 1958 ret = nxt_router_engine_conf_create(tmcf, recf); 1959 if (nxt_slow_path(ret != NXT_OK)) { 1960 return ret; 1961 } 1962 1963 n++; 1964 } 1965 1966 return NXT_OK; 1967 } 1968 1969 1970 static nxt_int_t 1971 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 1972 nxt_router_engine_conf_t *recf) 1973 { 1974 nxt_int_t ret; 1975 1976 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1977 nxt_router_listen_socket_create); 1978 if (nxt_slow_path(ret != NXT_OK)) { 1979 return ret; 1980 } 1981 1982 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 1983 nxt_router_listen_socket_create); 1984 if (nxt_slow_path(ret != NXT_OK)) { 1985 return ret; 1986 } 1987 1988 return ret; 1989 } 1990 1991 1992 static nxt_int_t 1993 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 1994 nxt_router_engine_conf_t *recf) 1995 { 1996 nxt_int_t ret; 1997 1998 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 1999 nxt_router_listen_socket_create); 2000 if (nxt_slow_path(ret != NXT_OK)) { 2001 return ret; 2002 } 2003 2004 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2005 nxt_router_listen_socket_update); 2006 if (nxt_slow_path(ret != NXT_OK)) { 2007 return ret; 2008 } 2009 2010 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2011 if (nxt_slow_path(ret != NXT_OK)) { 2012 return ret; 2013 } 2014 2015 return ret; 2016 } 2017 2018 2019 static nxt_int_t 2020 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2021 nxt_router_engine_conf_t *recf) 2022 { 2023 nxt_int_t ret; 2024 2025 ret = nxt_router_engine_quit(tmcf, recf); 2026 if (nxt_slow_path(ret != NXT_OK)) { 2027 return ret; 2028 } 2029 2030 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2031 if (nxt_slow_path(ret != NXT_OK)) { 2032 return ret; 2033 } 2034 2035 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2036 } 2037 2038 2039 static nxt_int_t 2040 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2041 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2042 nxt_work_handler_t handler) 2043 { 2044 nxt_joint_job_t *job; 2045 nxt_queue_link_t *qlk; 2046 nxt_socket_conf_t *skcf; 2047 nxt_socket_conf_joint_t *joint; 2048 2049 for (qlk = nxt_queue_first(sockets); 2050 qlk != nxt_queue_tail(sockets); 2051 qlk = nxt_queue_next(qlk)) 2052 { 2053 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2054 if (nxt_slow_path(job == NULL)) { 2055 return NXT_ERROR; 2056 } 2057 2058 job->work.next = recf->jobs; 2059 recf->jobs = &job->work; 2060 2061 job->task = tmcf->engine->task; 2062 job->work.handler = handler; 2063 job->work.task = &job->task; 2064 job->work.obj = job; 2065 job->tmcf = tmcf; 2066 2067 tmcf->count++; 2068 2069 joint = nxt_mp_alloc(tmcf->conf->mem_pool, 2070 sizeof(nxt_socket_conf_joint_t)); 2071 if (nxt_slow_path(joint == NULL)) { 2072 return NXT_ERROR; 2073 } 2074 2075 job->work.data = joint; 2076 2077 joint->count = 1; 2078 2079 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2080 skcf->count++; 2081 joint->socket_conf = skcf; 2082 2083 joint->engine = recf->engine; 2084 } 2085 2086 return NXT_OK; 2087 } 2088 2089 2090 static nxt_int_t 2091 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2092 nxt_router_engine_conf_t *recf) 2093 { 2094 nxt_joint_job_t *job; 2095 2096 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2097 if (nxt_slow_path(job == NULL)) { 2098 return NXT_ERROR; 2099 } 2100 2101 job->work.next = recf->jobs; 2102 recf->jobs = &job->work; 2103 2104 job->task = tmcf->engine->task; 2105 job->work.handler = nxt_router_worker_thread_quit; 2106 job->work.task = &job->task; 2107 job->work.obj = NULL; 2108 job->work.data = NULL; 2109 job->tmcf = NULL; 2110 2111 return NXT_OK; 2112 } 2113 2114 2115 static nxt_int_t 2116 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2117 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2118 { 2119 nxt_joint_job_t *job; 2120 nxt_queue_link_t *qlk; 2121 2122 for (qlk = nxt_queue_first(sockets); 2123 qlk != nxt_queue_tail(sockets); 2124 qlk = nxt_queue_next(qlk)) 2125 { 2126 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2127 if (nxt_slow_path(job == NULL)) { 2128 return NXT_ERROR; 2129 } 2130 2131 job->work.next = recf->jobs; 2132 recf->jobs = &job->work; 2133 2134 job->task = tmcf->engine->task; 2135 job->work.handler = nxt_router_listen_socket_delete; 2136 job->work.task = &job->task; 2137 job->work.obj = job; 2138 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2139 job->tmcf = tmcf; 2140 2141 tmcf->count++; 2142 } 2143 2144 return NXT_OK; 2145 } 2146 2147 2148 static nxt_int_t 2149 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2150 nxt_router_temp_conf_t *tmcf) 2151 { 2152 nxt_int_t ret; 2153 nxt_uint_t i, threads; 2154 nxt_router_engine_conf_t *recf; 2155 2156 recf = tmcf->engines->elts; 2157 threads = tmcf->conf->threads; 2158 2159 for (i = tmcf->new_threads; i < threads; i++) { 2160 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2161 if (nxt_slow_path(ret != NXT_OK)) { 2162 return ret; 2163 } 2164 } 2165 2166 return NXT_OK; 2167 } 2168 2169 2170 static nxt_int_t 2171 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2172 nxt_event_engine_t *engine) 2173 { 2174 nxt_int_t ret; 2175 nxt_thread_link_t *link; 2176 nxt_thread_handle_t handle; 2177 2178 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2179 2180 if (nxt_slow_path(link == NULL)) { 2181 return NXT_ERROR; 2182 } 2183 2184 link->start = nxt_router_thread_start; 2185 link->engine = engine; 2186 link->work.handler = nxt_router_thread_exit_handler; 2187 link->work.task = task; 2188 link->work.data = link; 2189 2190 nxt_queue_insert_tail(&rt->engines, &engine->link); 2191 2192 ret = nxt_thread_create(&handle, link); 2193 2194 if (nxt_slow_path(ret != NXT_OK)) { 2195 nxt_queue_remove(&engine->link); 2196 } 2197 2198 return ret; 2199 } 2200 2201 2202 static void 2203 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2204 nxt_router_temp_conf_t *tmcf) 2205 { 2206 nxt_app_t *app; 2207 2208 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2209 2210 nxt_router_app_quit(task, app); 2211 2212 } nxt_queue_loop; 2213 2214 nxt_queue_add(&router->apps, &tmcf->previous); 2215 nxt_queue_add(&router->apps, &tmcf->apps); 2216 } 2217 2218 2219 static void 2220 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2221 { 2222 nxt_uint_t n; 2223 nxt_event_engine_t *engine; 2224 nxt_router_engine_conf_t *recf; 2225 2226 recf = tmcf->engines->elts; 2227 2228 for (n = tmcf->engines->nelts; n != 0; n--) { 2229 engine = recf->engine; 2230 2231 switch (recf->action) { 2232 2233 case NXT_ROUTER_ENGINE_KEEP: 2234 break; 2235 2236 case NXT_ROUTER_ENGINE_ADD: 2237 nxt_queue_insert_tail(&router->engines, &engine->link0); 2238 break; 2239 2240 case NXT_ROUTER_ENGINE_DELETE: 2241 nxt_queue_remove(&engine->link0); 2242 break; 2243 } 2244 2245 nxt_router_engine_post(engine, recf->jobs); 2246 2247 recf++; 2248 } 2249 } 2250 2251 2252 static void 2253 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2254 { 2255 nxt_work_t *work, *next; 2256 2257 for (work = jobs; work != NULL; work = next) { 2258 next = work->next; 2259 work->next = NULL; 2260 2261 nxt_event_engine_post(engine, work); 2262 } 2263 } 2264 2265 2266 static nxt_port_handlers_t nxt_router_app_port_handlers = { 2267 .mmap = nxt_port_mmap_handler, 2268 .data = nxt_port_rpc_handler, 2269 }; 2270 2271 2272 static void 2273 nxt_router_thread_start(void *data) 2274 { 2275 nxt_int_t ret; 2276 nxt_port_t *port; 2277 nxt_task_t *task; 2278 nxt_thread_t *thread; 2279 nxt_thread_link_t *link; 2280 nxt_event_engine_t *engine; 2281 2282 link = data; 2283 engine = link->engine; 2284 task = &engine->task; 2285 2286 thread = nxt_thread(); 2287 2288 nxt_event_engine_thread_adopt(engine); 2289 2290 /* STUB */ 2291 thread->runtime = engine->task.thread->runtime; 2292 2293 engine->task.thread = thread; 2294 engine->task.log = thread->log; 2295 thread->engine = engine; 2296 thread->task = &engine->task; 2297 #if 0 2298 thread->fiber = &engine->fibers->fiber; 2299 #endif 2300 2301 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2302 if (nxt_slow_path(engine->mem_pool == NULL)) { 2303 return; 2304 } 2305 2306 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2307 NXT_PROCESS_ROUTER); 2308 if (nxt_slow_path(port == NULL)) { 2309 return; 2310 } 2311 2312 ret = nxt_port_socket_init(task, port, 0); 2313 if (nxt_slow_path(ret != NXT_OK)) { 2314 nxt_port_use(task, port, -1); 2315 return; 2316 } 2317 2318 engine->port = port; 2319 2320 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2321 2322 nxt_event_engine_start(engine); 2323 } 2324 2325 2326 static void 2327 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2328 { 2329 nxt_joint_job_t *job; 2330 nxt_socket_conf_t *skcf; 2331 nxt_listen_event_t *lev; 2332 nxt_listen_socket_t *ls; 2333 nxt_thread_spinlock_t *lock; 2334 nxt_socket_conf_joint_t *joint; 2335 2336 job = obj; 2337 joint = data; 2338 2339 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2340 2341 skcf = joint->socket_conf; 2342 ls = skcf->listen; 2343 2344 lev = nxt_listen_event(task, ls); 2345 if (nxt_slow_path(lev == NULL)) { 2346 nxt_router_listen_socket_release(task, skcf); 2347 return; 2348 } 2349 2350 lev->socket.data = joint; 2351 2352 lock = &skcf->router_conf->router->lock; 2353 2354 nxt_thread_spin_lock(lock); 2355 ls->count++; 2356 nxt_thread_spin_unlock(lock); 2357 2358 job->work.next = NULL; 2359 job->work.handler = nxt_router_conf_wait; 2360 2361 nxt_event_engine_post(job->tmcf->engine, &job->work); 2362 } 2363 2364 2365 nxt_inline nxt_listen_event_t * 2366 nxt_router_listen_event(nxt_queue_t *listen_connections, 2367 nxt_socket_conf_t *skcf) 2368 { 2369 nxt_socket_t fd; 2370 nxt_queue_link_t *qlk; 2371 nxt_listen_event_t *lev; 2372 2373 fd = skcf->listen->socket; 2374 2375 for (qlk = nxt_queue_first(listen_connections); 2376 qlk != nxt_queue_tail(listen_connections); 2377 qlk = nxt_queue_next(qlk)) 2378 { 2379 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2380 2381 if (fd == lev->socket.fd) { 2382 return lev; 2383 } 2384 } 2385 2386 return NULL; 2387 } 2388 2389 2390 static void 2391 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2392 { 2393 nxt_joint_job_t *job; 2394 nxt_event_engine_t *engine; 2395 nxt_listen_event_t *lev; 2396 nxt_socket_conf_joint_t *joint, *old; 2397 2398 job = obj; 2399 joint = data; 2400 2401 engine = task->thread->engine; 2402 2403 nxt_queue_insert_tail(&engine->joints, &joint->link); 2404 2405 lev = nxt_router_listen_event(&engine->listen_connections, 2406 joint->socket_conf); 2407 2408 old = lev->socket.data; 2409 lev->socket.data = joint; 2410 lev->listen = joint->socket_conf->listen; 2411 2412 job->work.next = NULL; 2413 job->work.handler = nxt_router_conf_wait; 2414 2415 nxt_event_engine_post(job->tmcf->engine, &job->work); 2416 2417 /* 2418 * The task is allocated from configuration temporary 2419 * memory pool so it can be freed after engine post operation. 2420 */ 2421 2422 nxt_router_conf_release(&engine->task, old); 2423 } 2424 2425 2426 static void 2427 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2428 { 2429 nxt_joint_job_t *job; 2430 nxt_socket_conf_t *skcf; 2431 nxt_listen_event_t *lev; 2432 nxt_event_engine_t *engine; 2433 2434 job = obj; 2435 skcf = data; 2436 2437 engine = task->thread->engine; 2438 2439 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2440 2441 nxt_fd_event_delete(engine, &lev->socket); 2442 2443 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2444 lev->socket.fd); 2445 2446 lev->timer.handler = nxt_router_listen_socket_close; 2447 lev->timer.work_queue = &engine->fast_work_queue; 2448 2449 nxt_timer_add(engine, &lev->timer, 0); 2450 2451 job->work.next = NULL; 2452 job->work.handler = nxt_router_conf_wait; 2453 2454 nxt_event_engine_post(job->tmcf->engine, &job->work); 2455 } 2456 2457 2458 static void 2459 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2460 { 2461 nxt_event_engine_t *engine; 2462 2463 nxt_debug(task, "router worker thread quit"); 2464 2465 engine = task->thread->engine; 2466 2467 engine->shutdown = 1; 2468 2469 if (nxt_queue_is_empty(&engine->joints)) { 2470 nxt_thread_exit(task->thread); 2471 } 2472 } 2473 2474 2475 static void 2476 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2477 { 2478 nxt_timer_t *timer; 2479 nxt_listen_event_t *lev; 2480 nxt_socket_conf_joint_t *joint; 2481 2482 timer = obj; 2483 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2484 joint = lev->socket.data; 2485 2486 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2487 lev->socket.fd); 2488 2489 nxt_queue_remove(&lev->link); 2490 2491 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2492 task = &task->thread->engine->task; 2493 2494 nxt_router_listen_socket_release(task, joint->socket_conf); 2495 2496 nxt_free(lev); 2497 2498 nxt_router_conf_release(task, joint); 2499 } 2500 2501 2502 static void 2503 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2504 { 2505 nxt_listen_socket_t *ls; 2506 nxt_thread_spinlock_t *lock; 2507 2508 ls = skcf->listen; 2509 lock = &skcf->router_conf->router->lock; 2510 2511 nxt_thread_spin_lock(lock); 2512 2513 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2514 task->thread->engine, ls->count); 2515 2516 if (--ls->count != 0) { 2517 ls = NULL; 2518 } 2519 2520 nxt_thread_spin_unlock(lock); 2521 2522 if (ls != NULL) { 2523 nxt_socket_close(task, ls->socket); 2524 nxt_free(ls); 2525 } 2526 } 2527 2528 2529 static void 2530 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2531 { 2532 nxt_app_t *app; 2533 nxt_socket_conf_t *skcf; 2534 nxt_router_conf_t *rtcf; 2535 nxt_event_engine_t *engine; 2536 nxt_thread_spinlock_t *lock; 2537 2538 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2539 2540 if (--joint->count != 0) { 2541 return; 2542 } 2543 2544 nxt_queue_remove(&joint->link); 2545 2546 /* 2547 * The joint content can not be safely used after the critical 2548 * section protected by the spinlock because its memory pool may 2549 * be already destroyed by another thread. 2550 */ 2551 engine = joint->engine; 2552 2553 skcf = joint->socket_conf; 2554 app = skcf->application; 2555 rtcf = skcf->router_conf; 2556 lock = &rtcf->router->lock; 2557 2558 nxt_thread_spin_lock(lock); 2559 2560 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2561 rtcf, rtcf->count); 2562 2563 if (--skcf->count != 0) { 2564 rtcf = NULL; 2565 app = NULL; 2566 2567 } else { 2568 nxt_queue_remove(&skcf->link); 2569 2570 if (--rtcf->count != 0) { 2571 rtcf = NULL; 2572 } 2573 } 2574 2575 nxt_thread_spin_unlock(lock); 2576 2577 if (app != NULL) { 2578 nxt_router_app_use(task, app, -1); 2579 } 2580 2581 /* TODO remove engine->port */ 2582 /* TODO excude from connected ports */ 2583 2584 if (rtcf != NULL) { 2585 nxt_debug(task, "old router conf is destroyed"); 2586 2587 nxt_mp_thread_adopt(rtcf->mem_pool); 2588 2589 nxt_mp_destroy(rtcf->mem_pool); 2590 } 2591 2592 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 2593 nxt_thread_exit(task->thread); 2594 } 2595 } 2596 2597 2598 static void 2599 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 2600 { 2601 nxt_port_t *port; 2602 nxt_thread_link_t *link; 2603 nxt_event_engine_t *engine; 2604 nxt_thread_handle_t handle; 2605 2606 handle = (nxt_thread_handle_t) obj; 2607 link = data; 2608 2609 nxt_thread_wait(handle); 2610 2611 engine = link->engine; 2612 2613 nxt_queue_remove(&engine->link); 2614 2615 port = engine->port; 2616 2617 // TODO notify all apps 2618 2619 port->engine = task->thread->engine; 2620 nxt_mp_thread_adopt(port->mem_pool); 2621 nxt_port_use(task, port, -1); 2622 2623 nxt_mp_thread_adopt(engine->mem_pool); 2624 nxt_mp_destroy(engine->mem_pool); 2625 2626 nxt_event_engine_free(engine); 2627 2628 nxt_free(link); 2629 } 2630 2631 2632 static void 2633 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2634 void *data) 2635 { 2636 size_t dump_size; 2637 nxt_int_t ret; 2638 nxt_buf_t *b, *last; 2639 nxt_http_request_t *r; 2640 nxt_req_conn_link_t *rc; 2641 nxt_app_parse_ctx_t *ar; 2642 2643 b = msg->buf; 2644 rc = data; 2645 2646 dump_size = nxt_buf_used_size(b); 2647 2648 if (dump_size > 300) { 2649 dump_size = 300; 2650 } 2651 2652 nxt_debug(task, "%srouter app data (%uz): %*s", 2653 msg->port_msg.last ? "last " : "", msg->size, dump_size, 2654 b->mem.pos); 2655 2656 if (msg->size == 0) { 2657 b = NULL; 2658 } 2659 2660 ar = rc->ap; 2661 if (nxt_slow_path(ar == NULL)) { 2662 return; 2663 } 2664 2665 if (msg->port_msg.last != 0) { 2666 nxt_debug(task, "router data create last buf"); 2667 2668 last = nxt_http_request_last_buffer(task, ar->request); 2669 if (nxt_slow_path(last == NULL)) { 2670 nxt_app_http_req_done(task, ar); 2671 nxt_router_rc_unlink(task, rc); 2672 return; 2673 } 2674 2675 nxt_buf_chain_add(&b, last); 2676 2677 nxt_router_rc_unlink(task, rc); 2678 2679 } else { 2680 if (rc->app->timeout != 0) { 2681 ar->timer.handler = nxt_router_app_timeout; 2682 nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); 2683 } 2684 } 2685 2686 if (b == NULL) { 2687 return; 2688 } 2689 2690 if (msg->buf == b) { 2691 /* Disable instant buffer completion/re-using by port. */ 2692 msg->buf = NULL; 2693 } 2694 2695 r = ar->request; 2696 2697 if (r->header_sent) { 2698 nxt_buf_chain_add(&r->out, b); 2699 nxt_http_request_send_body(task, r, NULL); 2700 2701 } else { 2702 ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem); 2703 if (nxt_slow_path(ret != NXT_DONE)) { 2704 goto fail; 2705 } 2706 2707 r->resp.fields = ar->resp_parser.fields; 2708 2709 ret = nxt_http_fields_process(r->resp.fields, 2710 &nxt_response_fields_hash, r); 2711 if (nxt_slow_path(ret != NXT_OK)) { 2712 goto fail; 2713 } 2714 2715 if (nxt_buf_mem_used_size(&b->mem) == 0) { 2716 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2717 b->completion_handler, task, b, b->parent); 2718 2719 b = b->next; 2720 } 2721 2722 if (b != NULL) { 2723 nxt_buf_chain_add(&r->out, b); 2724 } 2725 2726 r->state = &nxt_http_request_send_state; 2727 2728 nxt_http_request_header_send(task, r); 2729 } 2730 2731 return; 2732 2733 fail: 2734 2735 nxt_app_http_req_done(task, ar); 2736 nxt_router_rc_unlink(task, rc); 2737 2738 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 2739 } 2740 2741 2742 static const nxt_http_request_state_t nxt_http_request_send_state 2743 nxt_aligned(64) = 2744 { 2745 .ready_handler = nxt_http_request_send_body, 2746 .error_handler = nxt_http_request_close_handler, 2747 }; 2748 2749 2750 static void 2751 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 2752 { 2753 nxt_buf_t *out; 2754 nxt_http_request_t *r; 2755 2756 r = obj; 2757 2758 out = r->out; 2759 2760 if (out != NULL) { 2761 r->out = NULL; 2762 nxt_http_request_send(task, r, out); 2763 } 2764 } 2765 2766 2767 static void 2768 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2769 void *data) 2770 { 2771 nxt_int_t res; 2772 nxt_port_t *port; 2773 nxt_bool_t cancelled; 2774 nxt_req_app_link_t *ra; 2775 nxt_req_conn_link_t *rc; 2776 2777 rc = data; 2778 2779 ra = rc->ra; 2780 2781 if (ra != NULL) { 2782 cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 2783 2784 if (cancelled) { 2785 nxt_router_ra_inc_use(ra); 2786 2787 res = nxt_router_app_port(task, rc->app, ra); 2788 2789 if (res == NXT_OK) { 2790 port = ra->app_port; 2791 2792 if (nxt_slow_path(port == NULL)) { 2793 nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra"); 2794 return; 2795 } 2796 2797 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, 2798 port->pid); 2799 2800 nxt_router_app_prepare_request(task, ra); 2801 } 2802 2803 msg->port_msg.last = 0; 2804 2805 return; 2806 } 2807 } 2808 2809 nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE); 2810 2811 nxt_router_rc_unlink(task, rc); 2812 } 2813 2814 2815 static void 2816 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2817 void *data) 2818 { 2819 nxt_app_t *app; 2820 nxt_port_t *port; 2821 2822 app = data; 2823 port = msg->u.new_port; 2824 2825 nxt_assert(app != NULL); 2826 nxt_assert(port != NULL); 2827 2828 port->app = app; 2829 2830 nxt_thread_mutex_lock(&app->mutex); 2831 2832 nxt_assert(app->pending_processes != 0); 2833 2834 app->pending_processes--; 2835 app->processes++; 2836 2837 nxt_thread_mutex_unlock(&app->mutex); 2838 2839 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 2840 &app->name, port->pid, app->processes, app->pending_processes); 2841 2842 nxt_router_app_port_release(task, port, 0, 0); 2843 } 2844 2845 2846 static void 2847 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2848 void *data) 2849 { 2850 nxt_app_t *app; 2851 nxt_queue_link_t *lnk; 2852 nxt_req_app_link_t *ra; 2853 2854 app = data; 2855 2856 nxt_assert(app != NULL); 2857 2858 nxt_debug(task, "app '%V' %p start error", &app->name, app); 2859 2860 nxt_thread_mutex_lock(&app->mutex); 2861 2862 nxt_assert(app->pending_processes != 0); 2863 2864 app->pending_processes--; 2865 2866 if (!nxt_queue_is_empty(&app->requests)) { 2867 lnk = nxt_queue_last(&app->requests); 2868 nxt_queue_remove(lnk); 2869 lnk->next = NULL; 2870 2871 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); 2872 2873 } else { 2874 ra = NULL; 2875 } 2876 2877 nxt_thread_mutex_unlock(&app->mutex); 2878 2879 if (ra != NULL) { 2880 nxt_debug(task, "app '%V' %p abort next stream #%uD", 2881 &app->name, app, ra->stream); 2882 2883 nxt_router_ra_error(ra, 500, "Failed to start application process"); 2884 nxt_router_ra_use(task, ra, -1); 2885 } 2886 2887 nxt_router_app_use(task, app, -1); 2888 } 2889 2890 2891 void 2892 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 2893 { 2894 int c; 2895 2896 c = nxt_atomic_fetch_add(&app->use_count, i); 2897 2898 if (i < 0 && c == -i) { 2899 2900 nxt_assert(app->live == 0); 2901 nxt_assert(app->processes == 0); 2902 nxt_assert(app->idle_processes == 0); 2903 nxt_assert(app->pending_processes == 0); 2904 nxt_assert(nxt_queue_is_empty(&app->requests)); 2905 nxt_assert(nxt_queue_is_empty(&app->ports)); 2906 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 2907 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 2908 2909 nxt_thread_mutex_destroy(&app->mutex); 2910 nxt_free(app); 2911 } 2912 } 2913 2914 2915 nxt_inline nxt_bool_t 2916 nxt_router_app_first_port_busy(nxt_app_t *app) 2917 { 2918 nxt_port_t *port; 2919 nxt_queue_link_t *lnk; 2920 2921 lnk = nxt_queue_first(&app->ports); 2922 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2923 2924 return port->app_pending_responses > 0; 2925 } 2926 2927 2928 nxt_inline nxt_port_t * 2929 nxt_router_pop_first_port(nxt_app_t *app) 2930 { 2931 nxt_port_t *port; 2932 nxt_queue_link_t *lnk; 2933 2934 lnk = nxt_queue_first(&app->ports); 2935 nxt_queue_remove(lnk); 2936 2937 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2938 2939 port->app_pending_responses++; 2940 2941 if (nxt_queue_chk_remove(&port->idle_link)) { 2942 app->idle_processes--; 2943 2944 if (port->idle_start == 0) { 2945 nxt_assert(app->idle_processes < app->spare_processes); 2946 2947 } else { 2948 nxt_assert(app->idle_processes >= app->spare_processes); 2949 2950 port->idle_start = 0; 2951 } 2952 } 2953 2954 if ((app->max_pending_responses == 0 2955 || port->app_pending_responses < app->max_pending_responses) 2956 && (app->max_requests == 0 2957 || port->app_responses + port->app_pending_responses 2958 < app->max_requests)) 2959 { 2960 nxt_queue_insert_tail(&app->ports, lnk); 2961 2962 nxt_port_inc_use(port); 2963 2964 } else { 2965 lnk->next = NULL; 2966 } 2967 2968 return port; 2969 } 2970 2971 2972 nxt_inline nxt_port_t * 2973 nxt_router_app_get_port_for_quit(nxt_app_t *app) 2974 { 2975 nxt_port_t *port; 2976 2977 port = NULL; 2978 2979 nxt_thread_mutex_lock(&app->mutex); 2980 2981 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 2982 2983 if (port->app_pending_responses > 0) { 2984 port = NULL; 2985 2986 continue; 2987 } 2988 2989 /* Caller is responsible to decrease port use count. */ 2990 nxt_queue_chk_remove(&port->app_link); 2991 2992 if (nxt_queue_chk_remove(&port->idle_link)) { 2993 app->idle_processes--; 2994 } 2995 2996 /* Caller is responsible to decrease app use count. */ 2997 port->app = NULL; 2998 app->processes--; 2999 3000 break; 3001 3002 } nxt_queue_loop; 3003 3004 nxt_thread_mutex_unlock(&app->mutex); 3005 3006 return port; 3007 } 3008 3009 3010 static void 3011 nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app) 3012 { 3013 nxt_port_t *port; 3014 3015 nxt_queue_remove(&app->link); 3016 3017 app->live = 0; 3018 3019 for ( ;; ) { 3020 port = nxt_router_app_get_port_for_quit(app); 3021 if (port == NULL) { 3022 break; 3023 } 3024 3025 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); 3026 3027 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 3028 3029 nxt_port_use(task, port, -1); 3030 nxt_router_app_use(task, app, -1); 3031 } 3032 3033 if (nxt_timer_is_in_tree(&app->idle_timer)) { 3034 nxt_assert(app->engine == task->thread->engine); 3035 3036 app->idle_timer.handler = nxt_router_app_release_handler; 3037 nxt_timer_add(app->engine, &app->idle_timer, 0); 3038 3039 } else { 3040 nxt_router_app_use(task, app, -1); 3041 } 3042 } 3043 3044 3045 static void 3046 nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) 3047 { 3048 nxt_req_app_link_t *ra; 3049 3050 ra = data; 3051 3052 #if (NXT_DEBUG) 3053 { 3054 nxt_app_t *app; 3055 3056 app = obj; 3057 3058 nxt_assert(app != NULL); 3059 nxt_assert(ra != NULL); 3060 nxt_assert(ra->app_port != NULL); 3061 3062 nxt_debug(task, "app '%V' %p process next stream #%uD", 3063 &app->name, app, ra->stream); 3064 } 3065 #endif 3066 3067 nxt_router_app_prepare_request(task, ra); 3068 } 3069 3070 3071 static void 3072 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 3073 uint32_t request_failed, uint32_t got_response) 3074 { 3075 nxt_app_t *app; 3076 nxt_bool_t port_unchained; 3077 nxt_bool_t send_quit, cancelled, adjust_idle_timer; 3078 nxt_queue_link_t *lnk; 3079 nxt_req_app_link_t *ra, *pending_ra, *re_ra; 3080 nxt_port_select_state_t state; 3081 3082 nxt_assert(port != NULL); 3083 nxt_assert(port->app != NULL); 3084 3085 ra = NULL; 3086 3087 app = port->app; 3088 3089 nxt_thread_mutex_lock(&app->mutex); 3090 3091 port->app_pending_responses -= request_failed + got_response; 3092 port->app_responses += got_response; 3093 3094 if (nxt_slow_path(app->live == 0)) { 3095 goto app_dead; 3096 } 3097 3098 if (port->pair[1] != -1 3099 && (app->max_pending_responses == 0 3100 || port->app_pending_responses < app->max_pending_responses) 3101 && (app->max_requests == 0 3102 || port->app_responses + port->app_pending_responses 3103 < app->max_requests)) 3104 { 3105 if (port->app_link.next == NULL) { 3106 if (port->app_pending_responses > 0) { 3107 nxt_queue_insert_tail(&app->ports, &port->app_link); 3108 3109 } else { 3110 nxt_queue_insert_head(&app->ports, &port->app_link); 3111 } 3112 3113 nxt_port_inc_use(port); 3114 3115 } else { 3116 if (port->app_pending_responses == 0 3117 && nxt_queue_first(&app->ports) != &port->app_link) 3118 { 3119 nxt_queue_remove(&port->app_link); 3120 nxt_queue_insert_head(&app->ports, &port->app_link); 3121 } 3122 } 3123 } 3124 3125 if (!nxt_queue_is_empty(&app->ports) 3126 && !nxt_queue_is_empty(&app->requests)) 3127 { 3128 lnk = nxt_queue_first(&app->requests); 3129 nxt_queue_remove(lnk); 3130 lnk->next = NULL; 3131 3132 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); 3133 3134 ra->app_port = nxt_router_pop_first_port(app); 3135 3136 if (ra->app_port->app_pending_responses > 1) { 3137 nxt_router_ra_pending(task, app, ra); 3138 } 3139 } 3140 3141 app_dead: 3142 3143 /* Pop first pending request for this port. */ 3144 if ((request_failed > 0 || got_response > 0) 3145 && !nxt_queue_is_empty(&port->pending_requests)) 3146 { 3147 lnk = nxt_queue_first(&port->pending_requests); 3148 nxt_queue_remove(lnk); 3149 lnk->next = NULL; 3150 3151 pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, 3152 link_port_pending); 3153 3154 nxt_assert(pending_ra->link_app_pending.next != NULL); 3155 3156 nxt_queue_remove(&pending_ra->link_app_pending); 3157 pending_ra->link_app_pending.next = NULL; 3158 3159 } else { 3160 pending_ra = NULL; 3161 } 3162 3163 /* Try to cancel and re-schedule first stalled request for this app. */ 3164 if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { 3165 lnk = nxt_queue_first(&app->pending); 3166 3167 re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); 3168 3169 if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { 3170 3171 nxt_debug(task, "app '%V' stalled request #%uD detected", 3172 &app->name, re_ra->stream); 3173 3174 cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, 3175 re_ra->stream); 3176 3177 if (cancelled) { 3178 nxt_router_ra_inc_use(re_ra); 3179 3180 state.ra = re_ra; 3181 state.app = app; 3182 3183 nxt_router_port_select(task, &state); 3184 3185 goto re_ra_cancelled; 3186 } 3187 } 3188 } 3189 3190 re_ra = NULL; 3191 3192 re_ra_cancelled: 3193 3194 send_quit = (app->live == 0 && port->app_pending_responses == 0) 3195 || (app->max_requests > 0 && port->app_pending_responses == 0 3196 && port->app_responses >= app->max_requests); 3197 3198 if (send_quit) { 3199 port_unchained = nxt_queue_chk_remove(&port->app_link); 3200 3201 port->app = NULL; 3202 app->processes--; 3203 3204 } else { 3205 port_unchained = 0; 3206 } 3207 3208 adjust_idle_timer = 0; 3209 3210 if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) { 3211 nxt_assert(port->idle_link.next == NULL); 3212 3213 if (app->idle_processes == app->spare_processes 3214 && app->adjust_idle_work.data == NULL) 3215 { 3216 adjust_idle_timer = 1; 3217 app->adjust_idle_work.data = app; 3218 app->adjust_idle_work.next = NULL; 3219 } 3220 3221 if (app->idle_processes < app->spare_processes) { 3222 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 3223 3224 } else { 3225 nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); 3226 3227 port->idle_start = task->thread->engine->timers.now; 3228 } 3229 3230 app->idle_processes++; 3231 } 3232 3233 nxt_thread_mutex_unlock(&app->mutex); 3234 3235 if (adjust_idle_timer) { 3236 nxt_router_app_use(task, app, 1); 3237 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 3238 } 3239 3240 if (pending_ra != NULL) { 3241 nxt_router_ra_use(task, pending_ra, -1); 3242 } 3243 3244 if (re_ra != NULL) { 3245 if (nxt_router_port_post_select(task, &state) == NXT_OK) { 3246 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3247 nxt_router_app_process_request, 3248 &task->thread->engine->task, app, re_ra); 3249 } 3250 } 3251 3252 if (ra != NULL) { 3253 nxt_router_ra_use(task, ra, -1); 3254 3255 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3256 nxt_router_app_process_request, 3257 &task->thread->engine->task, app, ra); 3258 3259 goto adjust_use; 3260 } 3261 3262 /* ? */ 3263 if (port->pair[1] == -1) { 3264 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 3265 &app->name, app, port, port->pid); 3266 3267 goto adjust_use; 3268 } 3269 3270 if (send_quit) { 3271 nxt_debug(task, "app '%V' %p send QUIT to port", 3272 &app->name, app); 3273 3274 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 3275 -1, 0, 0, NULL); 3276 3277 if (port_unchained) { 3278 nxt_port_use(task, port, -1); 3279 } 3280 3281 nxt_router_app_use(task, app, -1); 3282 3283 goto adjust_use; 3284 } 3285 3286 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 3287 &app->name, app); 3288 3289 adjust_use: 3290 3291 if (request_failed > 0 || got_response > 0) { 3292 nxt_port_use(task, port, -1); 3293 } 3294 } 3295 3296 3297 void 3298 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 3299 { 3300 nxt_app_t *app; 3301 nxt_bool_t unchain, start_process; 3302 nxt_port_t *idle_port; 3303 nxt_queue_link_t *idle_lnk; 3304 3305 app = port->app; 3306 3307 nxt_assert(app != NULL); 3308 3309 nxt_thread_mutex_lock(&app->mutex); 3310 3311 unchain = nxt_queue_chk_remove(&port->app_link); 3312 3313 if (nxt_queue_chk_remove(&port->idle_link)) { 3314 app->idle_processes--; 3315 3316 if (port->idle_start == 0 3317 && app->idle_processes >= app->spare_processes) 3318 { 3319 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 3320 3321 idle_lnk = nxt_queue_last(&app->idle_ports); 3322 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 3323 nxt_queue_remove(idle_lnk); 3324 3325 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 3326 3327 idle_port->idle_start = 0; 3328 } 3329 } 3330 3331 app->processes--; 3332 3333 start_process = app->live != 0 3334 && !task->thread->engine->shutdown 3335 && nxt_router_app_can_start(app) 3336 && (!nxt_queue_is_empty(&app->requests) 3337 || nxt_router_app_need_start(app)); 3338 3339 if (start_process) { 3340 app->pending_processes++; 3341 } 3342 3343 nxt_thread_mutex_unlock(&app->mutex); 3344 3345 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 3346 3347 if (unchain) { 3348 nxt_port_use(task, port, -1); 3349 } 3350 3351 if (start_process) { 3352 nxt_router_start_app_process(task, app); 3353 } 3354 } 3355 3356 3357 static void 3358 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 3359 { 3360 nxt_app_t *app; 3361 nxt_bool_t queued; 3362 nxt_port_t *port; 3363 nxt_msec_t timeout, threshold; 3364 nxt_queue_link_t *lnk; 3365 nxt_event_engine_t *engine; 3366 3367 app = obj; 3368 queued = (data == app); 3369 3370 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 3371 &app->name, queued); 3372 3373 engine = task->thread->engine; 3374 3375 nxt_assert(app->engine == engine); 3376 3377 threshold = engine->timers.now + app->idle_timer.precision; 3378 timeout = 0; 3379 3380 nxt_thread_mutex_lock(&app->mutex); 3381 3382 if (queued) { 3383 app->adjust_idle_work.data = NULL; 3384 } 3385 3386 while (app->idle_processes > app->spare_processes) { 3387 3388 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 3389 3390 lnk = nxt_queue_first(&app->idle_ports); 3391 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 3392 3393 timeout = port->idle_start + app->idle_timeout; 3394 3395 if (timeout > threshold) { 3396 break; 3397 } 3398 3399 nxt_queue_remove(lnk); 3400 lnk->next = NULL; 3401 3402 nxt_queue_chk_remove(&port->app_link); 3403 3404 app->idle_processes--; 3405 app->processes--; 3406 port->app = NULL; 3407 3408 nxt_thread_mutex_unlock(&app->mutex); 3409 3410 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 3411 &app->name, port->pid); 3412 3413 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 3414 3415 nxt_port_use(task, port, -1); 3416 nxt_router_app_use(task, app, -1); 3417 3418 nxt_thread_mutex_lock(&app->mutex); 3419 } 3420 3421 nxt_thread_mutex_unlock(&app->mutex); 3422 3423 if (timeout > threshold) { 3424 nxt_timer_add(engine, &app->idle_timer, timeout - threshold); 3425 3426 } else { 3427 nxt_timer_disable(engine, &app->idle_timer); 3428 } 3429 3430 if (queued) { 3431 nxt_router_app_use(task, app, -1); 3432 } 3433 } 3434 3435 3436 static void 3437 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 3438 { 3439 nxt_app_t *app; 3440 nxt_timer_t *timer; 3441 3442 timer = obj; 3443 app = nxt_container_of(timer, nxt_app_t, idle_timer); 3444 3445 nxt_router_adjust_idle_timer(task, app, NULL); 3446 } 3447 3448 3449 static void 3450 nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data) 3451 { 3452 nxt_app_t *app; 3453 nxt_timer_t *timer; 3454 3455 timer = obj; 3456 app = nxt_container_of(timer, nxt_app_t, idle_timer); 3457 3458 nxt_router_app_use(task, app, -1); 3459 } 3460 3461 3462 static void 3463 nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) 3464 { 3465 nxt_app_t *app; 3466 nxt_bool_t can_start_process; 3467 nxt_req_app_link_t *ra; 3468 3469 ra = state->ra; 3470 app = state->app; 3471 3472 state->failed_port_use_delta = 0; 3473 3474 if (nxt_queue_chk_remove(&ra->link_app_requests)) 3475 { 3476 nxt_router_ra_dec_use(ra); 3477 } 3478 3479 if (nxt_queue_chk_remove(&ra->link_port_pending)) 3480 { 3481 nxt_assert(ra->link_app_pending.next != NULL); 3482 3483 nxt_queue_remove(&ra->link_app_pending); 3484 ra->link_app_pending.next = NULL; 3485 3486 nxt_router_ra_dec_use(ra); 3487 } 3488 3489 state->failed_port = ra->app_port; 3490 3491 if (ra->app_port != NULL) { 3492 state->failed_port_use_delta--; 3493 3494 state->failed_port->app_pending_responses--; 3495 3496 if (nxt_queue_chk_remove(&state->failed_port->app_link)) { 3497 state->failed_port_use_delta--; 3498 } 3499 3500 ra->app_port = NULL; 3501 } 3502 3503 can_start_process = nxt_router_app_can_start(app); 3504 3505 state->port = NULL; 3506 state->start_process = 0; 3507 3508 if (nxt_queue_is_empty(&app->ports) 3509 || (can_start_process && nxt_router_app_first_port_busy(app)) ) 3510 { 3511 ra = nxt_router_ra_create(task, ra); 3512 3513 if (nxt_slow_path(ra == NULL)) { 3514 goto fail; 3515 } 3516 3517 if (nxt_slow_path(state->failed_port != NULL)) { 3518 nxt_queue_insert_head(&app->requests, &ra->link_app_requests); 3519 3520 } else { 3521 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); 3522 } 3523 3524 nxt_router_ra_inc_use(ra); 3525 3526 nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); 3527 3528 if (can_start_process) { 3529 app->pending_processes++; 3530 state->start_process = 1; 3531 } 3532 3533 } else { 3534 state->port = nxt_router_pop_first_port(app); 3535 3536 if (state->port->app_pending_responses > 1) { 3537 ra = nxt_router_ra_create(task, ra); 3538 3539 if (nxt_slow_path(ra == NULL)) { 3540 goto fail; 3541 } 3542 3543 ra->app_port = state->port; 3544 3545 nxt_router_ra_pending(task, app, ra); 3546 } 3547 3548 if (can_start_process && nxt_router_app_need_start(app)) { 3549 app->pending_processes++; 3550 state->start_process = 1; 3551 } 3552 } 3553 3554 fail: 3555 3556 state->shared_ra = ra; 3557 } 3558 3559 3560 static nxt_int_t 3561 nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) 3562 { 3563 nxt_int_t res; 3564 nxt_app_t *app; 3565 nxt_req_app_link_t *ra; 3566 3567 ra = state->shared_ra; 3568 app = state->app; 3569 3570 if (state->failed_port_use_delta != 0) { 3571 nxt_port_use(task, state->failed_port, state->failed_port_use_delta); 3572 } 3573 3574 if (nxt_slow_path(ra == NULL)) { 3575 if (state->port != NULL) { 3576 nxt_port_use(task, state->port, -1); 3577 } 3578 3579 nxt_router_ra_error(state->ra, 500, 3580 "Failed to allocate shared req<->app link"); 3581 nxt_router_ra_use(task, state->ra, -1); 3582 3583 return NXT_ERROR; 3584 } 3585 3586 if (state->port != NULL) { 3587 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); 3588 3589 ra->app_port = state->port; 3590 3591 if (state->start_process) { 3592 nxt_router_start_app_process(task, app); 3593 } 3594 3595 return NXT_OK; 3596 } 3597 3598 if (!state->start_process) { 3599 nxt_debug(task, "app '%V' %p too many running or pending processes", 3600 &app->name, app); 3601 3602 return NXT_AGAIN; 3603 } 3604 3605 res = nxt_router_start_app_process(task, app); 3606 3607 if (nxt_slow_path(res != NXT_OK)) { 3608 nxt_router_ra_error(ra, 500, "Failed to start app process"); 3609 nxt_router_ra_use(task, ra, -1); 3610 3611 return NXT_ERROR; 3612 } 3613 3614 return NXT_AGAIN; 3615 } 3616 3617 3618 static nxt_int_t 3619 nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 3620 { 3621 nxt_port_select_state_t state; 3622 3623 state.ra = ra; 3624 state.app = app; 3625 3626 nxt_thread_mutex_lock(&app->mutex); 3627 3628 nxt_router_port_select(task, &state); 3629 3630 nxt_thread_mutex_unlock(&app->mutex); 3631 3632 return nxt_router_port_post_select(task, &state); 3633 } 3634 3635 3636 void 3637 nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar) 3638 { 3639 nxt_int_t res; 3640 nxt_app_t *app; 3641 nxt_port_t *port; 3642 nxt_event_engine_t *engine; 3643 nxt_http_request_t *r; 3644 nxt_req_app_link_t ra_local, *ra; 3645 nxt_req_conn_link_t *rc; 3646 3647 r = ar->request; 3648 app = r->socket_conf->application; 3649 3650 if (app == NULL) { 3651 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3652 return; 3653 } 3654 3655 engine = task->thread->engine; 3656 3657 rc = nxt_port_rpc_register_handler_ex(task, engine->port, 3658 nxt_router_response_ready_handler, 3659 nxt_router_response_error_handler, 3660 sizeof(nxt_req_conn_link_t)); 3661 3662 if (nxt_slow_path(rc == NULL)) { 3663 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3664 return; 3665 } 3666 3667 rc->stream = nxt_port_rpc_ex_stream(rc); 3668 rc->app = app; 3669 3670 nxt_router_app_use(task, app, 1); 3671 3672 rc->ap = ar; 3673 3674 ra = &ra_local; 3675 nxt_router_ra_init(task, ra, rc); 3676 3677 res = nxt_router_app_port(task, app, ra); 3678 3679 if (res != NXT_OK) { 3680 return; 3681 } 3682 3683 ra = rc->ra; 3684 port = ra->app_port; 3685 3686 nxt_assert(port != NULL); 3687 3688 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); 3689 3690 nxt_router_app_prepare_request(task, ra); 3691 } 3692 3693 3694 static void 3695 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 3696 { 3697 } 3698 3699 3700 static void 3701 nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) 3702 { 3703 uint32_t request_failed; 3704 nxt_buf_t *b; 3705 nxt_int_t res; 3706 nxt_port_t *port, *c_port, *reply_port; 3707 nxt_app_wmsg_t wmsg; 3708 nxt_app_parse_ctx_t *ap; 3709 3710 nxt_assert(ra->app_port != NULL); 3711 3712 port = ra->app_port; 3713 reply_port = ra->reply_port; 3714 ap = ra->ap; 3715 3716 request_failed = 1; 3717 3718 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 3719 reply_port->id); 3720 if (nxt_slow_path(c_port != reply_port)) { 3721 res = nxt_port_send_port(task, port, reply_port, 0); 3722 3723 if (nxt_slow_path(res != NXT_OK)) { 3724 nxt_router_ra_error(ra, 500, 3725 "Failed to send reply port to application"); 3726 goto release_port; 3727 } 3728 3729 nxt_process_connected_port_add(port->process, reply_port); 3730 } 3731 3732 wmsg.port = port; 3733 wmsg.write = NULL; 3734 wmsg.buf = &wmsg.write; 3735 wmsg.stream = ra->stream; 3736 3737 res = port->app->prepare_msg(task, &ap->r, &wmsg); 3738 3739 if (nxt_slow_path(res != NXT_OK)) { 3740 nxt_router_ra_error(ra, 500, 3741 "Failed to prepare message for application"); 3742 goto release_port; 3743 } 3744 3745 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 3746 nxt_buf_used_size(wmsg.write), 3747 wmsg.port->socket.fd); 3748 3749 request_failed = 0; 3750 3751 ra->msg_info.buf = wmsg.write; 3752 ra->msg_info.completion_handler = wmsg.write->completion_handler; 3753 3754 for (b = wmsg.write; b != NULL; b = b->next) { 3755 b->completion_handler = nxt_router_dummy_buf_completion; 3756 } 3757 3758 res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, 3759 ra->stream); 3760 if (nxt_slow_path(res != NXT_OK)) { 3761 nxt_router_ra_error(ra, 500, 3762 "Failed to get tracking area"); 3763 goto release_port; 3764 } 3765 3766 res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA, 3767 -1, ra->stream, reply_port->id, wmsg.write, 3768 &ra->msg_info.tracking); 3769 3770 if (nxt_slow_path(res != NXT_OK)) { 3771 nxt_router_ra_error(ra, 500, 3772 "Failed to send message to application"); 3773 goto release_port; 3774 } 3775 3776 release_port: 3777 3778 nxt_router_app_port_release(task, port, request_failed, 0); 3779 3780 nxt_router_ra_update_peer(task, ra); 3781 } 3782 3783 3784 static nxt_int_t 3785 nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 3786 nxt_app_wmsg_t *wmsg) 3787 { 3788 nxt_int_t rc; 3789 nxt_buf_t *b; 3790 nxt_http_field_t *field; 3791 nxt_app_request_header_t *h; 3792 3793 static const nxt_str_t prefix = nxt_string("HTTP_"); 3794 static const nxt_str_t eof = nxt_null_string; 3795 3796 h = &r->header; 3797 3798 #define RC(S) \ 3799 do { \ 3800 rc = (S); \ 3801 if (nxt_slow_path(rc != NXT_OK)) { \ 3802 goto fail; \ 3803 } \ 3804 } while(0) 3805 3806 #define NXT_WRITE(N) \ 3807 RC(nxt_app_msg_write_str(task, wmsg, N)) 3808 3809 /* TODO error handle, async mmap buffer assignment */ 3810 3811 NXT_WRITE(&h->method); 3812 NXT_WRITE(&h->target); 3813 3814 if (h->path.start == h->target.start) { 3815 NXT_WRITE(&eof); 3816 3817 } else { 3818 NXT_WRITE(&h->path); 3819 } 3820 3821 if (h->query.start != NULL) { 3822 RC(nxt_app_msg_write_size(task, wmsg, 3823 h->query.start - h->target.start + 1)); 3824 } else { 3825 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3826 } 3827 3828 NXT_WRITE(&h->version); 3829 3830 NXT_WRITE(&r->remote); 3831 NXT_WRITE(&r->local); 3832 3833 NXT_WRITE(&h->host); 3834 NXT_WRITE(&h->content_type); 3835 NXT_WRITE(&h->content_length); 3836 3837 nxt_list_each(field, h->fields) { 3838 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name, 3839 field->name_length)); 3840 RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); 3841 3842 } nxt_list_loop; 3843 3844 /* end-of-headers mark */ 3845 NXT_WRITE(&eof); 3846 3847 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3848 3849 for (b = r->body.buf; b != NULL; b = b->next) { 3850 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3851 nxt_buf_mem_used_size(&b->mem))); 3852 } 3853 3854 #undef NXT_WRITE 3855 #undef RC 3856 3857 return NXT_OK; 3858 3859 fail: 3860 3861 return NXT_ERROR; 3862 } 3863 3864 3865 static nxt_int_t 3866 nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 3867 nxt_app_wmsg_t *wmsg) 3868 { 3869 nxt_int_t rc; 3870 nxt_buf_t *b; 3871 nxt_bool_t method_is_post; 3872 nxt_http_field_t *field; 3873 nxt_app_request_header_t *h; 3874 3875 static const nxt_str_t prefix = nxt_string("HTTP_"); 3876 static const nxt_str_t eof = nxt_null_string; 3877 3878 h = &r->header; 3879 3880 #define RC(S) \ 3881 do { \ 3882 rc = (S); \ 3883 if (nxt_slow_path(rc != NXT_OK)) { \ 3884 goto fail; \ 3885 } \ 3886 } while(0) 3887 3888 #define NXT_WRITE(N) \ 3889 RC(nxt_app_msg_write_str(task, wmsg, N)) 3890 3891 /* TODO error handle, async mmap buffer assignment */ 3892 3893 NXT_WRITE(&h->method); 3894 NXT_WRITE(&h->target); 3895 3896 if (h->path.start == h->target.start) { 3897 NXT_WRITE(&eof); 3898 3899 } else { 3900 NXT_WRITE(&h->path); 3901 } 3902 3903 if (h->query.start != NULL) { 3904 RC(nxt_app_msg_write_size(task, wmsg, 3905 h->query.start - h->target.start + 1)); 3906 } else { 3907 RC(nxt_app_msg_write_size(task, wmsg, 0)); 3908 } 3909 3910 NXT_WRITE(&h->version); 3911 3912 // PHP_SELF 3913 // SCRIPT_NAME 3914 // SCRIPT_FILENAME 3915 // DOCUMENT_ROOT 3916 3917 NXT_WRITE(&r->remote); 3918 NXT_WRITE(&r->local); 3919 3920 NXT_WRITE(&h->host); 3921 NXT_WRITE(&h->cookie); 3922 NXT_WRITE(&h->content_type); 3923 NXT_WRITE(&h->content_length); 3924 3925 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 3926 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 3927 3928 method_is_post = h->method.length == 4 3929 && h->method.start[0] == 'P' 3930 && h->method.start[1] == 'O' 3931 && h->method.start[2] == 'S' 3932 && h->method.start[3] == 'T'; 3933 3934 if (method_is_post) { 3935 for (b = r->body.buf; b != NULL; b = b->next) { 3936 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3937 nxt_buf_mem_used_size(&b->mem))); 3938 } 3939 } 3940 3941 nxt_list_each(field, h->fields) { 3942 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name, 3943 field->name_length)); 3944 RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); 3945 3946 } nxt_list_loop; 3947 3948 /* end-of-headers mark */ 3949 NXT_WRITE(&eof); 3950 3951 if (!method_is_post) { 3952 for (b = r->body.buf; b != NULL; b = b->next) { 3953 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 3954 nxt_buf_mem_used_size(&b->mem))); 3955 } 3956 } 3957 3958 #undef NXT_WRITE 3959 #undef RC 3960 3961 return NXT_OK; 3962 3963 fail: 3964 3965 return NXT_ERROR; 3966 } 3967 3968 3969 static nxt_int_t 3970 nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) 3971 { 3972 nxt_int_t rc; 3973 nxt_buf_t *b; 3974 nxt_http_field_t *field; 3975 nxt_app_request_header_t *h; 3976 3977 static const nxt_str_t eof = nxt_null_string; 3978 3979 h = &r->header; 3980 3981 #define RC(S) \ 3982 do { \ 3983 rc = (S); \ 3984 if (nxt_slow_path(rc != NXT_OK)) { \ 3985 goto fail; \ 3986 } \ 3987 } while(0) 3988 3989 #define NXT_WRITE(N) \ 3990 RC(nxt_app_msg_write_str(task, wmsg, N)) 3991 3992 /* TODO error handle, async mmap buffer assignment */ 3993 3994 NXT_WRITE(&h->method); 3995 NXT_WRITE(&h->target); 3996 3997 if (h->path.start == h->target.start) { 3998 NXT_WRITE(&eof); 3999 4000 } else { 4001 NXT_WRITE(&h->path); 4002 } 4003 4004 if (h->query.start != NULL) { 4005 RC(nxt_app_msg_write_size(task, wmsg, 4006 h->query.start - h->target.start + 1)); 4007 } else { 4008 RC(nxt_app_msg_write_size(task, wmsg, 0)); 4009 } 4010 4011 NXT_WRITE(&h->version); 4012 NXT_WRITE(&r->remote); 4013 4014 NXT_WRITE(&h->host); 4015 NXT_WRITE(&h->cookie); 4016 NXT_WRITE(&h->content_type); 4017 NXT_WRITE(&h->content_length); 4018 4019 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); 4020 4021 nxt_list_each(field, h->fields) { 4022 RC(nxt_app_msg_write(task, wmsg, field->name, field->name_length)); 4023 RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); 4024 4025 } nxt_list_loop; 4026 4027 /* end-of-headers mark */ 4028 NXT_WRITE(&eof); 4029 4030 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 4031 4032 for (b = r->body.buf; b != NULL; b = b->next) { 4033 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 4034 nxt_buf_mem_used_size(&b->mem))); 4035 } 4036 4037 #undef NXT_WRITE 4038 #undef RC 4039 4040 return NXT_OK; 4041 4042 fail: 4043 4044 return NXT_ERROR; 4045 } 4046 4047 4048 static nxt_int_t 4049 nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 4050 nxt_app_wmsg_t *wmsg) 4051 { 4052 nxt_int_t rc; 4053 nxt_str_t str; 4054 nxt_buf_t *b; 4055 nxt_http_field_t *field; 4056 nxt_app_request_header_t *h; 4057 4058 static const nxt_str_t prefix = nxt_string("HTTP_"); 4059 static const nxt_str_t eof = nxt_null_string; 4060 4061 h = &r->header; 4062 4063 #define RC(S) \ 4064 do { \ 4065 rc = (S); \ 4066 if (nxt_slow_path(rc != NXT_OK)) { \ 4067 goto fail; \ 4068 } \ 4069 } while(0) 4070 4071 #define NXT_WRITE(N) \ 4072 RC(nxt_app_msg_write_str(task, wmsg, N)) 4073 4074 /* TODO error handle, async mmap buffer assignment */ 4075 4076 NXT_WRITE(&h->method); 4077 NXT_WRITE(&h->target); 4078 4079 if (h->query.length) { 4080 str.start = h->target.start; 4081 str.length = (h->target.length - h->query.length) - 1; 4082 4083 RC(nxt_app_msg_write_str(task, wmsg, &str)); 4084 4085 } else { 4086 NXT_WRITE(&eof); 4087 } 4088 4089 if (h->query.start != NULL) { 4090 RC(nxt_app_msg_write_size(task, wmsg, 4091 h->query.start - h->target.start + 1)); 4092 } else { 4093 RC(nxt_app_msg_write_size(task, wmsg, 0)); 4094 } 4095 4096 NXT_WRITE(&h->version); 4097 4098 NXT_WRITE(&r->remote); 4099 NXT_WRITE(&r->local); 4100 4101 NXT_WRITE(&h->host); 4102 NXT_WRITE(&h->content_type); 4103 NXT_WRITE(&h->content_length); 4104 4105 nxt_list_each(field, h->fields) { 4106 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, 4107 field->name, field->name_length)); 4108 RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); 4109 } nxt_list_loop; 4110 4111 /* end-of-headers mark */ 4112 NXT_WRITE(&eof); 4113 4114 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 4115 4116 for (b = r->body.buf; b != NULL; b = b->next) { 4117 4118 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 4119 nxt_buf_mem_used_size(&b->mem))); 4120 } 4121 4122 #undef NXT_WRITE 4123 #undef RC 4124 4125 return NXT_OK; 4126 4127 fail: 4128 4129 return NXT_ERROR; 4130 } 4131 4132 4133 static nxt_int_t 4134 nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 4135 nxt_app_wmsg_t *wmsg) 4136 { 4137 nxt_int_t rc; 4138 nxt_str_t str; 4139 nxt_buf_t *b; 4140 nxt_http_field_t *field; 4141 nxt_app_request_header_t *h; 4142 4143 static const nxt_str_t prefix = nxt_string("HTTP_"); 4144 static const nxt_str_t eof = nxt_null_string; 4145 4146 h = &r->header; 4147 4148 #define RC(S) \ 4149 do { \ 4150 rc = (S); \ 4151 if (nxt_slow_path(rc != NXT_OK)) { \ 4152 goto fail; \ 4153 } \ 4154 } while(0) 4155 4156 #define NXT_WRITE(N) \ 4157 RC(nxt_app_msg_write_str(task, wmsg, N)) 4158 4159 /* TODO error handle, async mmap buffer assignment */ 4160 4161 NXT_WRITE(&h->method); 4162 NXT_WRITE(&h->target); 4163 4164 if (h->query.length) { 4165 str.start = h->target.start; 4166 str.length = (h->target.length - h->query.length) - 1; 4167 4168 RC(nxt_app_msg_write_str(task, wmsg, &str)); 4169 4170 } else { 4171 NXT_WRITE(&eof); 4172 } 4173 4174 if (h->query.start != NULL) { 4175 RC(nxt_app_msg_write_size(task, wmsg, 4176 h->query.start - h->target.start + 1)); 4177 } else { 4178 RC(nxt_app_msg_write_size(task, wmsg, 0)); 4179 } 4180 4181 NXT_WRITE(&h->version); 4182 4183 NXT_WRITE(&r->remote); 4184 NXT_WRITE(&r->local); 4185 4186 NXT_WRITE(&h->host); 4187 NXT_WRITE(&h->content_type); 4188 NXT_WRITE(&h->content_length); 4189 4190 nxt_list_each(field, h->fields) { 4191 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, 4192 field->name, field->name_length)); 4193 RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); 4194 } nxt_list_loop; 4195 4196 /* end-of-headers mark */ 4197 NXT_WRITE(&eof); 4198 4199 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); 4200 4201 for (b = r->body.buf; b != NULL; b = b->next) { 4202 4203 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, 4204 nxt_buf_mem_used_size(&b->mem))); 4205 } 4206 4207 #undef NXT_WRITE 4208 #undef RC 4209 4210 return NXT_OK; 4211 4212 fail: 4213 4214 return NXT_ERROR; 4215 } 4216 4217 4218 const nxt_conn_state_t nxt_router_conn_close_state 4219 nxt_aligned(64) = 4220 { 4221 .ready_handler = nxt_router_conn_free, 4222 }; 4223 4224 4225 static void 4226 nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) 4227 { 4228 nxt_socket_conf_joint_t *joint; 4229 4230 joint = obj; 4231 4232 nxt_router_conf_release(task, joint); 4233 } 4234 4235 4236 static void 4237 nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) 4238 { 4239 nxt_conn_t *c; 4240 nxt_event_engine_t *engine; 4241 nxt_socket_conf_joint_t *joint; 4242 4243 c = obj; 4244 4245 nxt_debug(task, "router conn close done"); 4246 4247 nxt_queue_remove(&c->link); 4248 4249 engine = task->thread->engine; 4250 4251 nxt_sockaddr_cache_free(engine, c); 4252 4253 joint = c->joint; 4254 4255 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, 4256 &engine->task, joint, NULL); 4257 4258 nxt_conn_free(task, c); 4259 } 4260 4261 4262 static void 4263 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 4264 { 4265 nxt_timer_t *timer; 4266 nxt_app_parse_ctx_t *ar; 4267 4268 timer = obj; 4269 4270 nxt_debug(task, "router app timeout"); 4271 4272 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); 4273 4274 if (!ar->request->header_sent) { 4275 nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); 4276 } 4277 } 4278