1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 18 19 typedef struct { 20 nxt_str_t type; 21 uint32_t processes; 22 uint32_t max_processes; 23 uint32_t spare_processes; 24 nxt_msec_t timeout; 25 nxt_msec_t res_timeout; 26 nxt_msec_t idle_timeout; 27 uint32_t requests; 28 nxt_conf_value_t *limits_value; 29 nxt_conf_value_t *processes_value; 30 } nxt_router_app_conf_t; 31 32 33 typedef struct { 34 nxt_str_t pass; 35 nxt_str_t application; 36 } nxt_router_listener_conf_t; 37 38 39 #if (NXT_TLS) 40 41 typedef struct { 42 nxt_str_t name; 43 nxt_socket_conf_t *conf; 44 45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 46 } nxt_router_tlssock_t; 47 48 #endif 49 50 51 typedef struct nxt_msg_info_s { 52 nxt_buf_t *buf; 53 nxt_port_mmap_tracking_t tracking; 54 nxt_work_handler_t completion_handler; 55 } nxt_msg_info_t; 56 57 58 typedef struct nxt_req_app_link_s nxt_req_app_link_t; 59 60 61 typedef struct { 62 uint32_t stream; 63 nxt_app_t *app; 64 nxt_port_t *app_port; 65 nxt_app_parse_ctx_t *ap; 66 nxt_msg_info_t msg_info; 67 nxt_req_app_link_t *ra; 68 69 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 70 } nxt_req_conn_link_t; 71 72 73 struct nxt_req_app_link_s { 74 uint32_t stream; 75 nxt_atomic_t use_count; 76 nxt_port_t *app_port; 77 nxt_port_t *reply_port; 78 nxt_app_parse_ctx_t *ap; 79 nxt_msg_info_t msg_info; 80 nxt_req_conn_link_t *rc; 81 82 nxt_nsec_t res_time; 83 84 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ 85 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ 86 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ 87 88 nxt_mp_t *mem_pool; 89 nxt_work_t work; 90 91 int err_code; 92 const char *err_str; 93 }; 94 95 96 typedef struct { 97 nxt_socket_conf_t *socket_conf; 98 nxt_router_temp_conf_t *temp_conf; 99 } nxt_socket_rpc_t; 100 101 102 typedef struct { 103 nxt_app_t *app; 104 nxt_router_temp_conf_t *temp_conf; 105 } nxt_app_rpc_t; 106 107 108 struct nxt_port_select_state_s { 109 nxt_app_t *app; 110 nxt_req_app_link_t *ra; 111 112 nxt_port_t *failed_port; 113 int failed_port_use_delta; 114 115 uint8_t start_process; /* 1 bit */ 116 nxt_req_app_link_t *shared_ra; 117 nxt_port_t *port; 118 }; 119 120 typedef struct nxt_port_select_state_s nxt_port_select_state_t; 121 122 static void nxt_router_greet_controller(nxt_task_t *task, 123 nxt_port_t *controller_port); 124 125 static void nxt_router_port_select(nxt_task_t *task, 126 nxt_port_select_state_t *state); 127 128 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 129 nxt_port_select_state_t *state); 130 131 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 132 133 nxt_inline void 134 nxt_router_ra_inc_use(nxt_req_app_link_t *ra) 135 { 136 nxt_atomic_fetch_add(&ra->use_count, 1); 137 } 138 139 nxt_inline void 140 nxt_router_ra_dec_use(nxt_req_app_link_t *ra) 141 { 142 #if (NXT_DEBUG) 143 int c; 144 145 c = nxt_atomic_fetch_add(&ra->use_count, -1); 146 147 nxt_assert(c > 1); 148 #else 149 (void) nxt_atomic_fetch_add(&ra->use_count, -1); 150 #endif 151 } 152 153 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); 154 155 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 156 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 157 static void nxt_router_conf_ready(nxt_task_t *task, 158 nxt_router_temp_conf_t *tmcf); 159 static void nxt_router_conf_error(nxt_task_t *task, 160 nxt_router_temp_conf_t *tmcf); 161 static void nxt_router_conf_send(nxt_task_t *task, 162 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 163 164 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 165 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 166 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 167 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 168 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 169 static void nxt_router_listen_socket_ready(nxt_task_t *task, 170 nxt_port_recv_msg_t *msg, void *data); 171 static void nxt_router_listen_socket_error(nxt_task_t *task, 172 nxt_port_recv_msg_t *msg, void *data); 173 #if (NXT_TLS) 174 static void nxt_router_tls_rpc_create(nxt_task_t *task, 175 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls); 176 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 177 nxt_port_recv_msg_t *msg, void *data); 178 #endif 179 static void nxt_router_app_rpc_create(nxt_task_t *task, 180 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 181 static void nxt_router_app_prefork_ready(nxt_task_t *task, 182 nxt_port_recv_msg_t *msg, void *data); 183 static void nxt_router_app_prefork_error(nxt_task_t *task, 184 nxt_port_recv_msg_t *msg, void *data); 185 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 186 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 187 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 188 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 189 190 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 191 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 192 const nxt_event_interface_t *interface); 193 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 194 nxt_router_engine_conf_t *recf); 195 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 196 nxt_router_engine_conf_t *recf); 197 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 198 nxt_router_engine_conf_t *recf); 199 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 200 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 201 nxt_work_handler_t handler); 202 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 203 nxt_router_engine_conf_t *recf); 204 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 205 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 206 207 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 208 nxt_router_temp_conf_t *tmcf); 209 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 210 nxt_event_engine_t *engine); 211 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 212 nxt_router_temp_conf_t *tmcf); 213 214 static void nxt_router_engines_post(nxt_router_t *router, 215 nxt_router_temp_conf_t *tmcf); 216 static void nxt_router_engine_post(nxt_event_engine_t *engine, 217 nxt_work_t *jobs); 218 219 static void nxt_router_thread_start(void *data); 220 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 221 void *data); 222 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 223 void *data); 224 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 225 void *data); 226 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 227 void *data); 228 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 229 void *data); 230 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 231 void *data); 232 static void nxt_router_listen_socket_release(nxt_task_t *task, 233 nxt_socket_conf_t *skcf); 234 235 static void nxt_router_access_log_writer(nxt_task_t *task, 236 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 237 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 238 struct tm *tm, size_t size, const char *format); 239 static void nxt_router_access_log_open(nxt_task_t *task, 240 nxt_router_temp_conf_t *tmcf); 241 static void nxt_router_access_log_ready(nxt_task_t *task, 242 nxt_port_recv_msg_t *msg, void *data); 243 static void nxt_router_access_log_error(nxt_task_t *task, 244 nxt_port_recv_msg_t *msg, void *data); 245 static void nxt_router_access_log_release(nxt_task_t *task, 246 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 247 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 248 void *data); 249 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 250 nxt_port_recv_msg_t *msg, void *data); 251 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 252 nxt_port_recv_msg_t *msg, void *data); 253 254 static void nxt_router_app_port_ready(nxt_task_t *task, 255 nxt_port_recv_msg_t *msg, void *data); 256 static void nxt_router_app_port_error(nxt_task_t *task, 257 nxt_port_recv_msg_t *msg, void *data); 258 259 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 260 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 261 uint32_t request_failed, uint32_t got_response); 262 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 263 nxt_req_app_link_t *ra); 264 265 static void nxt_router_app_prepare_request(nxt_task_t *task, 266 nxt_req_app_link_t *ra); 267 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 268 nxt_port_t *port, const nxt_str_t *prefix); 269 270 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 271 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 272 void *data); 273 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 274 void *data); 275 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 276 void *data); 277 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 278 279 static const nxt_http_request_state_t nxt_http_request_send_state; 280 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 281 282 static void nxt_router_app_joint_use(nxt_task_t *task, 283 nxt_app_joint_t *app_joint, int i); 284 285 static nxt_router_t *nxt_router; 286 287 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 288 static const nxt_str_t empty_prefix = nxt_string(""); 289 290 static const nxt_str_t *nxt_app_msg_prefix[] = { 291 &empty_prefix, 292 &http_prefix, 293 &http_prefix, 294 &http_prefix, 295 &http_prefix, 296 }; 297 298 299 nxt_port_handlers_t nxt_router_process_port_handlers = { 300 .quit = nxt_worker_process_quit_handler, 301 .new_port = nxt_router_new_port_handler, 302 .change_file = nxt_port_change_log_file_handler, 303 .mmap = nxt_port_mmap_handler, 304 .data = nxt_router_conf_data_handler, 305 .remove_pid = nxt_router_remove_pid_handler, 306 .access_log = nxt_router_access_log_reopen_handler, 307 .rpc_ready = nxt_port_rpc_handler, 308 .rpc_error = nxt_port_rpc_handler, 309 }; 310 311 312 nxt_int_t 313 nxt_router_start(nxt_task_t *task, void *data) 314 { 315 nxt_int_t ret; 316 nxt_port_t *controller_port; 317 nxt_router_t *router; 318 nxt_runtime_t *rt; 319 320 rt = task->thread->runtime; 321 322 #if (NXT_TLS) 323 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 324 if (nxt_slow_path(rt->tls == NULL)) { 325 return NXT_ERROR; 326 } 327 328 ret = rt->tls->library_init(task); 329 if (nxt_slow_path(ret != NXT_OK)) { 330 return ret; 331 } 332 #endif 333 334 ret = nxt_http_init(task, rt); 335 if (nxt_slow_path(ret != NXT_OK)) { 336 return ret; 337 } 338 339 router = nxt_zalloc(sizeof(nxt_router_t)); 340 if (nxt_slow_path(router == NULL)) { 341 return NXT_ERROR; 342 } 343 344 nxt_queue_init(&router->engines); 345 nxt_queue_init(&router->sockets); 346 nxt_queue_init(&router->apps); 347 348 nxt_router = router; 349 350 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 351 if (controller_port != NULL) { 352 nxt_router_greet_controller(task, controller_port); 353 } 354 355 return NXT_OK; 356 } 357 358 359 static void 360 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 361 { 362 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 363 -1, 0, 0, NULL); 364 } 365 366 367 static void 368 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 369 void *data) 370 { 371 size_t size; 372 uint32_t stream; 373 nxt_mp_t *mp; 374 nxt_int_t ret; 375 nxt_app_t *app; 376 nxt_buf_t *b; 377 nxt_port_t *main_port; 378 nxt_runtime_t *rt; 379 380 app = data; 381 382 rt = task->thread->runtime; 383 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 384 385 nxt_debug(task, "app '%V' %p start process", &app->name, app); 386 387 size = app->name.length + 1 + app->conf.length; 388 389 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 390 391 if (nxt_slow_path(b == NULL)) { 392 goto failed; 393 } 394 395 nxt_buf_cpystr(b, &app->name); 396 *b->mem.free++ = '\0'; 397 nxt_buf_cpystr(b, &app->conf); 398 399 nxt_router_app_joint_use(task, app->joint, 1); 400 401 stream = nxt_port_rpc_register_handler(task, port, 402 nxt_router_app_port_ready, 403 nxt_router_app_port_error, 404 -1, app->joint); 405 406 if (nxt_slow_path(stream == 0)) { 407 nxt_router_app_joint_use(task, app->joint, -1); 408 409 goto failed; 410 } 411 412 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 413 stream, port->id, b); 414 415 if (nxt_slow_path(ret != NXT_OK)) { 416 nxt_port_rpc_cancel(task, port, stream); 417 418 nxt_router_app_joint_use(task, app->joint, -1); 419 420 goto failed; 421 } 422 423 nxt_router_app_use(task, app, -1); 424 425 return; 426 427 failed: 428 429 if (b != NULL) { 430 mp = b->data; 431 nxt_mp_free(mp, b); 432 nxt_mp_release(mp); 433 } 434 435 nxt_thread_mutex_lock(&app->mutex); 436 437 app->pending_processes--; 438 439 nxt_thread_mutex_unlock(&app->mutex); 440 441 nxt_router_app_use(task, app, -1); 442 } 443 444 445 static void 446 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 447 { 448 app_joint->use_count += i; 449 450 if (app_joint->use_count == 0) { 451 nxt_assert(app_joint->app == NULL); 452 453 nxt_free(app_joint); 454 } 455 } 456 457 458 static nxt_int_t 459 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 460 { 461 nxt_int_t res; 462 nxt_port_t *router_port; 463 nxt_runtime_t *rt; 464 465 rt = task->thread->runtime; 466 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 467 468 nxt_router_app_use(task, app, 1); 469 470 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 471 app); 472 473 if (res == NXT_OK) { 474 return res; 475 } 476 477 nxt_thread_mutex_lock(&app->mutex); 478 479 app->pending_processes--; 480 481 nxt_thread_mutex_unlock(&app->mutex); 482 483 nxt_router_app_use(task, app, -1); 484 485 return NXT_ERROR; 486 } 487 488 489 nxt_inline void 490 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, 491 nxt_req_conn_link_t *rc) 492 { 493 nxt_event_engine_t *engine; 494 495 engine = task->thread->engine; 496 497 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 498 499 ra->stream = rc->stream; 500 ra->use_count = 1; 501 ra->rc = rc; 502 rc->ra = ra; 503 ra->reply_port = engine->port; 504 ra->ap = rc->ap; 505 506 ra->work.handler = NULL; 507 ra->work.task = &engine->task; 508 ra->work.obj = ra; 509 ra->work.data = engine; 510 } 511 512 513 nxt_inline nxt_req_app_link_t * 514 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) 515 { 516 nxt_mp_t *mp; 517 nxt_req_app_link_t *ra; 518 519 if (ra_src->mem_pool != NULL) { 520 return ra_src; 521 } 522 523 mp = ra_src->ap->mem_pool; 524 525 ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t)); 526 527 if (nxt_slow_path(ra == NULL)) { 528 529 ra_src->rc->ra = NULL; 530 ra_src->rc = NULL; 531 532 return NULL; 533 } 534 535 nxt_mp_retain(mp); 536 537 nxt_router_ra_init(task, ra, ra_src->rc); 538 539 ra->mem_pool = mp; 540 541 return ra; 542 } 543 544 545 nxt_inline nxt_bool_t 546 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 547 uint32_t stream) 548 { 549 nxt_buf_t *b, *next; 550 nxt_bool_t cancelled; 551 552 if (msg_info->buf == NULL) { 553 return 0; 554 } 555 556 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 557 stream); 558 559 if (cancelled) { 560 nxt_debug(task, "stream #%uD: cancelled by router", stream); 561 } 562 563 for (b = msg_info->buf; b != NULL; b = next) { 564 next = b->next; 565 566 b->completion_handler = msg_info->completion_handler; 567 568 if (b->is_port_mmap_sent) { 569 b->is_port_mmap_sent = cancelled == 0; 570 b->completion_handler(task, b, b->parent); 571 } 572 } 573 574 msg_info->buf = NULL; 575 576 return cancelled; 577 } 578 579 580 static void 581 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); 582 583 584 static void 585 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) 586 { 587 nxt_req_app_link_t *ra; 588 589 ra = obj; 590 591 nxt_router_ra_update_peer(task, ra); 592 593 nxt_router_ra_use(task, ra, -1); 594 } 595 596 597 static void 598 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) 599 { 600 nxt_event_engine_t *engine; 601 nxt_req_conn_link_t *rc; 602 603 engine = ra->work.data; 604 605 if (task->thread->engine != engine) { 606 nxt_router_ra_inc_use(ra); 607 608 ra->work.handler = nxt_router_ra_update_peer_handler; 609 ra->work.task = &engine->task; 610 ra->work.next = NULL; 611 612 nxt_debug(task, "ra stream #%uD post update peer to %p", 613 ra->stream, engine); 614 615 nxt_event_engine_post(engine, &ra->work); 616 617 return; 618 } 619 620 nxt_debug(task, "ra stream #%uD update peer", ra->stream); 621 622 rc = ra->rc; 623 624 if (rc != NULL && ra->app_port != NULL) { 625 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); 626 } 627 628 nxt_router_ra_use(task, ra, -1); 629 } 630 631 632 static void 633 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) 634 { 635 nxt_mp_t *mp; 636 nxt_req_conn_link_t *rc; 637 638 nxt_assert(task->thread->engine == ra->work.data); 639 nxt_assert(ra->use_count == 0); 640 641 nxt_debug(task, "ra stream #%uD release", ra->stream); 642 643 rc = ra->rc; 644 645 if (rc != NULL) { 646 if (nxt_slow_path(ra->err_code != 0)) { 647 nxt_http_request_error(task, rc->ap->request, ra->err_code); 648 649 } else { 650 rc->app_port = ra->app_port; 651 rc->msg_info = ra->msg_info; 652 653 if (rc->app->timeout != 0) { 654 rc->ap->timer.handler = nxt_router_app_timeout; 655 rc->ap->timer_data = rc; 656 nxt_timer_add(task->thread->engine, &rc->ap->timer, 657 rc->app->timeout); 658 } 659 660 ra->app_port = NULL; 661 ra->msg_info.buf = NULL; 662 } 663 664 rc->ra = NULL; 665 ra->rc = NULL; 666 } 667 668 if (ra->app_port != NULL) { 669 nxt_router_app_port_release(task, ra->app_port, 0, 1); 670 671 ra->app_port = NULL; 672 } 673 674 nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 675 676 mp = ra->mem_pool; 677 678 if (mp != NULL) { 679 nxt_mp_free(mp, ra); 680 nxt_mp_release(mp); 681 } 682 } 683 684 685 static void 686 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) 687 { 688 nxt_req_app_link_t *ra; 689 690 ra = obj; 691 692 nxt_assert(ra->work.data == data); 693 694 nxt_atomic_fetch_add(&ra->use_count, -1); 695 696 nxt_router_ra_release(task, ra); 697 } 698 699 700 static void 701 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) 702 { 703 int c; 704 nxt_event_engine_t *engine; 705 706 c = nxt_atomic_fetch_add(&ra->use_count, i); 707 708 if (i < 0 && c == -i) { 709 engine = ra->work.data; 710 711 if (task->thread->engine == engine) { 712 nxt_router_ra_release(task, ra); 713 714 return; 715 } 716 717 nxt_router_ra_inc_use(ra); 718 719 ra->work.handler = nxt_router_ra_release_handler; 720 ra->work.task = &engine->task; 721 ra->work.next = NULL; 722 723 nxt_debug(task, "ra stream #%uD post release to %p", 724 ra->stream, engine); 725 726 nxt_event_engine_post(engine, &ra->work); 727 } 728 } 729 730 731 nxt_inline void 732 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str) 733 { 734 ra->app_port = NULL; 735 ra->err_code = code; 736 ra->err_str = str; 737 } 738 739 740 nxt_inline void 741 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 742 { 743 nxt_queue_insert_tail(&ra->app_port->pending_requests, 744 &ra->link_port_pending); 745 nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); 746 747 nxt_router_ra_inc_use(ra); 748 749 ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; 750 751 nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); 752 } 753 754 755 nxt_inline nxt_bool_t 756 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 757 { 758 if (lnk->next != NULL) { 759 nxt_queue_remove(lnk); 760 761 lnk->next = NULL; 762 763 return 1; 764 } 765 766 return 0; 767 } 768 769 770 nxt_inline void 771 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) 772 { 773 int ra_use_delta; 774 nxt_req_app_link_t *ra; 775 776 if (rc->app_port != NULL) { 777 nxt_router_app_port_release(task, rc->app_port, 0, 1); 778 779 rc->app_port = NULL; 780 } 781 782 nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); 783 784 ra = rc->ra; 785 786 if (ra != NULL) { 787 rc->ra = NULL; 788 ra->rc = NULL; 789 790 ra_use_delta = 0; 791 792 nxt_thread_mutex_lock(&rc->app->mutex); 793 794 if (ra->link_app_requests.next == NULL 795 && ra->link_port_pending.next == NULL 796 && ra->link_app_pending.next == NULL) 797 { 798 ra = NULL; 799 800 } else { 801 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); 802 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); 803 nxt_queue_chk_remove(&ra->link_app_pending); 804 } 805 806 nxt_thread_mutex_unlock(&rc->app->mutex); 807 808 if (ra != NULL) { 809 nxt_router_ra_use(task, ra, ra_use_delta); 810 } 811 } 812 813 if (rc->app != NULL) { 814 nxt_router_app_use(task, rc->app, -1); 815 816 rc->app = NULL; 817 } 818 819 if (rc->ap != NULL) { 820 rc->ap->timer_data = NULL; 821 822 nxt_app_http_req_done(task, rc->ap); 823 824 rc->ap = NULL; 825 } 826 } 827 828 829 void 830 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 831 { 832 nxt_port_new_port_handler(task, msg); 833 834 if (msg->u.new_port != NULL 835 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) 836 { 837 nxt_router_greet_controller(task, msg->u.new_port); 838 } 839 840 if (msg->port_msg.stream == 0) { 841 return; 842 } 843 844 if (msg->u.new_port == NULL 845 || msg->u.new_port->type != NXT_PROCESS_WORKER) 846 { 847 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 848 } 849 850 nxt_port_rpc_handler(task, msg); 851 } 852 853 854 void 855 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 856 { 857 nxt_int_t ret; 858 nxt_buf_t *b; 859 nxt_router_temp_conf_t *tmcf; 860 861 tmcf = nxt_router_temp_conf(task); 862 if (nxt_slow_path(tmcf == NULL)) { 863 return; 864 } 865 866 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 867 nxt_buf_used_size(msg->buf), 868 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 869 870 tmcf->router_conf->router = nxt_router; 871 tmcf->stream = msg->port_msg.stream; 872 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 873 msg->port_msg.pid, 874 msg->port_msg.reply_port); 875 876 if (nxt_slow_path(tmcf->port == NULL)) { 877 nxt_alert(task, "reply port not found"); 878 879 return; 880 } 881 882 nxt_port_use(task, tmcf->port, 1); 883 884 b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, 885 msg->buf, msg->size); 886 if (nxt_slow_path(b == NULL)) { 887 nxt_router_conf_error(task, tmcf); 888 889 return; 890 } 891 892 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 893 894 if (nxt_fast_path(ret == NXT_OK)) { 895 nxt_router_conf_apply(task, tmcf, NULL); 896 897 } else { 898 nxt_router_conf_error(task, tmcf); 899 } 900 } 901 902 903 static void 904 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 905 void *data) 906 { 907 union { 908 nxt_pid_t removed_pid; 909 void *data; 910 } u; 911 912 u.data = data; 913 914 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 915 } 916 917 918 void 919 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 920 { 921 nxt_event_engine_t *engine; 922 923 nxt_port_remove_pid_handler(task, msg); 924 925 if (msg->port_msg.stream == 0) { 926 return; 927 } 928 929 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 930 { 931 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 932 msg->u.data); 933 } 934 nxt_queue_loop; 935 936 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 937 938 nxt_port_rpc_handler(task, msg); 939 } 940 941 942 static nxt_router_temp_conf_t * 943 nxt_router_temp_conf(nxt_task_t *task) 944 { 945 nxt_mp_t *mp, *tmp; 946 nxt_router_conf_t *rtcf; 947 nxt_router_temp_conf_t *tmcf; 948 949 mp = nxt_mp_create(1024, 128, 256, 32); 950 if (nxt_slow_path(mp == NULL)) { 951 return NULL; 952 } 953 954 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 955 if (nxt_slow_path(rtcf == NULL)) { 956 goto fail; 957 } 958 959 rtcf->mem_pool = mp; 960 961 tmp = nxt_mp_create(1024, 128, 256, 32); 962 if (nxt_slow_path(tmp == NULL)) { 963 goto fail; 964 } 965 966 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 967 if (nxt_slow_path(tmcf == NULL)) { 968 goto temp_fail; 969 } 970 971 tmcf->mem_pool = tmp; 972 tmcf->router_conf = rtcf; 973 tmcf->count = 1; 974 tmcf->engine = task->thread->engine; 975 976 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 977 sizeof(nxt_router_engine_conf_t)); 978 if (nxt_slow_path(tmcf->engines == NULL)) { 979 goto temp_fail; 980 } 981 982 nxt_queue_init(&tmcf->deleting); 983 nxt_queue_init(&tmcf->keeping); 984 nxt_queue_init(&tmcf->updating); 985 nxt_queue_init(&tmcf->pending); 986 nxt_queue_init(&tmcf->creating); 987 988 #if (NXT_TLS) 989 nxt_queue_init(&tmcf->tls); 990 #endif 991 992 nxt_queue_init(&tmcf->apps); 993 nxt_queue_init(&tmcf->previous); 994 995 return tmcf; 996 997 temp_fail: 998 999 nxt_mp_destroy(tmp); 1000 1001 fail: 1002 1003 nxt_mp_destroy(mp); 1004 1005 return NULL; 1006 } 1007 1008 1009 nxt_inline nxt_bool_t 1010 nxt_router_app_can_start(nxt_app_t *app) 1011 { 1012 return app->processes + app->pending_processes < app->max_processes 1013 && app->pending_processes < app->max_pending_processes; 1014 } 1015 1016 1017 nxt_inline nxt_bool_t 1018 nxt_router_app_need_start(nxt_app_t *app) 1019 { 1020 return app->idle_processes + app->pending_processes 1021 < app->spare_processes; 1022 } 1023 1024 1025 static void 1026 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1027 { 1028 nxt_int_t ret; 1029 nxt_app_t *app; 1030 nxt_router_t *router; 1031 nxt_runtime_t *rt; 1032 nxt_queue_link_t *qlk; 1033 nxt_socket_conf_t *skcf; 1034 nxt_router_conf_t *rtcf; 1035 nxt_router_temp_conf_t *tmcf; 1036 const nxt_event_interface_t *interface; 1037 #if (NXT_TLS) 1038 nxt_router_tlssock_t *tls; 1039 #endif 1040 1041 tmcf = obj; 1042 1043 qlk = nxt_queue_first(&tmcf->pending); 1044 1045 if (qlk != nxt_queue_tail(&tmcf->pending)) { 1046 nxt_queue_remove(qlk); 1047 nxt_queue_insert_tail(&tmcf->creating, qlk); 1048 1049 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1050 1051 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1052 1053 return; 1054 } 1055 1056 #if (NXT_TLS) 1057 qlk = nxt_queue_first(&tmcf->tls); 1058 1059 if (qlk != nxt_queue_tail(&tmcf->tls)) { 1060 nxt_queue_remove(qlk); 1061 1062 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1063 1064 nxt_router_tls_rpc_create(task, tmcf, tls); 1065 return; 1066 } 1067 #endif 1068 1069 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1070 1071 if (nxt_router_app_need_start(app)) { 1072 nxt_router_app_rpc_create(task, tmcf, app); 1073 return; 1074 } 1075 1076 } nxt_queue_loop; 1077 1078 rtcf = tmcf->router_conf; 1079 1080 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1081 nxt_router_access_log_open(task, tmcf); 1082 return; 1083 } 1084 1085 rt = task->thread->runtime; 1086 1087 interface = nxt_service_get(rt->services, "engine", NULL); 1088 1089 router = rtcf->router; 1090 1091 ret = nxt_router_engines_create(task, router, tmcf, interface); 1092 if (nxt_slow_path(ret != NXT_OK)) { 1093 goto fail; 1094 } 1095 1096 ret = nxt_router_threads_create(task, rt, tmcf); 1097 if (nxt_slow_path(ret != NXT_OK)) { 1098 goto fail; 1099 } 1100 1101 nxt_router_apps_sort(task, router, tmcf); 1102 1103 nxt_router_engines_post(router, tmcf); 1104 1105 nxt_queue_add(&router->sockets, &tmcf->updating); 1106 nxt_queue_add(&router->sockets, &tmcf->creating); 1107 1108 router->access_log = rtcf->access_log; 1109 1110 nxt_router_conf_ready(task, tmcf); 1111 1112 return; 1113 1114 fail: 1115 1116 nxt_router_conf_error(task, tmcf); 1117 1118 return; 1119 } 1120 1121 1122 static void 1123 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1124 { 1125 nxt_joint_job_t *job; 1126 1127 job = obj; 1128 1129 nxt_router_conf_ready(task, job->tmcf); 1130 } 1131 1132 1133 static void 1134 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1135 { 1136 nxt_debug(task, "temp conf count:%D", tmcf->count); 1137 1138 if (--tmcf->count == 0) { 1139 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1140 } 1141 } 1142 1143 1144 static void 1145 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1146 { 1147 nxt_app_t *app; 1148 nxt_queue_t new_socket_confs; 1149 nxt_socket_t s; 1150 nxt_router_t *router; 1151 nxt_queue_link_t *qlk; 1152 nxt_socket_conf_t *skcf; 1153 nxt_router_conf_t *rtcf; 1154 1155 nxt_alert(task, "failed to apply new conf"); 1156 1157 for (qlk = nxt_queue_first(&tmcf->creating); 1158 qlk != nxt_queue_tail(&tmcf->creating); 1159 qlk = nxt_queue_next(qlk)) 1160 { 1161 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1162 s = skcf->listen->socket; 1163 1164 if (s != -1) { 1165 nxt_socket_close(task, s); 1166 } 1167 1168 nxt_free(skcf->listen); 1169 } 1170 1171 nxt_queue_init(&new_socket_confs); 1172 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1173 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1174 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1175 1176 rtcf = tmcf->router_conf; 1177 1178 nxt_http_routes_cleanup(task, rtcf->routes); 1179 1180 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1181 1182 if (skcf->pass != NULL) { 1183 nxt_http_pass_cleanup(task, skcf->pass); 1184 } 1185 1186 } nxt_queue_loop; 1187 1188 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1189 1190 nxt_router_app_unlink(task, app); 1191 1192 } nxt_queue_loop; 1193 1194 router = rtcf->router; 1195 1196 nxt_queue_add(&router->sockets, &tmcf->keeping); 1197 nxt_queue_add(&router->sockets, &tmcf->deleting); 1198 1199 nxt_queue_add(&router->apps, &tmcf->previous); 1200 1201 // TODO: new engines and threads 1202 1203 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1204 1205 nxt_mp_destroy(rtcf->mem_pool); 1206 1207 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1208 } 1209 1210 1211 static void 1212 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1213 nxt_port_msg_type_t type) 1214 { 1215 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1216 1217 nxt_port_use(task, tmcf->port, -1); 1218 1219 tmcf->port = NULL; 1220 } 1221 1222 1223 static nxt_conf_map_t nxt_router_conf[] = { 1224 { 1225 nxt_string("listeners_threads"), 1226 NXT_CONF_MAP_INT32, 1227 offsetof(nxt_router_conf_t, threads), 1228 }, 1229 }; 1230 1231 1232 static nxt_conf_map_t nxt_router_app_conf[] = { 1233 { 1234 nxt_string("type"), 1235 NXT_CONF_MAP_STR, 1236 offsetof(nxt_router_app_conf_t, type), 1237 }, 1238 1239 { 1240 nxt_string("limits"), 1241 NXT_CONF_MAP_PTR, 1242 offsetof(nxt_router_app_conf_t, limits_value), 1243 }, 1244 1245 { 1246 nxt_string("processes"), 1247 NXT_CONF_MAP_INT32, 1248 offsetof(nxt_router_app_conf_t, processes), 1249 }, 1250 1251 { 1252 nxt_string("processes"), 1253 NXT_CONF_MAP_PTR, 1254 offsetof(nxt_router_app_conf_t, processes_value), 1255 }, 1256 }; 1257 1258 1259 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1260 { 1261 nxt_string("timeout"), 1262 NXT_CONF_MAP_MSEC, 1263 offsetof(nxt_router_app_conf_t, timeout), 1264 }, 1265 1266 { 1267 nxt_string("reschedule_timeout"), 1268 NXT_CONF_MAP_MSEC, 1269 offsetof(nxt_router_app_conf_t, res_timeout), 1270 }, 1271 1272 { 1273 nxt_string("requests"), 1274 NXT_CONF_MAP_INT32, 1275 offsetof(nxt_router_app_conf_t, requests), 1276 }, 1277 }; 1278 1279 1280 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1281 { 1282 nxt_string("spare"), 1283 NXT_CONF_MAP_INT32, 1284 offsetof(nxt_router_app_conf_t, spare_processes), 1285 }, 1286 1287 { 1288 nxt_string("max"), 1289 NXT_CONF_MAP_INT32, 1290 offsetof(nxt_router_app_conf_t, max_processes), 1291 }, 1292 1293 { 1294 nxt_string("idle_timeout"), 1295 NXT_CONF_MAP_MSEC, 1296 offsetof(nxt_router_app_conf_t, idle_timeout), 1297 }, 1298 }; 1299 1300 1301 static nxt_conf_map_t nxt_router_listener_conf[] = { 1302 { 1303 nxt_string("pass"), 1304 NXT_CONF_MAP_STR_COPY, 1305 offsetof(nxt_router_listener_conf_t, pass), 1306 }, 1307 1308 { 1309 nxt_string("application"), 1310 NXT_CONF_MAP_STR_COPY, 1311 offsetof(nxt_router_listener_conf_t, application), 1312 }, 1313 }; 1314 1315 1316 static nxt_conf_map_t nxt_router_http_conf[] = { 1317 { 1318 nxt_string("header_buffer_size"), 1319 NXT_CONF_MAP_SIZE, 1320 offsetof(nxt_socket_conf_t, header_buffer_size), 1321 }, 1322 1323 { 1324 nxt_string("large_header_buffer_size"), 1325 NXT_CONF_MAP_SIZE, 1326 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1327 }, 1328 1329 { 1330 nxt_string("large_header_buffers"), 1331 NXT_CONF_MAP_SIZE, 1332 offsetof(nxt_socket_conf_t, large_header_buffers), 1333 }, 1334 1335 { 1336 nxt_string("body_buffer_size"), 1337 NXT_CONF_MAP_SIZE, 1338 offsetof(nxt_socket_conf_t, body_buffer_size), 1339 }, 1340 1341 { 1342 nxt_string("max_body_size"), 1343 NXT_CONF_MAP_SIZE, 1344 offsetof(nxt_socket_conf_t, max_body_size), 1345 }, 1346 1347 { 1348 nxt_string("idle_timeout"), 1349 NXT_CONF_MAP_MSEC, 1350 offsetof(nxt_socket_conf_t, idle_timeout), 1351 }, 1352 1353 { 1354 nxt_string("header_read_timeout"), 1355 NXT_CONF_MAP_MSEC, 1356 offsetof(nxt_socket_conf_t, header_read_timeout), 1357 }, 1358 1359 { 1360 nxt_string("body_read_timeout"), 1361 NXT_CONF_MAP_MSEC, 1362 offsetof(nxt_socket_conf_t, body_read_timeout), 1363 }, 1364 1365 { 1366 nxt_string("send_timeout"), 1367 NXT_CONF_MAP_MSEC, 1368 offsetof(nxt_socket_conf_t, send_timeout), 1369 }, 1370 }; 1371 1372 1373 static nxt_int_t 1374 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1375 u_char *start, u_char *end) 1376 { 1377 u_char *p; 1378 size_t size; 1379 nxt_mp_t *mp; 1380 uint32_t next; 1381 nxt_int_t ret; 1382 nxt_str_t name, path; 1383 nxt_app_t *app, *prev; 1384 nxt_router_t *router; 1385 nxt_app_joint_t *app_joint; 1386 nxt_conf_value_t *conf, *http, *value; 1387 nxt_conf_value_t *applications, *application; 1388 nxt_conf_value_t *listeners, *listener; 1389 nxt_conf_value_t *routes_conf; 1390 nxt_socket_conf_t *skcf; 1391 nxt_http_routes_t *routes; 1392 nxt_event_engine_t *engine; 1393 nxt_app_lang_module_t *lang; 1394 nxt_router_app_conf_t apcf; 1395 nxt_router_access_log_t *access_log; 1396 nxt_router_listener_conf_t lscf; 1397 #if (NXT_TLS) 1398 nxt_router_tlssock_t *tls; 1399 #endif 1400 1401 static nxt_str_t http_path = nxt_string("/settings/http"); 1402 static nxt_str_t applications_path = nxt_string("/applications"); 1403 static nxt_str_t listeners_path = nxt_string("/listeners"); 1404 static nxt_str_t routes_path = nxt_string("/routes"); 1405 static nxt_str_t access_log_path = nxt_string("/access_log"); 1406 #if (NXT_TLS) 1407 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1408 #endif 1409 1410 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1411 if (conf == NULL) { 1412 nxt_alert(task, "configuration parsing error"); 1413 return NXT_ERROR; 1414 } 1415 1416 mp = tmcf->router_conf->mem_pool; 1417 1418 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1419 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1420 if (ret != NXT_OK) { 1421 nxt_alert(task, "root map error"); 1422 return NXT_ERROR; 1423 } 1424 1425 if (tmcf->router_conf->threads == 0) { 1426 tmcf->router_conf->threads = nxt_ncpu; 1427 } 1428 1429 applications = nxt_conf_get_path(conf, &applications_path); 1430 if (applications == NULL) { 1431 nxt_alert(task, "no \"applications\" block"); 1432 return NXT_ERROR; 1433 } 1434 1435 router = tmcf->router_conf->router; 1436 1437 next = 0; 1438 1439 for ( ;; ) { 1440 application = nxt_conf_next_object_member(applications, &name, &next); 1441 if (application == NULL) { 1442 break; 1443 } 1444 1445 nxt_debug(task, "application \"%V\"", &name); 1446 1447 size = nxt_conf_json_length(application, NULL); 1448 1449 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1450 if (app == NULL) { 1451 goto fail; 1452 } 1453 1454 nxt_memzero(app, sizeof(nxt_app_t)); 1455 1456 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1457 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 1458 1459 p = nxt_conf_json_print(app->conf.start, application, NULL); 1460 app->conf.length = p - app->conf.start; 1461 1462 nxt_assert(app->conf.length <= size); 1463 1464 nxt_debug(task, "application conf \"%V\"", &app->conf); 1465 1466 prev = nxt_router_app_find(&router->apps, &name); 1467 1468 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1469 nxt_free(app); 1470 1471 nxt_queue_remove(&prev->link); 1472 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1473 continue; 1474 } 1475 1476 apcf.processes = 1; 1477 apcf.max_processes = 1; 1478 apcf.spare_processes = 0; 1479 apcf.timeout = 0; 1480 apcf.res_timeout = 1000; 1481 apcf.idle_timeout = 15000; 1482 apcf.requests = 0; 1483 apcf.limits_value = NULL; 1484 apcf.processes_value = NULL; 1485 1486 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1487 if (nxt_slow_path(app_joint == NULL)) { 1488 goto app_fail; 1489 } 1490 1491 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1492 1493 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1494 nxt_nitems(nxt_router_app_conf), &apcf); 1495 if (ret != NXT_OK) { 1496 nxt_alert(task, "application map error"); 1497 goto app_fail; 1498 } 1499 1500 if (apcf.limits_value != NULL) { 1501 1502 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1503 nxt_alert(task, "application limits is not object"); 1504 goto app_fail; 1505 } 1506 1507 ret = nxt_conf_map_object(mp, apcf.limits_value, 1508 nxt_router_app_limits_conf, 1509 nxt_nitems(nxt_router_app_limits_conf), 1510 &apcf); 1511 if (ret != NXT_OK) { 1512 nxt_alert(task, "application limits map error"); 1513 goto app_fail; 1514 } 1515 } 1516 1517 if (apcf.processes_value != NULL 1518 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1519 { 1520 ret = nxt_conf_map_object(mp, apcf.processes_value, 1521 nxt_router_app_processes_conf, 1522 nxt_nitems(nxt_router_app_processes_conf), 1523 &apcf); 1524 if (ret != NXT_OK) { 1525 nxt_alert(task, "application processes map error"); 1526 goto app_fail; 1527 } 1528 1529 } else { 1530 apcf.max_processes = apcf.processes; 1531 apcf.spare_processes = apcf.processes; 1532 } 1533 1534 nxt_debug(task, "application type: %V", &apcf.type); 1535 nxt_debug(task, "application processes: %D", apcf.processes); 1536 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1537 nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); 1538 nxt_debug(task, "application requests: %D", apcf.requests); 1539 1540 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1541 1542 if (lang == NULL) { 1543 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1544 goto app_fail; 1545 } 1546 1547 nxt_debug(task, "application language module: \"%s\"", lang->file); 1548 1549 ret = nxt_thread_mutex_create(&app->mutex); 1550 if (ret != NXT_OK) { 1551 goto app_fail; 1552 } 1553 1554 nxt_queue_init(&app->ports); 1555 nxt_queue_init(&app->spare_ports); 1556 nxt_queue_init(&app->idle_ports); 1557 nxt_queue_init(&app->requests); 1558 nxt_queue_init(&app->pending); 1559 1560 app->name.length = name.length; 1561 nxt_memcpy(app->name.start, name.start, name.length); 1562 1563 app->type = lang->type; 1564 app->max_processes = apcf.max_processes; 1565 app->spare_processes = apcf.spare_processes; 1566 app->max_pending_processes = apcf.spare_processes 1567 ? apcf.spare_processes : 1; 1568 app->timeout = apcf.timeout; 1569 app->res_timeout = apcf.res_timeout * 1000000; 1570 app->idle_timeout = apcf.idle_timeout; 1571 app->max_pending_responses = 2; 1572 app->max_requests = apcf.requests; 1573 1574 engine = task->thread->engine; 1575 1576 app->engine = engine; 1577 1578 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1579 app->adjust_idle_work.task = &engine->task; 1580 app->adjust_idle_work.obj = app; 1581 1582 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1583 1584 nxt_router_app_use(task, app, 1); 1585 1586 app->joint = app_joint; 1587 1588 app_joint->use_count = 1; 1589 app_joint->app = app; 1590 1591 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1592 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1593 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1594 app_joint->idle_timer.task = &engine->task; 1595 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1596 1597 app_joint->free_app_work.handler = nxt_router_free_app; 1598 app_joint->free_app_work.task = &engine->task; 1599 app_joint->free_app_work.obj = app_joint; 1600 } 1601 1602 routes_conf = nxt_conf_get_path(conf, &routes_path); 1603 if (nxt_fast_path(routes_conf != NULL)) { 1604 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1605 if (nxt_slow_path(routes == NULL)) { 1606 return NXT_ERROR; 1607 } 1608 tmcf->router_conf->routes = routes; 1609 } 1610 1611 http = nxt_conf_get_path(conf, &http_path); 1612 #if 0 1613 if (http == NULL) { 1614 nxt_alert(task, "no \"http\" block"); 1615 return NXT_ERROR; 1616 } 1617 #endif 1618 1619 listeners = nxt_conf_get_path(conf, &listeners_path); 1620 if (listeners == NULL) { 1621 nxt_alert(task, "no \"listeners\" block"); 1622 return NXT_ERROR; 1623 } 1624 1625 next = 0; 1626 1627 for ( ;; ) { 1628 listener = nxt_conf_next_object_member(listeners, &name, &next); 1629 if (listener == NULL) { 1630 break; 1631 } 1632 1633 skcf = nxt_router_socket_conf(task, tmcf, &name); 1634 if (skcf == NULL) { 1635 goto fail; 1636 } 1637 1638 nxt_memzero(&lscf, sizeof(lscf)); 1639 1640 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1641 nxt_nitems(nxt_router_listener_conf), &lscf); 1642 if (ret != NXT_OK) { 1643 nxt_alert(task, "listener map error"); 1644 goto fail; 1645 } 1646 1647 nxt_debug(task, "application: %V", &lscf.application); 1648 1649 // STUB, default values if http block is not defined. 1650 skcf->header_buffer_size = 2048; 1651 skcf->large_header_buffer_size = 8192; 1652 skcf->large_header_buffers = 4; 1653 skcf->body_buffer_size = 16 * 1024; 1654 skcf->max_body_size = 8 * 1024 * 1024; 1655 skcf->idle_timeout = 180 * 1000; 1656 skcf->header_read_timeout = 30 * 1000; 1657 skcf->body_read_timeout = 30 * 1000; 1658 skcf->send_timeout = 30 * 1000; 1659 1660 if (http != NULL) { 1661 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1662 nxt_nitems(nxt_router_http_conf), skcf); 1663 if (ret != NXT_OK) { 1664 nxt_alert(task, "http map error"); 1665 goto fail; 1666 } 1667 } 1668 1669 #if (NXT_TLS) 1670 1671 value = nxt_conf_get_path(listener, &certificate_path); 1672 1673 if (value != NULL) { 1674 nxt_conf_get_string(value, &name); 1675 1676 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1677 if (nxt_slow_path(tls == NULL)) { 1678 goto fail; 1679 } 1680 1681 tls->name = name; 1682 tls->conf = skcf; 1683 1684 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1685 } 1686 1687 #endif 1688 1689 skcf->listen->handler = nxt_http_conn_init; 1690 skcf->router_conf = tmcf->router_conf; 1691 skcf->router_conf->count++; 1692 1693 if (lscf.pass.length != 0) { 1694 skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); 1695 1696 /* COMPATIBILITY: listener application. */ 1697 } else if (lscf.application.length > 0) { 1698 skcf->pass = nxt_http_pass_application(task, tmcf, 1699 &lscf.application); 1700 } 1701 } 1702 1703 value = nxt_conf_get_path(conf, &access_log_path); 1704 1705 if (value != NULL) { 1706 nxt_conf_get_string(value, &path); 1707 1708 access_log = router->access_log; 1709 1710 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1711 nxt_thread_spin_lock(&router->lock); 1712 access_log->count++; 1713 nxt_thread_spin_unlock(&router->lock); 1714 1715 } else { 1716 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1717 + path.length); 1718 if (access_log == NULL) { 1719 nxt_alert(task, "failed to allocate access log structure"); 1720 goto fail; 1721 } 1722 1723 access_log->fd = -1; 1724 access_log->handler = &nxt_router_access_log_writer; 1725 access_log->count = 1; 1726 1727 access_log->path.length = path.length; 1728 access_log->path.start = (u_char *) access_log 1729 + sizeof(nxt_router_access_log_t); 1730 1731 nxt_memcpy(access_log->path.start, path.start, path.length); 1732 } 1733 1734 tmcf->router_conf->access_log = access_log; 1735 } 1736 1737 nxt_http_routes_resolve(task, tmcf); 1738 1739 nxt_queue_add(&tmcf->deleting, &router->sockets); 1740 nxt_queue_init(&router->sockets); 1741 1742 return NXT_OK; 1743 1744 app_fail: 1745 1746 nxt_free(app); 1747 1748 fail: 1749 1750 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1751 1752 nxt_queue_remove(&app->link); 1753 nxt_thread_mutex_destroy(&app->mutex); 1754 nxt_free(app); 1755 1756 } nxt_queue_loop; 1757 1758 return NXT_ERROR; 1759 } 1760 1761 1762 static nxt_app_t * 1763 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1764 { 1765 nxt_app_t *app; 1766 1767 nxt_queue_each(app, queue, nxt_app_t, link) { 1768 1769 if (nxt_strstr_eq(name, &app->name)) { 1770 return app; 1771 } 1772 1773 } nxt_queue_loop; 1774 1775 return NULL; 1776 } 1777 1778 1779 nxt_app_t * 1780 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1781 { 1782 nxt_app_t *app; 1783 1784 app = nxt_router_app_find(&tmcf->apps, name); 1785 1786 if (app == NULL) { 1787 app = nxt_router_app_find(&tmcf->previous, name); 1788 } 1789 1790 return app; 1791 } 1792 1793 1794 static nxt_socket_conf_t * 1795 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1796 nxt_str_t *name) 1797 { 1798 size_t size; 1799 nxt_int_t ret; 1800 nxt_bool_t wildcard; 1801 nxt_sockaddr_t *sa; 1802 nxt_socket_conf_t *skcf; 1803 nxt_listen_socket_t *ls; 1804 1805 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1806 if (nxt_slow_path(sa == NULL)) { 1807 nxt_alert(task, "invalid listener \"%V\"", name); 1808 return NULL; 1809 } 1810 1811 sa->type = SOCK_STREAM; 1812 1813 nxt_debug(task, "router listener: \"%*s\"", 1814 (size_t) sa->length, nxt_sockaddr_start(sa)); 1815 1816 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 1817 if (nxt_slow_path(skcf == NULL)) { 1818 return NULL; 1819 } 1820 1821 size = nxt_sockaddr_size(sa); 1822 1823 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1824 1825 if (ret != NXT_OK) { 1826 1827 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1828 if (nxt_slow_path(ls == NULL)) { 1829 return NULL; 1830 } 1831 1832 skcf->listen = ls; 1833 1834 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1835 nxt_memcpy(ls->sockaddr, sa, size); 1836 1837 nxt_listen_socket_remote_size(ls); 1838 1839 ls->socket = -1; 1840 ls->backlog = NXT_LISTEN_BACKLOG; 1841 ls->flags = NXT_NONBLOCK; 1842 ls->read_after_accept = 1; 1843 } 1844 1845 switch (sa->u.sockaddr.sa_family) { 1846 #if (NXT_HAVE_UNIX_DOMAIN) 1847 case AF_UNIX: 1848 wildcard = 0; 1849 break; 1850 #endif 1851 #if (NXT_INET6) 1852 case AF_INET6: 1853 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1854 break; 1855 #endif 1856 case AF_INET: 1857 default: 1858 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1859 break; 1860 } 1861 1862 if (!wildcard) { 1863 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 1864 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1865 return NULL; 1866 } 1867 1868 nxt_memcpy(skcf->sockaddr, sa, size); 1869 } 1870 1871 return skcf; 1872 } 1873 1874 1875 static nxt_int_t 1876 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1877 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1878 { 1879 nxt_router_t *router; 1880 nxt_queue_link_t *qlk; 1881 nxt_socket_conf_t *skcf; 1882 1883 router = tmcf->router_conf->router; 1884 1885 for (qlk = nxt_queue_first(&router->sockets); 1886 qlk != nxt_queue_tail(&router->sockets); 1887 qlk = nxt_queue_next(qlk)) 1888 { 1889 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1890 1891 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1892 nskcf->listen = skcf->listen; 1893 1894 nxt_queue_remove(qlk); 1895 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1896 1897 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1898 1899 return NXT_OK; 1900 } 1901 } 1902 1903 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1904 1905 return NXT_DECLINED; 1906 } 1907 1908 1909 static void 1910 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1911 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1912 { 1913 size_t size; 1914 uint32_t stream; 1915 nxt_int_t ret; 1916 nxt_buf_t *b; 1917 nxt_port_t *main_port, *router_port; 1918 nxt_runtime_t *rt; 1919 nxt_socket_rpc_t *rpc; 1920 1921 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1922 if (rpc == NULL) { 1923 goto fail; 1924 } 1925 1926 rpc->socket_conf = skcf; 1927 rpc->temp_conf = tmcf; 1928 1929 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1930 1931 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1932 if (b == NULL) { 1933 goto fail; 1934 } 1935 1936 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1937 1938 rt = task->thread->runtime; 1939 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1940 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1941 1942 stream = nxt_port_rpc_register_handler(task, router_port, 1943 nxt_router_listen_socket_ready, 1944 nxt_router_listen_socket_error, 1945 main_port->pid, rpc); 1946 if (nxt_slow_path(stream == 0)) { 1947 goto fail; 1948 } 1949 1950 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1951 stream, router_port->id, b); 1952 1953 if (nxt_slow_path(ret != NXT_OK)) { 1954 nxt_port_rpc_cancel(task, router_port, stream); 1955 goto fail; 1956 } 1957 1958 return; 1959 1960 fail: 1961 1962 nxt_router_conf_error(task, tmcf); 1963 } 1964 1965 1966 static void 1967 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1968 void *data) 1969 { 1970 nxt_int_t ret; 1971 nxt_socket_t s; 1972 nxt_socket_rpc_t *rpc; 1973 1974 rpc = data; 1975 1976 s = msg->fd; 1977 1978 ret = nxt_socket_nonblocking(task, s); 1979 if (nxt_slow_path(ret != NXT_OK)) { 1980 goto fail; 1981 } 1982 1983 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 1984 1985 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1986 if (nxt_slow_path(ret != NXT_OK)) { 1987 goto fail; 1988 } 1989 1990 rpc->socket_conf->listen->socket = s; 1991 1992 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1993 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1994 1995 return; 1996 1997 fail: 1998 1999 nxt_socket_close(task, s); 2000 2001 nxt_router_conf_error(task, rpc->temp_conf); 2002 } 2003 2004 2005 static void 2006 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2007 void *data) 2008 { 2009 nxt_socket_rpc_t *rpc; 2010 nxt_router_temp_conf_t *tmcf; 2011 2012 rpc = data; 2013 tmcf = rpc->temp_conf; 2014 2015 #if 0 2016 u_char *p; 2017 size_t size; 2018 uint8_t error; 2019 nxt_buf_t *in, *out; 2020 nxt_sockaddr_t *sa; 2021 2022 static nxt_str_t socket_errors[] = { 2023 nxt_string("ListenerSystem"), 2024 nxt_string("ListenerNoIPv6"), 2025 nxt_string("ListenerPort"), 2026 nxt_string("ListenerInUse"), 2027 nxt_string("ListenerNoAddress"), 2028 nxt_string("ListenerNoAccess"), 2029 nxt_string("ListenerPath"), 2030 }; 2031 2032 sa = rpc->socket_conf->listen->sockaddr; 2033 2034 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2035 2036 if (nxt_slow_path(in == NULL)) { 2037 return; 2038 } 2039 2040 p = in->mem.pos; 2041 2042 error = *p++; 2043 2044 size = nxt_length("listen socket error: ") 2045 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2046 + sa->length + socket_errors[error].length + (in->mem.free - p); 2047 2048 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2049 if (nxt_slow_path(out == NULL)) { 2050 return; 2051 } 2052 2053 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2054 "listen socket error: " 2055 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2056 (size_t) sa->length, nxt_sockaddr_start(sa), 2057 &socket_errors[error], in->mem.free - p, p); 2058 2059 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2060 #endif 2061 2062 nxt_router_conf_error(task, tmcf); 2063 } 2064 2065 2066 #if (NXT_TLS) 2067 2068 static void 2069 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2070 nxt_router_tlssock_t *tls) 2071 { 2072 nxt_socket_rpc_t *rpc; 2073 2074 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2075 if (rpc == NULL) { 2076 nxt_router_conf_error(task, tmcf); 2077 return; 2078 } 2079 2080 rpc->socket_conf = tls->conf; 2081 rpc->temp_conf = tmcf; 2082 2083 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2084 nxt_router_tls_rpc_handler, rpc); 2085 } 2086 2087 2088 static void 2089 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2090 void *data) 2091 { 2092 nxt_mp_t *mp; 2093 nxt_int_t ret; 2094 nxt_tls_conf_t *tlscf; 2095 nxt_socket_rpc_t *rpc; 2096 nxt_router_temp_conf_t *tmcf; 2097 2098 nxt_debug(task, "tls rpc handler"); 2099 2100 rpc = data; 2101 tmcf = rpc->temp_conf; 2102 2103 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2104 goto fail; 2105 } 2106 2107 mp = tmcf->router_conf->mem_pool; 2108 2109 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2110 if (nxt_slow_path(tlscf == NULL)) { 2111 goto fail; 2112 } 2113 2114 tlscf->chain_file = msg->fd; 2115 2116 ret = task->thread->runtime->tls->server_init(task, tlscf); 2117 if (nxt_slow_path(ret != NXT_OK)) { 2118 goto fail; 2119 } 2120 2121 rpc->socket_conf->tls = tlscf; 2122 2123 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2124 nxt_router_conf_apply, task, tmcf, NULL); 2125 return; 2126 2127 fail: 2128 2129 nxt_router_conf_error(task, tmcf); 2130 } 2131 2132 #endif 2133 2134 2135 static void 2136 nxt_router_app_rpc_create(nxt_task_t *task, 2137 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2138 { 2139 size_t size; 2140 uint32_t stream; 2141 nxt_int_t ret; 2142 nxt_buf_t *b; 2143 nxt_port_t *main_port, *router_port; 2144 nxt_runtime_t *rt; 2145 nxt_app_rpc_t *rpc; 2146 2147 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2148 if (rpc == NULL) { 2149 goto fail; 2150 } 2151 2152 rpc->app = app; 2153 rpc->temp_conf = tmcf; 2154 2155 nxt_debug(task, "app '%V' prefork", &app->name); 2156 2157 size = app->name.length + 1 + app->conf.length; 2158 2159 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2160 if (nxt_slow_path(b == NULL)) { 2161 goto fail; 2162 } 2163 2164 nxt_buf_cpystr(b, &app->name); 2165 *b->mem.free++ = '\0'; 2166 nxt_buf_cpystr(b, &app->conf); 2167 2168 rt = task->thread->runtime; 2169 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2170 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2171 2172 stream = nxt_port_rpc_register_handler(task, router_port, 2173 nxt_router_app_prefork_ready, 2174 nxt_router_app_prefork_error, 2175 -1, rpc); 2176 if (nxt_slow_path(stream == 0)) { 2177 goto fail; 2178 } 2179 2180 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 2181 stream, router_port->id, b); 2182 2183 if (nxt_slow_path(ret != NXT_OK)) { 2184 nxt_port_rpc_cancel(task, router_port, stream); 2185 goto fail; 2186 } 2187 2188 app->pending_processes++; 2189 2190 return; 2191 2192 fail: 2193 2194 nxt_router_conf_error(task, tmcf); 2195 } 2196 2197 2198 static void 2199 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2200 void *data) 2201 { 2202 nxt_app_t *app; 2203 nxt_port_t *port; 2204 nxt_app_rpc_t *rpc; 2205 nxt_event_engine_t *engine; 2206 2207 rpc = data; 2208 app = rpc->app; 2209 2210 port = msg->u.new_port; 2211 port->app = app; 2212 2213 app->pending_processes--; 2214 app->processes++; 2215 app->idle_processes++; 2216 2217 engine = task->thread->engine; 2218 2219 nxt_queue_insert_tail(&app->ports, &port->app_link); 2220 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2221 2222 port->idle_start = 0; 2223 2224 nxt_port_inc_use(port); 2225 2226 nxt_work_queue_add(&engine->fast_work_queue, 2227 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2228 } 2229 2230 2231 static void 2232 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2233 void *data) 2234 { 2235 nxt_app_t *app; 2236 nxt_app_rpc_t *rpc; 2237 nxt_router_temp_conf_t *tmcf; 2238 2239 rpc = data; 2240 app = rpc->app; 2241 tmcf = rpc->temp_conf; 2242 2243 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2244 &app->name); 2245 2246 app->pending_processes--; 2247 2248 nxt_router_conf_error(task, tmcf); 2249 } 2250 2251 2252 static nxt_int_t 2253 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2254 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2255 { 2256 nxt_int_t ret; 2257 nxt_uint_t n, threads; 2258 nxt_queue_link_t *qlk; 2259 nxt_router_engine_conf_t *recf; 2260 2261 threads = tmcf->router_conf->threads; 2262 2263 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2264 sizeof(nxt_router_engine_conf_t)); 2265 if (nxt_slow_path(tmcf->engines == NULL)) { 2266 return NXT_ERROR; 2267 } 2268 2269 n = 0; 2270 2271 for (qlk = nxt_queue_first(&router->engines); 2272 qlk != nxt_queue_tail(&router->engines); 2273 qlk = nxt_queue_next(qlk)) 2274 { 2275 recf = nxt_array_zero_add(tmcf->engines); 2276 if (nxt_slow_path(recf == NULL)) { 2277 return NXT_ERROR; 2278 } 2279 2280 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2281 2282 if (n < threads) { 2283 recf->action = NXT_ROUTER_ENGINE_KEEP; 2284 ret = nxt_router_engine_conf_update(tmcf, recf); 2285 2286 } else { 2287 recf->action = NXT_ROUTER_ENGINE_DELETE; 2288 ret = nxt_router_engine_conf_delete(tmcf, recf); 2289 } 2290 2291 if (nxt_slow_path(ret != NXT_OK)) { 2292 return ret; 2293 } 2294 2295 n++; 2296 } 2297 2298 tmcf->new_threads = n; 2299 2300 while (n < threads) { 2301 recf = nxt_array_zero_add(tmcf->engines); 2302 if (nxt_slow_path(recf == NULL)) { 2303 return NXT_ERROR; 2304 } 2305 2306 recf->action = NXT_ROUTER_ENGINE_ADD; 2307 2308 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2309 if (nxt_slow_path(recf->engine == NULL)) { 2310 return NXT_ERROR; 2311 } 2312 2313 ret = nxt_router_engine_conf_create(tmcf, recf); 2314 if (nxt_slow_path(ret != NXT_OK)) { 2315 return ret; 2316 } 2317 2318 n++; 2319 } 2320 2321 return NXT_OK; 2322 } 2323 2324 2325 static nxt_int_t 2326 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2327 nxt_router_engine_conf_t *recf) 2328 { 2329 nxt_int_t ret; 2330 2331 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2332 nxt_router_listen_socket_create); 2333 if (nxt_slow_path(ret != NXT_OK)) { 2334 return ret; 2335 } 2336 2337 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2338 nxt_router_listen_socket_create); 2339 if (nxt_slow_path(ret != NXT_OK)) { 2340 return ret; 2341 } 2342 2343 return ret; 2344 } 2345 2346 2347 static nxt_int_t 2348 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2349 nxt_router_engine_conf_t *recf) 2350 { 2351 nxt_int_t ret; 2352 2353 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2354 nxt_router_listen_socket_create); 2355 if (nxt_slow_path(ret != NXT_OK)) { 2356 return ret; 2357 } 2358 2359 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2360 nxt_router_listen_socket_update); 2361 if (nxt_slow_path(ret != NXT_OK)) { 2362 return ret; 2363 } 2364 2365 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2366 if (nxt_slow_path(ret != NXT_OK)) { 2367 return ret; 2368 } 2369 2370 return ret; 2371 } 2372 2373 2374 static nxt_int_t 2375 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2376 nxt_router_engine_conf_t *recf) 2377 { 2378 nxt_int_t ret; 2379 2380 ret = nxt_router_engine_quit(tmcf, recf); 2381 if (nxt_slow_path(ret != NXT_OK)) { 2382 return ret; 2383 } 2384 2385 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2386 if (nxt_slow_path(ret != NXT_OK)) { 2387 return ret; 2388 } 2389 2390 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2391 } 2392 2393 2394 static nxt_int_t 2395 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2396 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2397 nxt_work_handler_t handler) 2398 { 2399 nxt_joint_job_t *job; 2400 nxt_queue_link_t *qlk; 2401 nxt_socket_conf_t *skcf; 2402 nxt_socket_conf_joint_t *joint; 2403 2404 for (qlk = nxt_queue_first(sockets); 2405 qlk != nxt_queue_tail(sockets); 2406 qlk = nxt_queue_next(qlk)) 2407 { 2408 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2409 if (nxt_slow_path(job == NULL)) { 2410 return NXT_ERROR; 2411 } 2412 2413 job->work.next = recf->jobs; 2414 recf->jobs = &job->work; 2415 2416 job->task = tmcf->engine->task; 2417 job->work.handler = handler; 2418 job->work.task = &job->task; 2419 job->work.obj = job; 2420 job->tmcf = tmcf; 2421 2422 tmcf->count++; 2423 2424 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2425 sizeof(nxt_socket_conf_joint_t)); 2426 if (nxt_slow_path(joint == NULL)) { 2427 return NXT_ERROR; 2428 } 2429 2430 job->work.data = joint; 2431 2432 joint->count = 1; 2433 2434 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2435 skcf->count++; 2436 joint->socket_conf = skcf; 2437 2438 joint->engine = recf->engine; 2439 } 2440 2441 return NXT_OK; 2442 } 2443 2444 2445 static nxt_int_t 2446 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2447 nxt_router_engine_conf_t *recf) 2448 { 2449 nxt_joint_job_t *job; 2450 2451 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2452 if (nxt_slow_path(job == NULL)) { 2453 return NXT_ERROR; 2454 } 2455 2456 job->work.next = recf->jobs; 2457 recf->jobs = &job->work; 2458 2459 job->task = tmcf->engine->task; 2460 job->work.handler = nxt_router_worker_thread_quit; 2461 job->work.task = &job->task; 2462 job->work.obj = NULL; 2463 job->work.data = NULL; 2464 job->tmcf = NULL; 2465 2466 return NXT_OK; 2467 } 2468 2469 2470 static nxt_int_t 2471 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2472 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2473 { 2474 nxt_joint_job_t *job; 2475 nxt_queue_link_t *qlk; 2476 2477 for (qlk = nxt_queue_first(sockets); 2478 qlk != nxt_queue_tail(sockets); 2479 qlk = nxt_queue_next(qlk)) 2480 { 2481 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2482 if (nxt_slow_path(job == NULL)) { 2483 return NXT_ERROR; 2484 } 2485 2486 job->work.next = recf->jobs; 2487 recf->jobs = &job->work; 2488 2489 job->task = tmcf->engine->task; 2490 job->work.handler = nxt_router_listen_socket_delete; 2491 job->work.task = &job->task; 2492 job->work.obj = job; 2493 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2494 job->tmcf = tmcf; 2495 2496 tmcf->count++; 2497 } 2498 2499 return NXT_OK; 2500 } 2501 2502 2503 static nxt_int_t 2504 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2505 nxt_router_temp_conf_t *tmcf) 2506 { 2507 nxt_int_t ret; 2508 nxt_uint_t i, threads; 2509 nxt_router_engine_conf_t *recf; 2510 2511 recf = tmcf->engines->elts; 2512 threads = tmcf->router_conf->threads; 2513 2514 for (i = tmcf->new_threads; i < threads; i++) { 2515 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2516 if (nxt_slow_path(ret != NXT_OK)) { 2517 return ret; 2518 } 2519 } 2520 2521 return NXT_OK; 2522 } 2523 2524 2525 static nxt_int_t 2526 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2527 nxt_event_engine_t *engine) 2528 { 2529 nxt_int_t ret; 2530 nxt_thread_link_t *link; 2531 nxt_thread_handle_t handle; 2532 2533 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2534 2535 if (nxt_slow_path(link == NULL)) { 2536 return NXT_ERROR; 2537 } 2538 2539 link->start = nxt_router_thread_start; 2540 link->engine = engine; 2541 link->work.handler = nxt_router_thread_exit_handler; 2542 link->work.task = task; 2543 link->work.data = link; 2544 2545 nxt_queue_insert_tail(&rt->engines, &engine->link); 2546 2547 ret = nxt_thread_create(&handle, link); 2548 2549 if (nxt_slow_path(ret != NXT_OK)) { 2550 nxt_queue_remove(&engine->link); 2551 } 2552 2553 return ret; 2554 } 2555 2556 2557 static void 2558 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2559 nxt_router_temp_conf_t *tmcf) 2560 { 2561 nxt_app_t *app; 2562 2563 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2564 2565 nxt_router_app_unlink(task, app); 2566 2567 } nxt_queue_loop; 2568 2569 nxt_queue_add(&router->apps, &tmcf->previous); 2570 nxt_queue_add(&router->apps, &tmcf->apps); 2571 } 2572 2573 2574 static void 2575 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2576 { 2577 nxt_uint_t n; 2578 nxt_event_engine_t *engine; 2579 nxt_router_engine_conf_t *recf; 2580 2581 recf = tmcf->engines->elts; 2582 2583 for (n = tmcf->engines->nelts; n != 0; n--) { 2584 engine = recf->engine; 2585 2586 switch (recf->action) { 2587 2588 case NXT_ROUTER_ENGINE_KEEP: 2589 break; 2590 2591 case NXT_ROUTER_ENGINE_ADD: 2592 nxt_queue_insert_tail(&router->engines, &engine->link0); 2593 break; 2594 2595 case NXT_ROUTER_ENGINE_DELETE: 2596 nxt_queue_remove(&engine->link0); 2597 break; 2598 } 2599 2600 nxt_router_engine_post(engine, recf->jobs); 2601 2602 recf++; 2603 } 2604 } 2605 2606 2607 static void 2608 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2609 { 2610 nxt_work_t *work, *next; 2611 2612 for (work = jobs; work != NULL; work = next) { 2613 next = work->next; 2614 work->next = NULL; 2615 2616 nxt_event_engine_post(engine, work); 2617 } 2618 } 2619 2620 2621 static nxt_port_handlers_t nxt_router_app_port_handlers = { 2622 .rpc_error = nxt_port_rpc_handler, 2623 .mmap = nxt_port_mmap_handler, 2624 .data = nxt_port_rpc_handler, 2625 }; 2626 2627 2628 static void 2629 nxt_router_thread_start(void *data) 2630 { 2631 nxt_int_t ret; 2632 nxt_port_t *port; 2633 nxt_task_t *task; 2634 nxt_thread_t *thread; 2635 nxt_thread_link_t *link; 2636 nxt_event_engine_t *engine; 2637 2638 link = data; 2639 engine = link->engine; 2640 task = &engine->task; 2641 2642 thread = nxt_thread(); 2643 2644 nxt_event_engine_thread_adopt(engine); 2645 2646 /* STUB */ 2647 thread->runtime = engine->task.thread->runtime; 2648 2649 engine->task.thread = thread; 2650 engine->task.log = thread->log; 2651 thread->engine = engine; 2652 thread->task = &engine->task; 2653 #if 0 2654 thread->fiber = &engine->fibers->fiber; 2655 #endif 2656 2657 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2658 if (nxt_slow_path(engine->mem_pool == NULL)) { 2659 return; 2660 } 2661 2662 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2663 NXT_PROCESS_ROUTER); 2664 if (nxt_slow_path(port == NULL)) { 2665 return; 2666 } 2667 2668 ret = nxt_port_socket_init(task, port, 0); 2669 if (nxt_slow_path(ret != NXT_OK)) { 2670 nxt_port_use(task, port, -1); 2671 return; 2672 } 2673 2674 engine->port = port; 2675 2676 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2677 2678 nxt_event_engine_start(engine); 2679 } 2680 2681 2682 static void 2683 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2684 { 2685 nxt_joint_job_t *job; 2686 nxt_socket_conf_t *skcf; 2687 nxt_listen_event_t *lev; 2688 nxt_listen_socket_t *ls; 2689 nxt_thread_spinlock_t *lock; 2690 nxt_socket_conf_joint_t *joint; 2691 2692 job = obj; 2693 joint = data; 2694 2695 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2696 2697 skcf = joint->socket_conf; 2698 ls = skcf->listen; 2699 2700 lev = nxt_listen_event(task, ls); 2701 if (nxt_slow_path(lev == NULL)) { 2702 nxt_router_listen_socket_release(task, skcf); 2703 return; 2704 } 2705 2706 lev->socket.data = joint; 2707 2708 lock = &skcf->router_conf->router->lock; 2709 2710 nxt_thread_spin_lock(lock); 2711 ls->count++; 2712 nxt_thread_spin_unlock(lock); 2713 2714 job->work.next = NULL; 2715 job->work.handler = nxt_router_conf_wait; 2716 2717 nxt_event_engine_post(job->tmcf->engine, &job->work); 2718 } 2719 2720 2721 nxt_inline nxt_listen_event_t * 2722 nxt_router_listen_event(nxt_queue_t *listen_connections, 2723 nxt_socket_conf_t *skcf) 2724 { 2725 nxt_socket_t fd; 2726 nxt_queue_link_t *qlk; 2727 nxt_listen_event_t *lev; 2728 2729 fd = skcf->listen->socket; 2730 2731 for (qlk = nxt_queue_first(listen_connections); 2732 qlk != nxt_queue_tail(listen_connections); 2733 qlk = nxt_queue_next(qlk)) 2734 { 2735 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2736 2737 if (fd == lev->socket.fd) { 2738 return lev; 2739 } 2740 } 2741 2742 return NULL; 2743 } 2744 2745 2746 static void 2747 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2748 { 2749 nxt_joint_job_t *job; 2750 nxt_event_engine_t *engine; 2751 nxt_listen_event_t *lev; 2752 nxt_socket_conf_joint_t *joint, *old; 2753 2754 job = obj; 2755 joint = data; 2756 2757 engine = task->thread->engine; 2758 2759 nxt_queue_insert_tail(&engine->joints, &joint->link); 2760 2761 lev = nxt_router_listen_event(&engine->listen_connections, 2762 joint->socket_conf); 2763 2764 old = lev->socket.data; 2765 lev->socket.data = joint; 2766 lev->listen = joint->socket_conf->listen; 2767 2768 job->work.next = NULL; 2769 job->work.handler = nxt_router_conf_wait; 2770 2771 nxt_event_engine_post(job->tmcf->engine, &job->work); 2772 2773 /* 2774 * The task is allocated from configuration temporary 2775 * memory pool so it can be freed after engine post operation. 2776 */ 2777 2778 nxt_router_conf_release(&engine->task, old); 2779 } 2780 2781 2782 static void 2783 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2784 { 2785 nxt_joint_job_t *job; 2786 nxt_socket_conf_t *skcf; 2787 nxt_listen_event_t *lev; 2788 nxt_event_engine_t *engine; 2789 2790 job = obj; 2791 skcf = data; 2792 2793 engine = task->thread->engine; 2794 2795 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2796 2797 nxt_fd_event_delete(engine, &lev->socket); 2798 2799 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2800 lev->socket.fd); 2801 2802 lev->timer.handler = nxt_router_listen_socket_close; 2803 lev->timer.work_queue = &engine->fast_work_queue; 2804 2805 nxt_timer_add(engine, &lev->timer, 0); 2806 2807 job->work.next = NULL; 2808 job->work.handler = nxt_router_conf_wait; 2809 2810 nxt_event_engine_post(job->tmcf->engine, &job->work); 2811 } 2812 2813 2814 static void 2815 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2816 { 2817 nxt_event_engine_t *engine; 2818 2819 nxt_debug(task, "router worker thread quit"); 2820 2821 engine = task->thread->engine; 2822 2823 engine->shutdown = 1; 2824 2825 if (nxt_queue_is_empty(&engine->joints)) { 2826 nxt_thread_exit(task->thread); 2827 } 2828 } 2829 2830 2831 static void 2832 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2833 { 2834 nxt_timer_t *timer; 2835 nxt_listen_event_t *lev; 2836 nxt_socket_conf_joint_t *joint; 2837 2838 timer = obj; 2839 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2840 2841 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2842 lev->socket.fd); 2843 2844 nxt_queue_remove(&lev->link); 2845 2846 joint = lev->socket.data; 2847 lev->socket.data = NULL; 2848 2849 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2850 task = &task->thread->engine->task; 2851 2852 nxt_router_listen_socket_release(task, joint->socket_conf); 2853 2854 nxt_router_listen_event_release(task, lev, joint); 2855 } 2856 2857 2858 static void 2859 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2860 { 2861 nxt_listen_socket_t *ls; 2862 nxt_thread_spinlock_t *lock; 2863 2864 ls = skcf->listen; 2865 lock = &skcf->router_conf->router->lock; 2866 2867 nxt_thread_spin_lock(lock); 2868 2869 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2870 task->thread->engine, ls->count); 2871 2872 if (--ls->count != 0) { 2873 ls = NULL; 2874 } 2875 2876 nxt_thread_spin_unlock(lock); 2877 2878 if (ls != NULL) { 2879 nxt_socket_close(task, ls->socket); 2880 nxt_free(ls); 2881 } 2882 } 2883 2884 2885 void 2886 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 2887 nxt_socket_conf_joint_t *joint) 2888 { 2889 nxt_event_engine_t *engine; 2890 2891 nxt_debug(task, "listen event count: %D", lev->count); 2892 2893 if (--lev->count == 0) { 2894 nxt_free(lev); 2895 } 2896 2897 if (joint != NULL) { 2898 nxt_router_conf_release(task, joint); 2899 } 2900 2901 engine = task->thread->engine; 2902 2903 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 2904 nxt_thread_exit(task->thread); 2905 } 2906 } 2907 2908 2909 void 2910 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2911 { 2912 nxt_socket_conf_t *skcf; 2913 nxt_router_conf_t *rtcf; 2914 nxt_thread_spinlock_t *lock; 2915 2916 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2917 2918 if (--joint->count != 0) { 2919 return; 2920 } 2921 2922 nxt_queue_remove(&joint->link); 2923 2924 /* 2925 * The joint content can not be safely used after the critical 2926 * section protected by the spinlock because its memory pool may 2927 * be already destroyed by another thread. 2928 */ 2929 skcf = joint->socket_conf; 2930 rtcf = skcf->router_conf; 2931 lock = &rtcf->router->lock; 2932 2933 nxt_thread_spin_lock(lock); 2934 2935 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2936 rtcf, rtcf->count); 2937 2938 if (--skcf->count != 0) { 2939 skcf = NULL; 2940 rtcf = NULL; 2941 2942 } else { 2943 nxt_queue_remove(&skcf->link); 2944 2945 if (--rtcf->count != 0) { 2946 rtcf = NULL; 2947 } 2948 } 2949 2950 nxt_thread_spin_unlock(lock); 2951 2952 if (skcf != NULL) { 2953 if (skcf->pass != NULL) { 2954 nxt_http_pass_cleanup(task, skcf->pass); 2955 } 2956 2957 #if (NXT_TLS) 2958 if (skcf->tls != NULL) { 2959 task->thread->runtime->tls->server_free(task, skcf->tls); 2960 } 2961 #endif 2962 } 2963 2964 /* TODO remove engine->port */ 2965 /* TODO excude from connected ports */ 2966 2967 if (rtcf != NULL) { 2968 nxt_debug(task, "old router conf is destroyed"); 2969 2970 nxt_http_routes_cleanup(task, rtcf->routes); 2971 2972 nxt_router_access_log_release(task, lock, rtcf->access_log); 2973 2974 nxt_mp_thread_adopt(rtcf->mem_pool); 2975 2976 nxt_mp_destroy(rtcf->mem_pool); 2977 } 2978 } 2979 2980 2981 static void 2982 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 2983 nxt_router_access_log_t *access_log) 2984 { 2985 size_t size; 2986 u_char *buf, *p; 2987 nxt_off_t bytes; 2988 2989 static nxt_time_string_t date_cache = { 2990 (nxt_atomic_uint_t) -1, 2991 nxt_router_access_log_date, 2992 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 2993 nxt_length("31/Dec/1986:19:40:00 +0300"), 2994 NXT_THREAD_TIME_LOCAL, 2995 NXT_THREAD_TIME_SEC, 2996 }; 2997 2998 size = r->remote->address_length 2999 + 6 /* ' - - [' */ 3000 + date_cache.size 3001 + 3 /* '] "' */ 3002 + r->method->length 3003 + 1 /* space */ 3004 + r->target.length 3005 + 1 /* space */ 3006 + r->version.length 3007 + 2 /* '" ' */ 3008 + 3 /* status */ 3009 + 1 /* space */ 3010 + NXT_OFF_T_LEN 3011 + 2 /* ' "' */ 3012 + (r->referer != NULL ? r->referer->value_length : 1) 3013 + 3 /* '" "' */ 3014 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3015 + 2 /* '"\n' */ 3016 ; 3017 3018 buf = nxt_mp_nget(r->mem_pool, size); 3019 if (nxt_slow_path(buf == NULL)) { 3020 return; 3021 } 3022 3023 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3024 r->remote->address_length); 3025 3026 p = nxt_cpymem(p, " - - [", 6); 3027 3028 p = nxt_thread_time_string(task->thread, &date_cache, p); 3029 3030 p = nxt_cpymem(p, "] \"", 3); 3031 3032 if (r->method->length != 0) { 3033 p = nxt_cpymem(p, r->method->start, r->method->length); 3034 3035 if (r->target.length != 0) { 3036 *p++ = ' '; 3037 p = nxt_cpymem(p, r->target.start, r->target.length); 3038 3039 if (r->version.length != 0) { 3040 *p++ = ' '; 3041 p = nxt_cpymem(p, r->version.start, r->version.length); 3042 } 3043 } 3044 3045 } else { 3046 *p++ = '-'; 3047 } 3048 3049 p = nxt_cpymem(p, "\" ", 2); 3050 3051 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3052 3053 *p++ = ' '; 3054 3055 bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto); 3056 3057 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3058 3059 p = nxt_cpymem(p, " \"", 2); 3060 3061 if (r->referer != NULL) { 3062 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3063 3064 } else { 3065 *p++ = '-'; 3066 } 3067 3068 p = nxt_cpymem(p, "\" \"", 3); 3069 3070 if (r->user_agent != NULL) { 3071 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3072 3073 } else { 3074 *p++ = '-'; 3075 } 3076 3077 p = nxt_cpymem(p, "\"\n", 2); 3078 3079 nxt_fd_write(access_log->fd, buf, p - buf); 3080 } 3081 3082 3083 static u_char * 3084 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3085 size_t size, const char *format) 3086 { 3087 u_char sign; 3088 time_t gmtoff; 3089 3090 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3091 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3092 3093 gmtoff = nxt_timezone(tm) / 60; 3094 3095 if (gmtoff < 0) { 3096 gmtoff = -gmtoff; 3097 sign = '-'; 3098 3099 } else { 3100 sign = '+'; 3101 } 3102 3103 return nxt_sprintf(buf, buf + size, format, 3104 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3105 tm->tm_hour, tm->tm_min, tm->tm_sec, 3106 sign, gmtoff / 60, gmtoff % 60); 3107 } 3108 3109 3110 static void 3111 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3112 { 3113 uint32_t stream; 3114 nxt_int_t ret; 3115 nxt_buf_t *b; 3116 nxt_port_t *main_port, *router_port; 3117 nxt_runtime_t *rt; 3118 nxt_router_access_log_t *access_log; 3119 3120 access_log = tmcf->router_conf->access_log; 3121 3122 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3123 if (nxt_slow_path(b == NULL)) { 3124 goto fail; 3125 } 3126 3127 nxt_buf_cpystr(b, &access_log->path); 3128 *b->mem.free++ = '\0'; 3129 3130 rt = task->thread->runtime; 3131 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3132 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3133 3134 stream = nxt_port_rpc_register_handler(task, router_port, 3135 nxt_router_access_log_ready, 3136 nxt_router_access_log_error, 3137 -1, tmcf); 3138 if (nxt_slow_path(stream == 0)) { 3139 goto fail; 3140 } 3141 3142 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3143 stream, router_port->id, b); 3144 3145 if (nxt_slow_path(ret != NXT_OK)) { 3146 nxt_port_rpc_cancel(task, router_port, stream); 3147 goto fail; 3148 } 3149 3150 return; 3151 3152 fail: 3153 3154 nxt_router_conf_error(task, tmcf); 3155 } 3156 3157 3158 static void 3159 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3160 void *data) 3161 { 3162 nxt_router_temp_conf_t *tmcf; 3163 nxt_router_access_log_t *access_log; 3164 3165 tmcf = data; 3166 3167 access_log = tmcf->router_conf->access_log; 3168 3169 access_log->fd = msg->fd; 3170 3171 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3172 nxt_router_conf_apply, task, tmcf, NULL); 3173 } 3174 3175 3176 static void 3177 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3178 void *data) 3179 { 3180 nxt_router_temp_conf_t *tmcf; 3181 3182 tmcf = data; 3183 3184 nxt_router_conf_error(task, tmcf); 3185 } 3186 3187 3188 static void 3189 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3190 nxt_router_access_log_t *access_log) 3191 { 3192 if (access_log == NULL) { 3193 return; 3194 } 3195 3196 nxt_thread_spin_lock(lock); 3197 3198 if (--access_log->count != 0) { 3199 access_log = NULL; 3200 } 3201 3202 nxt_thread_spin_unlock(lock); 3203 3204 if (access_log != NULL) { 3205 3206 if (access_log->fd != -1) { 3207 nxt_fd_close(access_log->fd); 3208 } 3209 3210 nxt_free(access_log); 3211 } 3212 } 3213 3214 3215 typedef struct { 3216 nxt_mp_t *mem_pool; 3217 nxt_router_access_log_t *access_log; 3218 } nxt_router_access_log_reopen_t; 3219 3220 3221 void 3222 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3223 { 3224 nxt_mp_t *mp; 3225 uint32_t stream; 3226 nxt_int_t ret; 3227 nxt_buf_t *b; 3228 nxt_port_t *main_port, *router_port; 3229 nxt_runtime_t *rt; 3230 nxt_router_access_log_t *access_log; 3231 nxt_router_access_log_reopen_t *reopen; 3232 3233 access_log = nxt_router->access_log; 3234 3235 if (access_log == NULL) { 3236 return; 3237 } 3238 3239 mp = nxt_mp_create(1024, 128, 256, 32); 3240 if (nxt_slow_path(mp == NULL)) { 3241 return; 3242 } 3243 3244 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3245 if (nxt_slow_path(reopen == NULL)) { 3246 goto fail; 3247 } 3248 3249 reopen->mem_pool = mp; 3250 reopen->access_log = access_log; 3251 3252 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3253 if (nxt_slow_path(b == NULL)) { 3254 goto fail; 3255 } 3256 3257 b->completion_handler = nxt_router_access_log_reopen_completion; 3258 3259 nxt_buf_cpystr(b, &access_log->path); 3260 *b->mem.free++ = '\0'; 3261 3262 rt = task->thread->runtime; 3263 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3264 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3265 3266 stream = nxt_port_rpc_register_handler(task, router_port, 3267 nxt_router_access_log_reopen_ready, 3268 nxt_router_access_log_reopen_error, 3269 -1, reopen); 3270 if (nxt_slow_path(stream == 0)) { 3271 goto fail; 3272 } 3273 3274 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3275 stream, router_port->id, b); 3276 3277 if (nxt_slow_path(ret != NXT_OK)) { 3278 nxt_port_rpc_cancel(task, router_port, stream); 3279 goto fail; 3280 } 3281 3282 nxt_mp_retain(mp); 3283 3284 return; 3285 3286 fail: 3287 3288 nxt_mp_destroy(mp); 3289 } 3290 3291 3292 static void 3293 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3294 { 3295 nxt_mp_t *mp; 3296 nxt_buf_t *b; 3297 3298 b = obj; 3299 mp = b->data; 3300 3301 nxt_mp_release(mp); 3302 } 3303 3304 3305 static void 3306 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3307 void *data) 3308 { 3309 nxt_router_access_log_t *access_log; 3310 nxt_router_access_log_reopen_t *reopen; 3311 3312 reopen = data; 3313 3314 access_log = reopen->access_log; 3315 3316 if (access_log == nxt_router->access_log) { 3317 3318 if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) { 3319 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3320 msg->fd, access_log->fd, nxt_errno); 3321 } 3322 } 3323 3324 nxt_fd_close(msg->fd); 3325 nxt_mp_release(reopen->mem_pool); 3326 } 3327 3328 3329 static void 3330 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3331 void *data) 3332 { 3333 nxt_router_access_log_reopen_t *reopen; 3334 3335 reopen = data; 3336 3337 nxt_mp_release(reopen->mem_pool); 3338 } 3339 3340 3341 static void 3342 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3343 { 3344 nxt_port_t *port; 3345 nxt_thread_link_t *link; 3346 nxt_event_engine_t *engine; 3347 nxt_thread_handle_t handle; 3348 3349 handle = (nxt_thread_handle_t) obj; 3350 link = data; 3351 3352 nxt_thread_wait(handle); 3353 3354 engine = link->engine; 3355 3356 nxt_queue_remove(&engine->link); 3357 3358 port = engine->port; 3359 3360 // TODO notify all apps 3361 3362 port->engine = task->thread->engine; 3363 nxt_mp_thread_adopt(port->mem_pool); 3364 nxt_port_use(task, port, -1); 3365 3366 nxt_mp_thread_adopt(engine->mem_pool); 3367 nxt_mp_destroy(engine->mem_pool); 3368 3369 nxt_event_engine_free(engine); 3370 3371 nxt_free(link); 3372 } 3373 3374 3375 static void 3376 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3377 void *data) 3378 { 3379 size_t dump_size; 3380 nxt_int_t ret; 3381 nxt_buf_t *b; 3382 nxt_http_request_t *r; 3383 nxt_req_conn_link_t *rc; 3384 nxt_app_parse_ctx_t *ar; 3385 nxt_unit_response_t *resp; 3386 3387 b = msg->buf; 3388 rc = data; 3389 3390 dump_size = nxt_buf_used_size(b); 3391 3392 if (dump_size > 300) { 3393 dump_size = 300; 3394 } 3395 3396 nxt_debug(task, "%srouter app data (%uz): %*s", 3397 msg->port_msg.last ? "last " : "", msg->size, dump_size, 3398 b->mem.pos); 3399 3400 if (msg->size == 0) { 3401 b = NULL; 3402 } 3403 3404 ar = rc->ap; 3405 if (nxt_slow_path(ar == NULL)) { 3406 return; 3407 } 3408 3409 if (ar->request->error) { 3410 nxt_router_rc_unlink(task, rc); 3411 return; 3412 } 3413 3414 if (msg->port_msg.last != 0) { 3415 nxt_debug(task, "router data create last buf"); 3416 3417 nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request)); 3418 3419 nxt_router_rc_unlink(task, rc); 3420 3421 } else { 3422 if (rc->app != NULL && rc->app->timeout != 0) { 3423 ar->timer.handler = nxt_router_app_timeout; 3424 ar->timer_data = rc; 3425 nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); 3426 } 3427 } 3428 3429 if (b == NULL) { 3430 return; 3431 } 3432 3433 if (msg->buf == b) { 3434 /* Disable instant buffer completion/re-using by port. */ 3435 msg->buf = NULL; 3436 } 3437 3438 r = ar->request; 3439 3440 if (r->header_sent) { 3441 nxt_buf_chain_add(&r->out, b); 3442 nxt_http_request_send_body(task, r, NULL); 3443 3444 } else { 3445 size_t b_size = nxt_buf_mem_used_size(&b->mem); 3446 3447 if (nxt_slow_path(b_size < sizeof(*resp))) { 3448 goto fail; 3449 } 3450 3451 resp = (void *) b->mem.pos; 3452 if (nxt_slow_path(b_size < sizeof(*resp) 3453 + resp->fields_count * sizeof(nxt_unit_field_t))) { 3454 goto fail; 3455 } 3456 3457 nxt_unit_field_t *f; 3458 nxt_http_field_t *field; 3459 3460 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 3461 field = nxt_list_add(ar->resp_parser.fields); 3462 3463 if (nxt_slow_path(field == NULL)) { 3464 goto fail; 3465 } 3466 3467 field->hash = f->hash; 3468 field->skip = f->skip; 3469 3470 field->name_length = f->name_length; 3471 field->value_length = f->value_length; 3472 field->name = nxt_unit_sptr_get(&f->name); 3473 field->value = nxt_unit_sptr_get(&f->value); 3474 3475 nxt_debug(task, "header: %*s: %*s", 3476 (size_t) field->name_length, field->name, 3477 (size_t) field->value_length, field->value); 3478 } 3479 r->status = resp->status; 3480 3481 /* 3482 ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem); 3483 if (nxt_slow_path(ret != NXT_DONE)) { 3484 goto fail; 3485 } 3486 */ 3487 r->resp.fields = ar->resp_parser.fields; 3488 3489 ret = nxt_http_fields_process(r->resp.fields, 3490 &nxt_response_fields_hash, r); 3491 if (nxt_slow_path(ret != NXT_OK)) { 3492 goto fail; 3493 } 3494 3495 if (resp->piggyback_content_length != 0) { 3496 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 3497 b->mem.free = b->mem.pos + resp->piggyback_content_length; 3498 3499 } else { 3500 b->mem.pos = b->mem.free; 3501 } 3502 3503 if (nxt_buf_mem_used_size(&b->mem) == 0) { 3504 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3505 b->completion_handler, task, b, b->parent); 3506 3507 b = b->next; 3508 } 3509 3510 if (b != NULL) { 3511 nxt_buf_chain_add(&r->out, b); 3512 } 3513 3514 r->state = &nxt_http_request_send_state; 3515 3516 nxt_http_request_header_send(task, r); 3517 } 3518 3519 return; 3520 3521 fail: 3522 3523 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 3524 3525 nxt_router_rc_unlink(task, rc); 3526 } 3527 3528 3529 static const nxt_http_request_state_t nxt_http_request_send_state 3530 nxt_aligned(64) = 3531 { 3532 .ready_handler = nxt_http_request_send_body, 3533 .error_handler = nxt_http_request_error_handler, 3534 }; 3535 3536 3537 static void 3538 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 3539 { 3540 nxt_buf_t *out; 3541 nxt_http_request_t *r; 3542 3543 r = obj; 3544 3545 out = r->out; 3546 3547 if (out != NULL) { 3548 r->out = NULL; 3549 nxt_http_request_send(task, r, out); 3550 } 3551 } 3552 3553 3554 static void 3555 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3556 void *data) 3557 { 3558 nxt_int_t res; 3559 nxt_port_t *port; 3560 nxt_bool_t cancelled; 3561 nxt_req_app_link_t *ra; 3562 nxt_req_conn_link_t *rc; 3563 3564 rc = data; 3565 3566 ra = rc->ra; 3567 3568 if (ra != NULL) { 3569 cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 3570 3571 if (cancelled) { 3572 nxt_router_ra_inc_use(ra); 3573 3574 res = nxt_router_app_port(task, rc->app, ra); 3575 3576 if (res == NXT_OK) { 3577 port = ra->app_port; 3578 3579 if (nxt_slow_path(port == NULL)) { 3580 nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra"); 3581 return; 3582 } 3583 3584 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, 3585 port->pid); 3586 3587 nxt_router_app_prepare_request(task, ra); 3588 } 3589 3590 msg->port_msg.last = 0; 3591 3592 return; 3593 } 3594 } 3595 3596 if (rc->ap != NULL) { 3597 nxt_http_request_error(task, rc->ap->request, 3598 NXT_HTTP_SERVICE_UNAVAILABLE); 3599 } 3600 3601 nxt_router_rc_unlink(task, rc); 3602 } 3603 3604 3605 static void 3606 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3607 void *data) 3608 { 3609 nxt_app_t *app; 3610 nxt_port_t *port; 3611 nxt_app_joint_t *app_joint; 3612 3613 app_joint = data; 3614 port = msg->u.new_port; 3615 3616 nxt_assert(app_joint != NULL); 3617 nxt_assert(port != NULL); 3618 3619 app = app_joint->app; 3620 3621 nxt_router_app_joint_use(task, app_joint, -1); 3622 3623 if (nxt_slow_path(app == NULL)) { 3624 nxt_debug(task, "new port ready for released app, send QUIT"); 3625 3626 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 3627 3628 return; 3629 } 3630 3631 port->app = app; 3632 3633 nxt_thread_mutex_lock(&app->mutex); 3634 3635 nxt_assert(app->pending_processes != 0); 3636 3637 app->pending_processes--; 3638 app->processes++; 3639 3640 nxt_thread_mutex_unlock(&app->mutex); 3641 3642 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 3643 &app->name, port->pid, app->processes, app->pending_processes); 3644 3645 nxt_router_app_port_release(task, port, 0, 0); 3646 } 3647 3648 3649 static void 3650 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3651 void *data) 3652 { 3653 nxt_app_t *app; 3654 nxt_app_joint_t *app_joint; 3655 nxt_queue_link_t *lnk; 3656 nxt_req_app_link_t *ra; 3657 3658 app_joint = data; 3659 3660 nxt_assert(app_joint != NULL); 3661 3662 app = app_joint->app; 3663 3664 nxt_router_app_joint_use(task, app_joint, -1); 3665 3666 if (nxt_slow_path(app == NULL)) { 3667 nxt_debug(task, "start error for released app"); 3668 3669 return; 3670 } 3671 3672 nxt_debug(task, "app '%V' %p start error", &app->name, app); 3673 3674 nxt_thread_mutex_lock(&app->mutex); 3675 3676 nxt_assert(app->pending_processes != 0); 3677 3678 app->pending_processes--; 3679 3680 if (!nxt_queue_is_empty(&app->requests)) { 3681 lnk = nxt_queue_last(&app->requests); 3682 nxt_queue_remove(lnk); 3683 lnk->next = NULL; 3684 3685 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); 3686 3687 } else { 3688 ra = NULL; 3689 } 3690 3691 nxt_thread_mutex_unlock(&app->mutex); 3692 3693 if (ra != NULL) { 3694 nxt_debug(task, "app '%V' %p abort next stream #%uD", 3695 &app->name, app, ra->stream); 3696 3697 nxt_router_ra_error(ra, 500, "Failed to start application process"); 3698 nxt_router_ra_use(task, ra, -1); 3699 } 3700 } 3701 3702 nxt_inline nxt_port_t * 3703 nxt_router_app_get_port_for_quit(nxt_app_t *app); 3704 3705 void 3706 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 3707 { 3708 int c; 3709 3710 c = nxt_atomic_fetch_add(&app->use_count, i); 3711 3712 if (i < 0 && c == -i) { 3713 3714 if (task->thread->engine != app->engine) { 3715 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 3716 3717 } else { 3718 nxt_router_free_app(task, app->joint, NULL); 3719 } 3720 } 3721 } 3722 3723 3724 nxt_inline nxt_bool_t 3725 nxt_router_app_first_port_busy(nxt_app_t *app) 3726 { 3727 nxt_port_t *port; 3728 nxt_queue_link_t *lnk; 3729 3730 lnk = nxt_queue_first(&app->ports); 3731 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 3732 3733 return port->app_pending_responses > 0; 3734 } 3735 3736 3737 nxt_inline nxt_port_t * 3738 nxt_router_pop_first_port(nxt_app_t *app) 3739 { 3740 nxt_port_t *port; 3741 nxt_queue_link_t *lnk; 3742 3743 lnk = nxt_queue_first(&app->ports); 3744 nxt_queue_remove(lnk); 3745 3746 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 3747 3748 port->app_pending_responses++; 3749 3750 if (nxt_queue_chk_remove(&port->idle_link)) { 3751 app->idle_processes--; 3752 3753 if (port->idle_start == 0) { 3754 nxt_assert(app->idle_processes < app->spare_processes); 3755 3756 } else { 3757 nxt_assert(app->idle_processes >= app->spare_processes); 3758 3759 port->idle_start = 0; 3760 } 3761 } 3762 3763 if ((app->max_pending_responses == 0 3764 || port->app_pending_responses < app->max_pending_responses) 3765 && (app->max_requests == 0 3766 || port->app_responses + port->app_pending_responses 3767 < app->max_requests)) 3768 { 3769 nxt_queue_insert_tail(&app->ports, lnk); 3770 3771 nxt_port_inc_use(port); 3772 3773 } else { 3774 lnk->next = NULL; 3775 } 3776 3777 return port; 3778 } 3779 3780 3781 nxt_inline nxt_port_t * 3782 nxt_router_app_get_port_for_quit(nxt_app_t *app) 3783 { 3784 nxt_port_t *port; 3785 3786 port = NULL; 3787 3788 nxt_thread_mutex_lock(&app->mutex); 3789 3790 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 3791 3792 if (port->app_pending_responses > 0) { 3793 port = NULL; 3794 3795 continue; 3796 } 3797 3798 /* Caller is responsible to decrease port use count. */ 3799 nxt_queue_chk_remove(&port->app_link); 3800 3801 if (nxt_queue_chk_remove(&port->idle_link)) { 3802 app->idle_processes--; 3803 } 3804 3805 port->app = NULL; 3806 app->processes--; 3807 3808 break; 3809 3810 } nxt_queue_loop; 3811 3812 nxt_thread_mutex_unlock(&app->mutex); 3813 3814 return port; 3815 } 3816 3817 3818 static void 3819 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 3820 { 3821 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 3822 3823 nxt_queue_remove(&app->link); 3824 3825 nxt_router_app_use(task, app, -1); 3826 } 3827 3828 3829 static void 3830 nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) 3831 { 3832 nxt_req_app_link_t *ra; 3833 3834 ra = data; 3835 3836 #if (NXT_DEBUG) 3837 { 3838 nxt_app_t *app; 3839 3840 app = obj; 3841 3842 nxt_assert(app != NULL); 3843 nxt_assert(ra != NULL); 3844 nxt_assert(ra->app_port != NULL); 3845 3846 nxt_debug(task, "app '%V' %p process next stream #%uD", 3847 &app->name, app, ra->stream); 3848 } 3849 #endif 3850 3851 nxt_router_app_prepare_request(task, ra); 3852 } 3853 3854 3855 static void 3856 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 3857 uint32_t request_failed, uint32_t got_response) 3858 { 3859 nxt_app_t *app; 3860 nxt_bool_t port_unchained; 3861 nxt_bool_t send_quit, cancelled, adjust_idle_timer; 3862 nxt_queue_link_t *lnk; 3863 nxt_req_app_link_t *ra, *pending_ra, *re_ra; 3864 nxt_port_select_state_t state; 3865 3866 nxt_assert(port != NULL); 3867 nxt_assert(port->app != NULL); 3868 3869 ra = NULL; 3870 3871 app = port->app; 3872 3873 nxt_thread_mutex_lock(&app->mutex); 3874 3875 port->app_pending_responses -= request_failed + got_response; 3876 port->app_responses += got_response; 3877 3878 if (port->pair[1] != -1 3879 && (app->max_pending_responses == 0 3880 || port->app_pending_responses < app->max_pending_responses) 3881 && (app->max_requests == 0 3882 || port->app_responses + port->app_pending_responses 3883 < app->max_requests)) 3884 { 3885 if (port->app_link.next == NULL) { 3886 if (port->app_pending_responses > 0) { 3887 nxt_queue_insert_tail(&app->ports, &port->app_link); 3888 3889 } else { 3890 nxt_queue_insert_head(&app->ports, &port->app_link); 3891 } 3892 3893 nxt_port_inc_use(port); 3894 3895 } else { 3896 if (port->app_pending_responses == 0 3897 && nxt_queue_first(&app->ports) != &port->app_link) 3898 { 3899 nxt_queue_remove(&port->app_link); 3900 nxt_queue_insert_head(&app->ports, &port->app_link); 3901 } 3902 } 3903 } 3904 3905 if (!nxt_queue_is_empty(&app->ports) 3906 && !nxt_queue_is_empty(&app->requests)) 3907 { 3908 lnk = nxt_queue_first(&app->requests); 3909 nxt_queue_remove(lnk); 3910 lnk->next = NULL; 3911 3912 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); 3913 3914 ra->app_port = nxt_router_pop_first_port(app); 3915 3916 if (ra->app_port->app_pending_responses > 1) { 3917 nxt_router_ra_pending(task, app, ra); 3918 } 3919 } 3920 3921 /* Pop first pending request for this port. */ 3922 if ((request_failed > 0 || got_response > 0) 3923 && !nxt_queue_is_empty(&port->pending_requests)) 3924 { 3925 lnk = nxt_queue_first(&port->pending_requests); 3926 nxt_queue_remove(lnk); 3927 lnk->next = NULL; 3928 3929 pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, 3930 link_port_pending); 3931 3932 nxt_assert(pending_ra->link_app_pending.next != NULL); 3933 3934 nxt_queue_remove(&pending_ra->link_app_pending); 3935 pending_ra->link_app_pending.next = NULL; 3936 3937 } else { 3938 pending_ra = NULL; 3939 } 3940 3941 /* Try to cancel and re-schedule first stalled request for this app. */ 3942 if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { 3943 lnk = nxt_queue_first(&app->pending); 3944 3945 re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); 3946 3947 if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { 3948 3949 nxt_debug(task, "app '%V' stalled request #%uD detected", 3950 &app->name, re_ra->stream); 3951 3952 cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, 3953 re_ra->stream); 3954 3955 if (cancelled) { 3956 nxt_router_ra_inc_use(re_ra); 3957 3958 state.ra = re_ra; 3959 state.app = app; 3960 3961 nxt_router_port_select(task, &state); 3962 3963 goto re_ra_cancelled; 3964 } 3965 } 3966 } 3967 3968 re_ra = NULL; 3969 3970 re_ra_cancelled: 3971 3972 send_quit = (app->max_requests > 0 3973 && port->app_pending_responses == 0 3974 && port->app_responses >= app->max_requests); 3975 3976 if (send_quit) { 3977 port_unchained = nxt_queue_chk_remove(&port->app_link); 3978 3979 port->app = NULL; 3980 app->processes--; 3981 3982 } else { 3983 port_unchained = 0; 3984 } 3985 3986 adjust_idle_timer = 0; 3987 3988 if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) { 3989 nxt_assert(port->idle_link.next == NULL); 3990 3991 if (app->idle_processes == app->spare_processes 3992 && app->adjust_idle_work.data == NULL) 3993 { 3994 adjust_idle_timer = 1; 3995 app->adjust_idle_work.data = app; 3996 app->adjust_idle_work.next = NULL; 3997 } 3998 3999 if (app->idle_processes < app->spare_processes) { 4000 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 4001 4002 } else { 4003 nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); 4004 4005 port->idle_start = task->thread->engine->timers.now; 4006 } 4007 4008 app->idle_processes++; 4009 } 4010 4011 nxt_thread_mutex_unlock(&app->mutex); 4012 4013 if (adjust_idle_timer) { 4014 nxt_router_app_use(task, app, 1); 4015 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4016 } 4017 4018 if (pending_ra != NULL) { 4019 nxt_router_ra_use(task, pending_ra, -1); 4020 } 4021 4022 if (re_ra != NULL) { 4023 if (nxt_router_port_post_select(task, &state) == NXT_OK) { 4024 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 4025 nxt_router_app_process_request, 4026 &task->thread->engine->task, app, re_ra); 4027 } 4028 } 4029 4030 if (ra != NULL) { 4031 nxt_router_ra_use(task, ra, -1); 4032 4033 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 4034 nxt_router_app_process_request, 4035 &task->thread->engine->task, app, ra); 4036 4037 goto adjust_use; 4038 } 4039 4040 /* ? */ 4041 if (port->pair[1] == -1) { 4042 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4043 &app->name, app, port, port->pid); 4044 4045 goto adjust_use; 4046 } 4047 4048 if (send_quit) { 4049 nxt_debug(task, "app '%V' %p send QUIT to port", 4050 &app->name, app); 4051 4052 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 4053 -1, 0, 0, NULL); 4054 4055 if (port_unchained) { 4056 nxt_port_use(task, port, -1); 4057 } 4058 4059 goto adjust_use; 4060 } 4061 4062 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4063 &app->name, app); 4064 4065 adjust_use: 4066 4067 if (request_failed > 0 || got_response > 0) { 4068 nxt_port_use(task, port, -1); 4069 } 4070 } 4071 4072 4073 void 4074 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4075 { 4076 nxt_app_t *app; 4077 nxt_bool_t unchain, start_process; 4078 nxt_port_t *idle_port; 4079 nxt_queue_link_t *idle_lnk; 4080 4081 app = port->app; 4082 4083 nxt_assert(app != NULL); 4084 4085 nxt_thread_mutex_lock(&app->mutex); 4086 4087 unchain = nxt_queue_chk_remove(&port->app_link); 4088 4089 if (nxt_queue_chk_remove(&port->idle_link)) { 4090 app->idle_processes--; 4091 4092 if (port->idle_start == 0 4093 && app->idle_processes >= app->spare_processes) 4094 { 4095 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4096 4097 idle_lnk = nxt_queue_last(&app->idle_ports); 4098 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4099 nxt_queue_remove(idle_lnk); 4100 4101 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4102 4103 idle_port->idle_start = 0; 4104 } 4105 } 4106 4107 app->processes--; 4108 4109 start_process = !task->thread->engine->shutdown 4110 && nxt_router_app_can_start(app) 4111 && (!nxt_queue_is_empty(&app->requests) 4112 || nxt_router_app_need_start(app)); 4113 4114 if (start_process) { 4115 app->pending_processes++; 4116 } 4117 4118 nxt_thread_mutex_unlock(&app->mutex); 4119 4120 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4121 4122 if (unchain) { 4123 nxt_port_use(task, port, -1); 4124 } 4125 4126 if (start_process) { 4127 nxt_router_start_app_process(task, app); 4128 } 4129 } 4130 4131 4132 static void 4133 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4134 { 4135 nxt_app_t *app; 4136 nxt_bool_t queued; 4137 nxt_port_t *port; 4138 nxt_msec_t timeout, threshold; 4139 nxt_queue_link_t *lnk; 4140 nxt_event_engine_t *engine; 4141 4142 app = obj; 4143 queued = (data == app); 4144 4145 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4146 &app->name, queued); 4147 4148 engine = task->thread->engine; 4149 4150 nxt_assert(app->engine == engine); 4151 4152 threshold = engine->timers.now + app->joint->idle_timer.bias; 4153 timeout = 0; 4154 4155 nxt_thread_mutex_lock(&app->mutex); 4156 4157 if (queued) { 4158 app->adjust_idle_work.data = NULL; 4159 } 4160 4161 while (app->idle_processes > app->spare_processes) { 4162 4163 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4164 4165 lnk = nxt_queue_first(&app->idle_ports); 4166 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4167 4168 timeout = port->idle_start + app->idle_timeout; 4169 4170 if (timeout > threshold) { 4171 break; 4172 } 4173 4174 nxt_queue_remove(lnk); 4175 lnk->next = NULL; 4176 4177 nxt_queue_chk_remove(&port->app_link); 4178 4179 app->idle_processes--; 4180 app->processes--; 4181 port->app = NULL; 4182 4183 nxt_thread_mutex_unlock(&app->mutex); 4184 4185 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4186 &app->name, port->pid); 4187 4188 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4189 4190 nxt_port_use(task, port, -1); 4191 4192 nxt_thread_mutex_lock(&app->mutex); 4193 } 4194 4195 nxt_thread_mutex_unlock(&app->mutex); 4196 4197 if (timeout > threshold) { 4198 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4199 4200 } else { 4201 nxt_timer_disable(engine, &app->joint->idle_timer); 4202 } 4203 4204 if (queued) { 4205 nxt_router_app_use(task, app, -1); 4206 } 4207 } 4208 4209 4210 static void 4211 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4212 { 4213 nxt_timer_t *timer; 4214 nxt_app_joint_t *app_joint; 4215 4216 timer = obj; 4217 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4218 4219 if (nxt_fast_path(app_joint->app != NULL)) { 4220 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 4221 } 4222 } 4223 4224 4225 static void 4226 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) 4227 { 4228 nxt_timer_t *timer; 4229 nxt_app_joint_t *app_joint; 4230 4231 timer = obj; 4232 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4233 4234 nxt_router_app_joint_use(task, app_joint, -1); 4235 } 4236 4237 4238 static void 4239 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 4240 { 4241 nxt_app_t *app; 4242 nxt_port_t *port; 4243 nxt_app_joint_t *app_joint; 4244 4245 app_joint = obj; 4246 app = app_joint->app; 4247 4248 for ( ;; ) { 4249 port = nxt_router_app_get_port_for_quit(app); 4250 if (port == NULL) { 4251 break; 4252 } 4253 4254 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); 4255 4256 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4257 4258 nxt_port_use(task, port, -1); 4259 } 4260 4261 nxt_assert(app->processes == 0); 4262 nxt_assert(app->idle_processes == 0); 4263 nxt_assert(nxt_queue_is_empty(&app->requests)); 4264 nxt_assert(nxt_queue_is_empty(&app->ports)); 4265 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 4266 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 4267 4268 nxt_thread_mutex_destroy(&app->mutex); 4269 nxt_free(app); 4270 4271 app_joint->app = NULL; 4272 4273 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 4274 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 4275 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 4276 4277 } else { 4278 nxt_router_app_joint_use(task, app_joint, -1); 4279 } 4280 } 4281 4282 4283 static void 4284 nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) 4285 { 4286 nxt_app_t *app; 4287 nxt_bool_t can_start_process; 4288 nxt_req_app_link_t *ra; 4289 4290 ra = state->ra; 4291 app = state->app; 4292 4293 state->failed_port_use_delta = 0; 4294 4295 if (nxt_queue_chk_remove(&ra->link_app_requests)) 4296 { 4297 nxt_router_ra_dec_use(ra); 4298 } 4299 4300 if (nxt_queue_chk_remove(&ra->link_port_pending)) 4301 { 4302 nxt_assert(ra->link_app_pending.next != NULL); 4303 4304 nxt_queue_remove(&ra->link_app_pending); 4305 ra->link_app_pending.next = NULL; 4306 4307 nxt_router_ra_dec_use(ra); 4308 } 4309 4310 state->failed_port = ra->app_port; 4311 4312 if (ra->app_port != NULL) { 4313 state->failed_port_use_delta--; 4314 4315 state->failed_port->app_pending_responses--; 4316 4317 if (nxt_queue_chk_remove(&state->failed_port->app_link)) { 4318 state->failed_port_use_delta--; 4319 } 4320 4321 ra->app_port = NULL; 4322 } 4323 4324 can_start_process = nxt_router_app_can_start(app); 4325 4326 state->port = NULL; 4327 state->start_process = 0; 4328 4329 if (nxt_queue_is_empty(&app->ports) 4330 || (can_start_process && nxt_router_app_first_port_busy(app)) ) 4331 { 4332 ra = nxt_router_ra_create(task, ra); 4333 4334 if (nxt_slow_path(ra == NULL)) { 4335 goto fail; 4336 } 4337 4338 if (nxt_slow_path(state->failed_port != NULL)) { 4339 nxt_queue_insert_head(&app->requests, &ra->link_app_requests); 4340 4341 } else { 4342 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); 4343 } 4344 4345 nxt_router_ra_inc_use(ra); 4346 4347 nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); 4348 4349 if (can_start_process) { 4350 app->pending_processes++; 4351 state->start_process = 1; 4352 } 4353 4354 } else { 4355 state->port = nxt_router_pop_first_port(app); 4356 4357 if (state->port->app_pending_responses > 1) { 4358 ra = nxt_router_ra_create(task, ra); 4359 4360 if (nxt_slow_path(ra == NULL)) { 4361 goto fail; 4362 } 4363 4364 ra->app_port = state->port; 4365 4366 nxt_router_ra_pending(task, app, ra); 4367 } 4368 4369 if (can_start_process && nxt_router_app_need_start(app)) { 4370 app->pending_processes++; 4371 state->start_process = 1; 4372 } 4373 } 4374 4375 fail: 4376 4377 state->shared_ra = ra; 4378 } 4379 4380 4381 static nxt_int_t 4382 nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) 4383 { 4384 nxt_int_t res; 4385 nxt_app_t *app; 4386 nxt_req_app_link_t *ra; 4387 4388 ra = state->shared_ra; 4389 app = state->app; 4390 4391 if (state->failed_port_use_delta != 0) { 4392 nxt_port_use(task, state->failed_port, state->failed_port_use_delta); 4393 } 4394 4395 if (nxt_slow_path(ra == NULL)) { 4396 if (state->port != NULL) { 4397 nxt_port_use(task, state->port, -1); 4398 } 4399 4400 nxt_router_ra_error(state->ra, 500, 4401 "Failed to allocate shared req<->app link"); 4402 nxt_router_ra_use(task, state->ra, -1); 4403 4404 return NXT_ERROR; 4405 } 4406 4407 if (state->port != NULL) { 4408 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); 4409 4410 ra->app_port = state->port; 4411 4412 if (state->start_process) { 4413 nxt_router_start_app_process(task, app); 4414 } 4415 4416 return NXT_OK; 4417 } 4418 4419 if (!state->start_process) { 4420 nxt_debug(task, "app '%V' %p too many running or pending processes", 4421 &app->name, app); 4422 4423 return NXT_AGAIN; 4424 } 4425 4426 res = nxt_router_start_app_process(task, app); 4427 4428 if (nxt_slow_path(res != NXT_OK)) { 4429 nxt_router_ra_error(ra, 500, "Failed to start app process"); 4430 nxt_router_ra_use(task, ra, -1); 4431 4432 return NXT_ERROR; 4433 } 4434 4435 return NXT_AGAIN; 4436 } 4437 4438 4439 static nxt_int_t 4440 nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 4441 { 4442 nxt_port_select_state_t state; 4443 4444 state.ra = ra; 4445 state.app = app; 4446 4447 nxt_thread_mutex_lock(&app->mutex); 4448 4449 nxt_router_port_select(task, &state); 4450 4451 nxt_thread_mutex_unlock(&app->mutex); 4452 4453 return nxt_router_port_post_select(task, &state); 4454 } 4455 4456 4457 void 4458 nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar, 4459 nxt_app_t *app) 4460 { 4461 nxt_int_t res; 4462 nxt_port_t *port; 4463 nxt_event_engine_t *engine; 4464 nxt_http_request_t *r; 4465 nxt_req_app_link_t ra_local, *ra; 4466 nxt_req_conn_link_t *rc; 4467 4468 r = ar->request; 4469 engine = task->thread->engine; 4470 4471 rc = nxt_port_rpc_register_handler_ex(task, engine->port, 4472 nxt_router_response_ready_handler, 4473 nxt_router_response_error_handler, 4474 sizeof(nxt_req_conn_link_t)); 4475 4476 if (nxt_slow_path(rc == NULL)) { 4477 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4478 return; 4479 } 4480 4481 rc->stream = nxt_port_rpc_ex_stream(rc); 4482 rc->app = app; 4483 4484 nxt_router_app_use(task, app, 1); 4485 4486 rc->ap = ar; 4487 4488 ra = &ra_local; 4489 nxt_router_ra_init(task, ra, rc); 4490 4491 res = nxt_router_app_port(task, app, ra); 4492 4493 if (res != NXT_OK) { 4494 return; 4495 } 4496 4497 ra = rc->ra; 4498 port = ra->app_port; 4499 4500 nxt_assert(port != NULL); 4501 4502 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); 4503 4504 nxt_router_app_prepare_request(task, ra); 4505 } 4506 4507 4508 static void 4509 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 4510 { 4511 } 4512 4513 4514 static void 4515 nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) 4516 { 4517 uint32_t request_failed; 4518 nxt_buf_t *buf; 4519 nxt_int_t res; 4520 nxt_port_t *port, *c_port, *reply_port; 4521 nxt_app_parse_ctx_t *ap; 4522 4523 nxt_assert(ra->app_port != NULL); 4524 4525 port = ra->app_port; 4526 reply_port = ra->reply_port; 4527 ap = ra->ap; 4528 4529 request_failed = 1; 4530 4531 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 4532 reply_port->id); 4533 if (nxt_slow_path(c_port != reply_port)) { 4534 res = nxt_port_send_port(task, port, reply_port, 0); 4535 4536 if (nxt_slow_path(res != NXT_OK)) { 4537 nxt_router_ra_error(ra, 500, 4538 "Failed to send reply port to application"); 4539 goto release_port; 4540 } 4541 4542 nxt_process_connected_port_add(port->process, reply_port); 4543 } 4544 4545 buf = nxt_router_prepare_msg(task, &ap->r, port, 4546 nxt_app_msg_prefix[port->app->type]); 4547 4548 if (nxt_slow_path(buf == NULL)) { 4549 nxt_router_ra_error(ra, 500, 4550 "Failed to prepare message for application"); 4551 goto release_port; 4552 } 4553 4554 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 4555 nxt_buf_used_size(buf), 4556 port->socket.fd); 4557 4558 request_failed = 0; 4559 4560 ra->msg_info.buf = buf; 4561 ra->msg_info.completion_handler = buf->completion_handler; 4562 4563 for (; buf; buf = buf->next) { 4564 buf->completion_handler = nxt_router_dummy_buf_completion; 4565 } 4566 4567 buf = ra->msg_info.buf; 4568 4569 res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, 4570 ra->stream); 4571 if (nxt_slow_path(res != NXT_OK)) { 4572 nxt_router_ra_error(ra, 500, 4573 "Failed to get tracking area"); 4574 goto release_port; 4575 } 4576 4577 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA, 4578 -1, ra->stream, reply_port->id, buf, 4579 &ra->msg_info.tracking); 4580 4581 if (nxt_slow_path(res != NXT_OK)) { 4582 nxt_router_ra_error(ra, 500, 4583 "Failed to send message to application"); 4584 goto release_port; 4585 } 4586 4587 release_port: 4588 4589 nxt_router_app_port_release(task, port, request_failed, 0); 4590 4591 nxt_router_ra_update_peer(task, ra); 4592 } 4593 4594 4595 struct nxt_fields_iter_s { 4596 nxt_list_part_t *part; 4597 nxt_http_field_t *field; 4598 }; 4599 4600 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 4601 4602 4603 static nxt_http_field_t * 4604 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 4605 { 4606 if (part == NULL) { 4607 return NULL; 4608 } 4609 4610 while (part->nelts == 0) { 4611 part = part->next; 4612 if (part == NULL) { 4613 return NULL; 4614 } 4615 } 4616 4617 i->part = part; 4618 i->field = nxt_list_data(i->part); 4619 4620 return i->field; 4621 } 4622 4623 4624 static nxt_http_field_t * 4625 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 4626 { 4627 return nxt_fields_part_first(nxt_list_part(fields), i); 4628 } 4629 4630 4631 static nxt_http_field_t * 4632 nxt_fields_next(nxt_fields_iter_t *i) 4633 { 4634 nxt_http_field_t *end = nxt_list_data(i->part); 4635 4636 end += i->part->nelts; 4637 i->field++; 4638 4639 if (i->field < end) { 4640 return i->field; 4641 } 4642 4643 return nxt_fields_part_first(i->part->next, i); 4644 } 4645 4646 4647 static nxt_buf_t * 4648 nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 4649 nxt_port_t *port, const nxt_str_t *prefix) 4650 { 4651 void *target_pos, *query_pos; 4652 u_char *pos, *end, *p, c; 4653 size_t fields_count, req_size, size, free_size; 4654 size_t copy_size; 4655 nxt_buf_t *b, *buf, *out, **tail; 4656 nxt_http_field_t *field, *dup; 4657 nxt_unit_field_t *dst_field; 4658 nxt_fields_iter_t iter, dup_iter; 4659 nxt_unit_request_t *req; 4660 nxt_app_request_header_t *h; 4661 4662 h = &r->header; 4663 4664 req_size = sizeof(nxt_unit_request_t) 4665 + h->method.length + 1 4666 + h->version.length + 1 4667 + r->remote.length + 1 4668 + r->local.length + 1 4669 + h->server_name.length + 1 4670 + h->target.length + 1 4671 + (h->path.start != h->target.start ? h->path.length + 1 : 0); 4672 4673 fields_count = 0; 4674 4675 nxt_list_each(field, h->fields) { 4676 fields_count++; 4677 4678 req_size += field->name_length + prefix->length + 1 4679 + field->value_length + 1; 4680 } nxt_list_loop; 4681 4682 req_size += fields_count * sizeof(nxt_unit_field_t); 4683 4684 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 4685 nxt_alert(task, "headers to big to fit in shared memory (%d)", 4686 (int) req_size); 4687 4688 return NULL; 4689 } 4690 4691 out = nxt_port_mmap_get_buf(task, port, 4692 nxt_min(req_size + r->body.preread_size, PORT_MMAP_DATA_SIZE)); 4693 if (nxt_slow_path(out == NULL)) { 4694 return NULL; 4695 } 4696 4697 req = (nxt_unit_request_t *) out->mem.free; 4698 out->mem.free += req_size; 4699 4700 req->content_length = h->parsed_content_length; 4701 4702 p = (u_char *) (req->fields + fields_count); 4703 4704 nxt_debug(task, "fields_count=%d", (int) fields_count); 4705 4706 req->method_length = h->method.length; 4707 nxt_unit_sptr_set(&req->method, p); 4708 p = nxt_cpymem(p, h->method.start, h->method.length); 4709 *p++ = '\0'; 4710 4711 req->version_length = h->version.length; 4712 nxt_unit_sptr_set(&req->version, p); 4713 p = nxt_cpymem(p, h->version.start, h->version.length); 4714 *p++ = '\0'; 4715 4716 req->remote_length = r->remote.length; 4717 nxt_unit_sptr_set(&req->remote, p); 4718 p = nxt_cpymem(p, r->remote.start, r->remote.length); 4719 *p++ = '\0'; 4720 4721 req->local_length = r->local.length; 4722 nxt_unit_sptr_set(&req->local, p); 4723 p = nxt_cpymem(p, r->local.start, r->local.length); 4724 *p++ = '\0'; 4725 4726 req->server_name_length = h->server_name.length; 4727 nxt_unit_sptr_set(&req->server_name, p); 4728 p = nxt_cpymem(p, h->server_name.start, h->server_name.length); 4729 *p++ = '\0'; 4730 4731 target_pos = p; 4732 req->target_length = h->target.length; 4733 nxt_unit_sptr_set(&req->target, p); 4734 p = nxt_cpymem(p, h->target.start, h->target.length); 4735 *p++ = '\0'; 4736 4737 req->path_length = h->path.length; 4738 if (h->path.start == h->target.start) { 4739 nxt_unit_sptr_set(&req->path, target_pos); 4740 4741 } else { 4742 nxt_unit_sptr_set(&req->path, p); 4743 p = nxt_cpymem(p, h->path.start, h->path.length); 4744 *p++ = '\0'; 4745 } 4746 4747 req->query_length = h->query.length; 4748 if (h->query.start != NULL) { 4749 query_pos = nxt_pointer_to(target_pos, 4750 h->query.start - h->target.start); 4751 4752 nxt_unit_sptr_set(&req->query, query_pos); 4753 4754 } else { 4755 req->query.offset = 0; 4756 } 4757 4758 req->content_length_field = NXT_UNIT_NONE_FIELD; 4759 req->content_type_field = NXT_UNIT_NONE_FIELD; 4760 req->cookie_field = NXT_UNIT_NONE_FIELD; 4761 4762 dst_field = req->fields; 4763 4764 for (field = nxt_fields_first(h->fields, &iter); 4765 field != NULL; 4766 field = nxt_fields_next(&iter)) 4767 { 4768 if (field->skip) { 4769 continue; 4770 } 4771 4772 dst_field->hash = field->hash; 4773 dst_field->skip = 0; 4774 dst_field->name_length = field->name_length + prefix->length; 4775 dst_field->value_length = field->value_length; 4776 4777 if (field->value == h->content_length.start) { 4778 req->content_length_field = dst_field - req->fields; 4779 4780 } else if (field->value == h->content_type.start) { 4781 req->content_type_field = dst_field - req->fields; 4782 4783 } else if (field->value == h->cookie.start) { 4784 req->cookie_field = dst_field - req->fields; 4785 } 4786 4787 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 4788 (int) field->hash, (int) field->skip, 4789 (int) field->name_length, field->name, 4790 (int) field->value_length, field->value); 4791 4792 if (prefix->length != 0) { 4793 nxt_unit_sptr_set(&dst_field->name, p); 4794 p = nxt_cpymem(p, prefix->start, prefix->length); 4795 4796 end = field->name + field->name_length; 4797 for (pos = field->name; pos < end; pos++) { 4798 c = *pos; 4799 4800 if (c >= 'a' && c <= 'z') { 4801 *p++ = (c & ~0x20); 4802 continue; 4803 } 4804 4805 if (c == '-') { 4806 *p++ = '_'; 4807 continue; 4808 } 4809 4810 *p++ = c; 4811 } 4812 4813 } else { 4814 nxt_unit_sptr_set(&dst_field->name, p); 4815 p = nxt_cpymem(p, field->name, field->name_length); 4816 } 4817 4818 *p++ = '\0'; 4819 4820 nxt_unit_sptr_set(&dst_field->value, p); 4821 p = nxt_cpymem(p, field->value, field->value_length); 4822 4823 if (prefix->length != 0) { 4824 dup_iter = iter; 4825 4826 for (dup = nxt_fields_next(&dup_iter); 4827 dup != NULL; 4828 dup = nxt_fields_next(&dup_iter)) 4829 { 4830 if (dup->name_length != field->name_length 4831 || dup->skip 4832 || dup->hash != field->hash 4833 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 4834 { 4835 continue; 4836 } 4837 4838 p = nxt_cpymem(p, ", ", 2); 4839 p = nxt_cpymem(p, dup->value, dup->value_length); 4840 4841 dst_field->value_length += 2 + dup->value_length; 4842 4843 dup->skip = 1; 4844 } 4845 } 4846 4847 *p++ = '\0'; 4848 4849 dst_field++; 4850 } 4851 4852 req->fields_count = dst_field - req->fields; 4853 4854 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 4855 4856 buf = out; 4857 tail = &buf->next; 4858 4859 for (b = r->body.buf; b != NULL; b = b->next) { 4860 size = nxt_buf_mem_used_size(&b->mem); 4861 pos = b->mem.pos; 4862 4863 while (size > 0) { 4864 if (buf == NULL) { 4865 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 4866 4867 buf = nxt_port_mmap_get_buf(task, port, free_size); 4868 if (nxt_slow_path(buf == NULL)) { 4869 while (out != NULL) { 4870 buf = out->next; 4871 out->completion_handler(task, out, out->parent); 4872 out = buf; 4873 } 4874 return NULL; 4875 } 4876 4877 *tail = buf; 4878 tail = &buf->next; 4879 4880 } else { 4881 free_size = nxt_buf_mem_free_size(&buf->mem); 4882 if (free_size < size 4883 && nxt_port_mmap_increase_buf(task, buf, size, 1) 4884 == NXT_OK) 4885 { 4886 free_size = nxt_buf_mem_free_size(&buf->mem); 4887 } 4888 } 4889 4890 if (free_size > 0) { 4891 copy_size = nxt_min(free_size, size); 4892 4893 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 4894 4895 size -= copy_size; 4896 pos += copy_size; 4897 4898 if (size == 0) { 4899 break; 4900 } 4901 } 4902 4903 buf = NULL; 4904 } 4905 } 4906 4907 return out; 4908 } 4909 4910 4911 static void 4912 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 4913 { 4914 nxt_app_t *app; 4915 nxt_bool_t cancelled, unlinked; 4916 nxt_port_t *port; 4917 nxt_timer_t *timer; 4918 nxt_queue_link_t *lnk; 4919 nxt_req_app_link_t *pending_ra; 4920 nxt_app_parse_ctx_t *ar; 4921 nxt_req_conn_link_t *rc; 4922 nxt_port_select_state_t state; 4923 4924 timer = obj; 4925 4926 nxt_debug(task, "router app timeout"); 4927 4928 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); 4929 rc = ar->timer_data; 4930 app = rc->app; 4931 4932 if (app == NULL) { 4933 goto generate_error; 4934 } 4935 4936 port = NULL; 4937 pending_ra = NULL; 4938 4939 if (rc->app_port != NULL) { 4940 port = rc->app_port; 4941 rc->app_port = NULL; 4942 } 4943 4944 if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) { 4945 port = rc->ra->app_port; 4946 rc->ra->app_port = NULL; 4947 } 4948 4949 if (port == NULL) { 4950 goto generate_error; 4951 } 4952 4953 nxt_thread_mutex_lock(&app->mutex); 4954 4955 unlinked = nxt_queue_chk_remove(&port->app_link); 4956 4957 if (!nxt_queue_is_empty(&port->pending_requests)) { 4958 lnk = nxt_queue_first(&port->pending_requests); 4959 4960 pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, 4961 link_port_pending); 4962 4963 nxt_assert(pending_ra->link_app_pending.next != NULL); 4964 4965 nxt_debug(task, "app '%V' pending request #%uD found", 4966 &app->name, pending_ra->stream); 4967 4968 cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info, 4969 pending_ra->stream); 4970 4971 if (cancelled) { 4972 nxt_router_ra_inc_use(pending_ra); 4973 4974 state.ra = pending_ra; 4975 state.app = app; 4976 4977 nxt_router_port_select(task, &state); 4978 4979 } else { 4980 pending_ra = NULL; 4981 } 4982 } 4983 4984 nxt_thread_mutex_unlock(&app->mutex); 4985 4986 if (pending_ra != NULL 4987 && nxt_router_port_post_select(task, &state) == NXT_OK) 4988 { 4989 nxt_router_app_prepare_request(task, pending_ra); 4990 } 4991 4992 nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); 4993 4994 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4995 4996 nxt_port_use(task, port, unlinked ? -2 : -1); 4997 4998 generate_error: 4999 5000 nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); 5001 5002 nxt_router_rc_unlink(task, rc); 5003 } 5004