1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10#if (NXT_TLS) 11#include <nxt_cert.h> 12#endif 13#include <nxt_http.h> 14#include <nxt_port_memory_int.h> 15#include <nxt_unit_request.h> 16#include <nxt_unit_response.h> 17#include <nxt_router_request.h> 18 19typedef struct { 20 nxt_str_t type; 21 uint32_t processes; 22 uint32_t max_processes; 23 uint32_t spare_processes; 24 nxt_msec_t timeout; 25 nxt_msec_t res_timeout; 26 nxt_msec_t idle_timeout; 27 uint32_t requests; 28 nxt_conf_value_t *limits_value; 29 nxt_conf_value_t *processes_value; 30} nxt_router_app_conf_t; 31 32 33typedef struct { 34 nxt_str_t pass; 35 nxt_str_t application; 36} nxt_router_listener_conf_t; 37 38 39#if (NXT_TLS) 40 41typedef struct { 42 nxt_str_t name; 43 nxt_socket_conf_t *conf; 44 45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 46} nxt_router_tlssock_t; 47 48#endif 49 50 51typedef struct { 52 nxt_socket_conf_t *socket_conf; 53 nxt_router_temp_conf_t *temp_conf; 54} nxt_socket_rpc_t; 55 56 57typedef struct { 58 nxt_app_t *app; 59 nxt_router_temp_conf_t *temp_conf; 60} nxt_app_rpc_t; 61 62 63struct nxt_port_select_state_s { 64 nxt_app_t *app; 65 nxt_request_app_link_t *req_app_link; 66 67 nxt_port_t *failed_port; 68 int failed_port_use_delta; 69 70 uint8_t start_process; /* 1 bit */ 71 nxt_request_app_link_t *shared_ra; 72 nxt_port_t *port; 73}; 74 75typedef struct nxt_port_select_state_s nxt_port_select_state_t; 76 77static void nxt_router_greet_controller(nxt_task_t *task, 78 nxt_port_t *controller_port); 79 80static void nxt_router_port_select(nxt_task_t *task, 81 nxt_port_select_state_t *state); 82 83static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 84 nxt_port_select_state_t *state); 85 86static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 87static void nxt_request_app_link_update_peer(nxt_task_t *task, 88 nxt_request_app_link_t *req_app_link); 89 90 91nxt_inline void 92nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) 93{ 94 nxt_atomic_fetch_add(&req_app_link->use_count, 1); 95} 96 97nxt_inline void 98nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) 99{ 100#if (NXT_DEBUG) 101 int c; 102 103 c = nxt_atomic_fetch_add(&req_app_link->use_count, i); 104 105 nxt_assert((c + i) > 0); 106#else 107 (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); 108#endif 109} 110 111static void nxt_request_app_link_use(nxt_task_t *task, 112 nxt_request_app_link_t *req_app_link, int i); 113 114static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 115static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 116static void nxt_router_conf_ready(nxt_task_t *task, 117 nxt_router_temp_conf_t *tmcf); 118static void nxt_router_conf_error(nxt_task_t *task, 119 nxt_router_temp_conf_t *tmcf); 120static void nxt_router_conf_send(nxt_task_t *task, 121 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 122 123static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 124 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 125static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 126 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 127static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 128static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 129 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 130static void nxt_router_listen_socket_ready(nxt_task_t *task, 131 nxt_port_recv_msg_t *msg, void *data); 132static void nxt_router_listen_socket_error(nxt_task_t *task, 133 nxt_port_recv_msg_t *msg, void *data); 134#if (NXT_TLS) 135static void nxt_router_tls_rpc_create(nxt_task_t *task, 136 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls); 137static void nxt_router_tls_rpc_handler(nxt_task_t *task, 138 nxt_port_recv_msg_t *msg, void *data); 139#endif 140static void nxt_router_app_rpc_create(nxt_task_t *task, 141 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 142static void nxt_router_app_prefork_ready(nxt_task_t *task, 143 nxt_port_recv_msg_t *msg, void *data); 144static void nxt_router_app_prefork_error(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 147 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 148static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 149 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 150 151static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 152 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 153 const nxt_event_interface_t *interface); 154static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 155 nxt_router_engine_conf_t *recf); 156static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 157 nxt_router_engine_conf_t *recf); 158static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf); 160static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 161 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 162 nxt_work_handler_t handler); 163static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 164 nxt_router_engine_conf_t *recf); 165static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 166 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 167 168static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 169 nxt_router_temp_conf_t *tmcf); 170static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 171 nxt_event_engine_t *engine); 172static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 173 nxt_router_temp_conf_t *tmcf); 174 175static void nxt_router_engines_post(nxt_router_t *router, 176 nxt_router_temp_conf_t *tmcf); 177static void nxt_router_engine_post(nxt_event_engine_t *engine, 178 nxt_work_t *jobs); 179 180static void nxt_router_thread_start(void *data); 181static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 182 void *data); 183static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 184 void *data); 185static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 186 void *data); 187static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 188 void *data); 189static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 190 void *data); 191static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 192 void *data); 193static void nxt_router_listen_socket_release(nxt_task_t *task, 194 nxt_socket_conf_t *skcf); 195 196static void nxt_router_access_log_writer(nxt_task_t *task, 197 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 198static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 199 struct tm *tm, size_t size, const char *format); 200static void nxt_router_access_log_open(nxt_task_t *task, 201 nxt_router_temp_conf_t *tmcf); 202static void nxt_router_access_log_ready(nxt_task_t *task, 203 nxt_port_recv_msg_t *msg, void *data); 204static void nxt_router_access_log_error(nxt_task_t *task, 205 nxt_port_recv_msg_t *msg, void *data); 206static void nxt_router_access_log_release(nxt_task_t *task, 207 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 208static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 209 void *data); 210static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212static void nxt_router_access_log_reopen_error(nxt_task_t *task, 213 nxt_port_recv_msg_t *msg, void *data); 214 215static void nxt_router_app_port_ready(nxt_task_t *task, 216 nxt_port_recv_msg_t *msg, void *data); 217static void nxt_router_app_port_error(nxt_task_t *task, 218 nxt_port_recv_msg_t *msg, void *data); 219 220static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 221 222static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 223 nxt_apr_action_t action); 224static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 225 nxt_request_app_link_t *req_app_link); 226 227static void nxt_router_app_prepare_request(nxt_task_t *task, 228 nxt_request_app_link_t *req_app_link); 229static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 230 nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); 231 232static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 233static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 234 void *data); 235static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 236 void *data); 237static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 238 void *data); 239static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 240 241static const nxt_http_request_state_t nxt_http_request_send_state; 242static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 243 244static void nxt_router_app_joint_use(nxt_task_t *task, 245 nxt_app_joint_t *app_joint, int i); 246 247static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, 248 nxt_http_request_t *r); 249static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 250 void *data); 251static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 252 253extern const nxt_http_request_state_t nxt_http_websocket; 254 255static nxt_router_t *nxt_router; 256 257static const nxt_str_t http_prefix = nxt_string("HTTP_"); 258static const nxt_str_t empty_prefix = nxt_string(""); 259 260static const nxt_str_t *nxt_app_msg_prefix[] = { 261 &empty_prefix, 262 &http_prefix, 263 &http_prefix, 264 &http_prefix, 265 &http_prefix, 266 &empty_prefix, 267}; 268 269 270nxt_port_handlers_t nxt_router_process_port_handlers = { 271 .quit = nxt_worker_process_quit_handler, 272 .new_port = nxt_router_new_port_handler, 273 .change_file = nxt_port_change_log_file_handler, 274 .mmap = nxt_port_mmap_handler, 275 .data = nxt_router_conf_data_handler, 276 .remove_pid = nxt_router_remove_pid_handler, 277 .access_log = nxt_router_access_log_reopen_handler, 278 .rpc_ready = nxt_port_rpc_handler, 279 .rpc_error = nxt_port_rpc_handler, 280 .oosm = nxt_router_oosm_handler, 281}; 282 283 284nxt_int_t 285nxt_router_start(nxt_task_t *task, void *data) 286{ 287 nxt_int_t ret; 288 nxt_port_t *controller_port; 289 nxt_router_t *router; 290 nxt_runtime_t *rt; 291 292 rt = task->thread->runtime; 293 294#if (NXT_TLS) 295 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 296 if (nxt_slow_path(rt->tls == NULL)) { 297 return NXT_ERROR; 298 } 299 300 ret = rt->tls->library_init(task); 301 if (nxt_slow_path(ret != NXT_OK)) { 302 return ret; 303 } 304#endif 305 306 ret = nxt_http_init(task, rt); 307 if (nxt_slow_path(ret != NXT_OK)) { 308 return ret; 309 } 310 311 router = nxt_zalloc(sizeof(nxt_router_t)); 312 if (nxt_slow_path(router == NULL)) { 313 return NXT_ERROR; 314 } 315 316 nxt_queue_init(&router->engines); 317 nxt_queue_init(&router->sockets); 318 nxt_queue_init(&router->apps); 319 320 nxt_router = router; 321 322 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 323 if (controller_port != NULL) { 324 nxt_router_greet_controller(task, controller_port); 325 } 326 327 return NXT_OK; 328} 329 330 331static void 332nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 333{ 334 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 335 -1, 0, 0, NULL); 336} 337 338 339static void 340nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 341 void *data) 342{ 343 size_t size; 344 uint32_t stream; 345 nxt_mp_t *mp; 346 nxt_int_t ret; 347 nxt_app_t *app; 348 nxt_buf_t *b; 349 nxt_port_t *main_port; 350 nxt_runtime_t *rt; 351 352 app = data; 353 354 rt = task->thread->runtime; 355 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 356 357 nxt_debug(task, "app '%V' %p start process", &app->name, app); 358 359 size = app->name.length + 1 + app->conf.length; 360 361 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 362 363 if (nxt_slow_path(b == NULL)) { 364 goto failed; 365 } 366 367 nxt_buf_cpystr(b, &app->name); 368 *b->mem.free++ = '\0'; 369 nxt_buf_cpystr(b, &app->conf); 370 371 nxt_router_app_joint_use(task, app->joint, 1); 372 373 stream = nxt_port_rpc_register_handler(task, port, 374 nxt_router_app_port_ready, 375 nxt_router_app_port_error, 376 -1, app->joint); 377 378 if (nxt_slow_path(stream == 0)) { 379 nxt_router_app_joint_use(task, app->joint, -1); 380 381 goto failed; 382 } 383 384 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 385 stream, port->id, b); 386 387 if (nxt_slow_path(ret != NXT_OK)) { 388 nxt_port_rpc_cancel(task, port, stream); 389 390 nxt_router_app_joint_use(task, app->joint, -1); 391 392 goto failed; 393 } 394 395 nxt_router_app_use(task, app, -1); 396 397 return; 398 399failed: 400 401 if (b != NULL) { 402 mp = b->data; 403 nxt_mp_free(mp, b); 404 nxt_mp_release(mp); 405 } 406 407 nxt_thread_mutex_lock(&app->mutex); 408 409 app->pending_processes--; 410 411 nxt_thread_mutex_unlock(&app->mutex); 412 413 nxt_router_app_use(task, app, -1); 414} 415 416 417static void 418nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 419{ 420 app_joint->use_count += i; 421 422 if (app_joint->use_count == 0) { 423 nxt_assert(app_joint->app == NULL); 424 425 nxt_free(app_joint); 426 } 427} 428 429 430static nxt_int_t 431nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 432{ 433 nxt_int_t res; 434 nxt_port_t *router_port; 435 nxt_runtime_t *rt; 436 437 rt = task->thread->runtime; 438 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 439 440 nxt_router_app_use(task, app, 1); 441 442 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 443 app); 444 445 if (res == NXT_OK) { 446 return res; 447 } 448 449 nxt_thread_mutex_lock(&app->mutex); 450 451 app->pending_processes--; 452 453 nxt_thread_mutex_unlock(&app->mutex); 454 455 nxt_router_app_use(task, app, -1); 456 457 return NXT_ERROR; 458} 459 460 461nxt_inline void 462nxt_request_app_link_init(nxt_task_t *task, 463 nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) 464{ 465 nxt_buf_t *body; 466 nxt_event_engine_t *engine; 467 468 engine = task->thread->engine; 469 470 nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); 471 472 req_app_link->stream = req_rpc_data->stream; 473 req_app_link->use_count = 1; 474 req_app_link->req_rpc_data = req_rpc_data; 475 req_rpc_data->req_app_link = req_app_link; 476 req_app_link->reply_port = engine->port; 477 req_app_link->request = req_rpc_data->request; 478 req_app_link->apr_action = NXT_APR_GOT_RESPONSE; 479 480 req_app_link->work.handler = NULL; 481 req_app_link->work.task = &engine->task; 482 req_app_link->work.obj = req_app_link; 483 req_app_link->work.data = engine; 484 485 body = req_rpc_data->request->body; 486 487 if (body != NULL && nxt_buf_is_file(body)) { 488 req_app_link->body_fd = body->file->fd; 489 490 body->file->fd = -1; 491 492 } else { 493 req_app_link->body_fd = -1; 494 } 495} 496 497 498nxt_inline nxt_request_app_link_t * 499nxt_request_app_link_alloc(nxt_task_t *task, 500 nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) 501{ 502 nxt_mp_t *mp; 503 nxt_request_app_link_t *req_app_link; 504 505 if (ra_src != NULL && ra_src->mem_pool != NULL) { 506 return ra_src; 507 } 508 509 mp = req_rpc_data->request->mem_pool; 510 511 req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); 512 513 if (nxt_slow_path(req_app_link == NULL)) { 514 515 req_rpc_data->req_app_link = NULL; 516 517 if (ra_src != NULL) { 518 ra_src->req_rpc_data = NULL; 519 } 520 521 return NULL; 522 } 523 524 nxt_mp_retain(mp); 525 526 nxt_request_app_link_init(task, req_app_link, req_rpc_data); 527 528 if (ra_src != NULL) { 529 req_app_link->body_fd = ra_src->body_fd; 530 } 531 532 req_app_link->mem_pool = mp; 533 534 return req_app_link; 535} 536 537 538nxt_inline nxt_bool_t 539nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 540 uint32_t stream) 541{ 542 nxt_buf_t *b, *next; 543 nxt_bool_t cancelled; 544 545 if (msg_info->buf == NULL) { 546 return 0; 547 } 548 549 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 550 stream); 551 552 if (cancelled) { 553 nxt_debug(task, "stream #%uD: cancelled by router", stream); 554 } 555 556 for (b = msg_info->buf; b != NULL; b = next) { 557 next = b->next; 558 b->next = NULL; 559 560 b->completion_handler = msg_info->completion_handler; 561 562 if (b->is_port_mmap_sent) { 563 b->is_port_mmap_sent = cancelled == 0; 564 b->completion_handler(task, b, b->parent); 565 } 566 } 567 568 msg_info->buf = NULL; 569 570 return cancelled; 571} 572 573 574static void 575nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, 576 void *data) 577{ 578 nxt_request_app_link_t *req_app_link; 579 580 req_app_link = obj; 581 582 nxt_request_app_link_update_peer(task, req_app_link); 583 584 nxt_request_app_link_use(task, req_app_link, -1); 585} 586 587 588static void 589nxt_request_app_link_update_peer(nxt_task_t *task, 590 nxt_request_app_link_t *req_app_link) 591{ 592 nxt_event_engine_t *engine; 593 nxt_request_rpc_data_t *req_rpc_data; 594 595 engine = req_app_link->work.data; 596 597 if (task->thread->engine != engine) { 598 nxt_request_app_link_inc_use(req_app_link); 599 600 req_app_link->work.handler = nxt_request_app_link_update_peer_handler; 601 req_app_link->work.task = &engine->task; 602 req_app_link->work.next = NULL; 603 604 nxt_debug(task, "req_app_link stream #%uD post update peer to %p", 605 req_app_link->stream, engine); 606 607 nxt_event_engine_post(engine, &req_app_link->work); 608 609 return; 610 } 611 612 nxt_debug(task, "req_app_link stream #%uD update peer", 613 req_app_link->stream); 614 615 req_rpc_data = req_app_link->req_rpc_data; 616 617 if (req_rpc_data != NULL && req_app_link->app_port != NULL) { 618 nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, 619 req_app_link->app_port->pid); 620 } 621} 622 623 624static void 625nxt_request_app_link_release(nxt_task_t *task, 626 nxt_request_app_link_t *req_app_link) 627{ 628 nxt_mp_t *mp; 629 nxt_http_request_t *r; 630 nxt_request_rpc_data_t *req_rpc_data; 631 632 nxt_assert(task->thread->engine == req_app_link->work.data); 633 nxt_assert(req_app_link->use_count == 0); 634 635 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); 636 637 req_rpc_data = req_app_link->req_rpc_data; 638 639 if (req_rpc_data != NULL) { 640 if (nxt_slow_path(req_app_link->err_code != 0)) { 641 nxt_http_request_error(task, req_rpc_data->request, 642 req_app_link->err_code); 643 644 } else { 645 req_rpc_data->app_port = req_app_link->app_port; 646 req_rpc_data->apr_action = req_app_link->apr_action; 647 req_rpc_data->msg_info = req_app_link->msg_info; 648 649 if (req_rpc_data->app->timeout != 0) { 650 r = req_rpc_data->request; 651 652 r->timer.handler = nxt_router_app_timeout; 653 r->timer_data = req_rpc_data; 654 nxt_timer_add(task->thread->engine, &r->timer, 655 req_rpc_data->app->timeout); 656 } 657 658 req_app_link->app_port = NULL; 659 req_app_link->msg_info.buf = NULL; 660 } 661 662 req_rpc_data->req_app_link = NULL; 663 req_app_link->req_rpc_data = NULL; 664 } 665 666 if (req_app_link->app_port != NULL) { 667 nxt_router_app_port_release(task, req_app_link->app_port, 668 req_app_link->apr_action); 669 670 req_app_link->app_port = NULL; 671 } 672 673 if (req_app_link->body_fd != -1) { 674 nxt_fd_close(req_app_link->body_fd); 675 676 req_app_link->body_fd = -1; 677 } 678 679 nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); 680 681 mp = req_app_link->mem_pool; 682 683 if (mp != NULL) { 684 nxt_mp_free(mp, req_app_link); 685 nxt_mp_release(mp); 686 } 687} 688 689 690static void 691nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) 692{ 693 nxt_request_app_link_t *req_app_link; 694 695 req_app_link = obj; 696 697 nxt_assert(req_app_link->work.data == data); 698 699 nxt_atomic_fetch_add(&req_app_link->use_count, -1); 700 701 nxt_request_app_link_release(task, req_app_link); 702} 703 704 705static void 706nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, 707 int i) 708{ 709 int c; 710 nxt_event_engine_t *engine; 711 712 c = nxt_atomic_fetch_add(&req_app_link->use_count, i); 713 714 if (i < 0 && c == -i) { 715 engine = req_app_link->work.data; 716 717 if (task->thread->engine == engine) { 718 nxt_request_app_link_release(task, req_app_link); 719 720 return; 721 } 722 723 nxt_request_app_link_inc_use(req_app_link); 724 725 req_app_link->work.handler = nxt_request_app_link_release_handler; 726 req_app_link->work.task = &engine->task; 727 req_app_link->work.next = NULL; 728 729 nxt_debug(task, "req_app_link stream #%uD post release to %p", 730 req_app_link->stream, engine); 731 732 nxt_event_engine_post(engine, &req_app_link->work); 733 } 734} 735 736 737nxt_inline void
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10#if (NXT_TLS) 11#include <nxt_cert.h> 12#endif 13#include <nxt_http.h> 14#include <nxt_port_memory_int.h> 15#include <nxt_unit_request.h> 16#include <nxt_unit_response.h> 17#include <nxt_router_request.h> 18 19typedef struct { 20 nxt_str_t type; 21 uint32_t processes; 22 uint32_t max_processes; 23 uint32_t spare_processes; 24 nxt_msec_t timeout; 25 nxt_msec_t res_timeout; 26 nxt_msec_t idle_timeout; 27 uint32_t requests; 28 nxt_conf_value_t *limits_value; 29 nxt_conf_value_t *processes_value; 30} nxt_router_app_conf_t; 31 32 33typedef struct { 34 nxt_str_t pass; 35 nxt_str_t application; 36} nxt_router_listener_conf_t; 37 38 39#if (NXT_TLS) 40 41typedef struct { 42 nxt_str_t name; 43 nxt_socket_conf_t *conf; 44 45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 46} nxt_router_tlssock_t; 47 48#endif 49 50 51typedef struct { 52 nxt_socket_conf_t *socket_conf; 53 nxt_router_temp_conf_t *temp_conf; 54} nxt_socket_rpc_t; 55 56 57typedef struct { 58 nxt_app_t *app; 59 nxt_router_temp_conf_t *temp_conf; 60} nxt_app_rpc_t; 61 62 63struct nxt_port_select_state_s { 64 nxt_app_t *app; 65 nxt_request_app_link_t *req_app_link; 66 67 nxt_port_t *failed_port; 68 int failed_port_use_delta; 69 70 uint8_t start_process; /* 1 bit */ 71 nxt_request_app_link_t *shared_ra; 72 nxt_port_t *port; 73}; 74 75typedef struct nxt_port_select_state_s nxt_port_select_state_t; 76 77static void nxt_router_greet_controller(nxt_task_t *task, 78 nxt_port_t *controller_port); 79 80static void nxt_router_port_select(nxt_task_t *task, 81 nxt_port_select_state_t *state); 82 83static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 84 nxt_port_select_state_t *state); 85 86static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 87static void nxt_request_app_link_update_peer(nxt_task_t *task, 88 nxt_request_app_link_t *req_app_link); 89 90 91nxt_inline void 92nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) 93{ 94 nxt_atomic_fetch_add(&req_app_link->use_count, 1); 95} 96 97nxt_inline void 98nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) 99{ 100#if (NXT_DEBUG) 101 int c; 102 103 c = nxt_atomic_fetch_add(&req_app_link->use_count, i); 104 105 nxt_assert((c + i) > 0); 106#else 107 (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); 108#endif 109} 110 111static void nxt_request_app_link_use(nxt_task_t *task, 112 nxt_request_app_link_t *req_app_link, int i); 113 114static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 115static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 116static void nxt_router_conf_ready(nxt_task_t *task, 117 nxt_router_temp_conf_t *tmcf); 118static void nxt_router_conf_error(nxt_task_t *task, 119 nxt_router_temp_conf_t *tmcf); 120static void nxt_router_conf_send(nxt_task_t *task, 121 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 122 123static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 124 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 125static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 126 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 127static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 128static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 129 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 130static void nxt_router_listen_socket_ready(nxt_task_t *task, 131 nxt_port_recv_msg_t *msg, void *data); 132static void nxt_router_listen_socket_error(nxt_task_t *task, 133 nxt_port_recv_msg_t *msg, void *data); 134#if (NXT_TLS) 135static void nxt_router_tls_rpc_create(nxt_task_t *task, 136 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls); 137static void nxt_router_tls_rpc_handler(nxt_task_t *task, 138 nxt_port_recv_msg_t *msg, void *data); 139#endif 140static void nxt_router_app_rpc_create(nxt_task_t *task, 141 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 142static void nxt_router_app_prefork_ready(nxt_task_t *task, 143 nxt_port_recv_msg_t *msg, void *data); 144static void nxt_router_app_prefork_error(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 147 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 148static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 149 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 150 151static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 152 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 153 const nxt_event_interface_t *interface); 154static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 155 nxt_router_engine_conf_t *recf); 156static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 157 nxt_router_engine_conf_t *recf); 158static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf); 160static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 161 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 162 nxt_work_handler_t handler); 163static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 164 nxt_router_engine_conf_t *recf); 165static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 166 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 167 168static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 169 nxt_router_temp_conf_t *tmcf); 170static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 171 nxt_event_engine_t *engine); 172static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 173 nxt_router_temp_conf_t *tmcf); 174 175static void nxt_router_engines_post(nxt_router_t *router, 176 nxt_router_temp_conf_t *tmcf); 177static void nxt_router_engine_post(nxt_event_engine_t *engine, 178 nxt_work_t *jobs); 179 180static void nxt_router_thread_start(void *data); 181static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 182 void *data); 183static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 184 void *data); 185static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 186 void *data); 187static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 188 void *data); 189static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 190 void *data); 191static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 192 void *data); 193static void nxt_router_listen_socket_release(nxt_task_t *task, 194 nxt_socket_conf_t *skcf); 195 196static void nxt_router_access_log_writer(nxt_task_t *task, 197 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 198static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 199 struct tm *tm, size_t size, const char *format); 200static void nxt_router_access_log_open(nxt_task_t *task, 201 nxt_router_temp_conf_t *tmcf); 202static void nxt_router_access_log_ready(nxt_task_t *task, 203 nxt_port_recv_msg_t *msg, void *data); 204static void nxt_router_access_log_error(nxt_task_t *task, 205 nxt_port_recv_msg_t *msg, void *data); 206static void nxt_router_access_log_release(nxt_task_t *task, 207 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 208static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 209 void *data); 210static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212static void nxt_router_access_log_reopen_error(nxt_task_t *task, 213 nxt_port_recv_msg_t *msg, void *data); 214 215static void nxt_router_app_port_ready(nxt_task_t *task, 216 nxt_port_recv_msg_t *msg, void *data); 217static void nxt_router_app_port_error(nxt_task_t *task, 218 nxt_port_recv_msg_t *msg, void *data); 219 220static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 221 222static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 223 nxt_apr_action_t action); 224static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 225 nxt_request_app_link_t *req_app_link); 226 227static void nxt_router_app_prepare_request(nxt_task_t *task, 228 nxt_request_app_link_t *req_app_link); 229static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 230 nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); 231 232static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 233static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 234 void *data); 235static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 236 void *data); 237static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 238 void *data); 239static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 240 241static const nxt_http_request_state_t nxt_http_request_send_state; 242static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 243 244static void nxt_router_app_joint_use(nxt_task_t *task, 245 nxt_app_joint_t *app_joint, int i); 246 247static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, 248 nxt_http_request_t *r); 249static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 250 void *data); 251static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 252 253extern const nxt_http_request_state_t nxt_http_websocket; 254 255static nxt_router_t *nxt_router; 256 257static const nxt_str_t http_prefix = nxt_string("HTTP_"); 258static const nxt_str_t empty_prefix = nxt_string(""); 259 260static const nxt_str_t *nxt_app_msg_prefix[] = { 261 &empty_prefix, 262 &http_prefix, 263 &http_prefix, 264 &http_prefix, 265 &http_prefix, 266 &empty_prefix, 267}; 268 269 270nxt_port_handlers_t nxt_router_process_port_handlers = { 271 .quit = nxt_worker_process_quit_handler, 272 .new_port = nxt_router_new_port_handler, 273 .change_file = nxt_port_change_log_file_handler, 274 .mmap = nxt_port_mmap_handler, 275 .data = nxt_router_conf_data_handler, 276 .remove_pid = nxt_router_remove_pid_handler, 277 .access_log = nxt_router_access_log_reopen_handler, 278 .rpc_ready = nxt_port_rpc_handler, 279 .rpc_error = nxt_port_rpc_handler, 280 .oosm = nxt_router_oosm_handler, 281}; 282 283 284nxt_int_t 285nxt_router_start(nxt_task_t *task, void *data) 286{ 287 nxt_int_t ret; 288 nxt_port_t *controller_port; 289 nxt_router_t *router; 290 nxt_runtime_t *rt; 291 292 rt = task->thread->runtime; 293 294#if (NXT_TLS) 295 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 296 if (nxt_slow_path(rt->tls == NULL)) { 297 return NXT_ERROR; 298 } 299 300 ret = rt->tls->library_init(task); 301 if (nxt_slow_path(ret != NXT_OK)) { 302 return ret; 303 } 304#endif 305 306 ret = nxt_http_init(task, rt); 307 if (nxt_slow_path(ret != NXT_OK)) { 308 return ret; 309 } 310 311 router = nxt_zalloc(sizeof(nxt_router_t)); 312 if (nxt_slow_path(router == NULL)) { 313 return NXT_ERROR; 314 } 315 316 nxt_queue_init(&router->engines); 317 nxt_queue_init(&router->sockets); 318 nxt_queue_init(&router->apps); 319 320 nxt_router = router; 321 322 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 323 if (controller_port != NULL) { 324 nxt_router_greet_controller(task, controller_port); 325 } 326 327 return NXT_OK; 328} 329 330 331static void 332nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 333{ 334 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 335 -1, 0, 0, NULL); 336} 337 338 339static void 340nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 341 void *data) 342{ 343 size_t size; 344 uint32_t stream; 345 nxt_mp_t *mp; 346 nxt_int_t ret; 347 nxt_app_t *app; 348 nxt_buf_t *b; 349 nxt_port_t *main_port; 350 nxt_runtime_t *rt; 351 352 app = data; 353 354 rt = task->thread->runtime; 355 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 356 357 nxt_debug(task, "app '%V' %p start process", &app->name, app); 358 359 size = app->name.length + 1 + app->conf.length; 360 361 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 362 363 if (nxt_slow_path(b == NULL)) { 364 goto failed; 365 } 366 367 nxt_buf_cpystr(b, &app->name); 368 *b->mem.free++ = '\0'; 369 nxt_buf_cpystr(b, &app->conf); 370 371 nxt_router_app_joint_use(task, app->joint, 1); 372 373 stream = nxt_port_rpc_register_handler(task, port, 374 nxt_router_app_port_ready, 375 nxt_router_app_port_error, 376 -1, app->joint); 377 378 if (nxt_slow_path(stream == 0)) { 379 nxt_router_app_joint_use(task, app->joint, -1); 380 381 goto failed; 382 } 383 384 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 385 stream, port->id, b); 386 387 if (nxt_slow_path(ret != NXT_OK)) { 388 nxt_port_rpc_cancel(task, port, stream); 389 390 nxt_router_app_joint_use(task, app->joint, -1); 391 392 goto failed; 393 } 394 395 nxt_router_app_use(task, app, -1); 396 397 return; 398 399failed: 400 401 if (b != NULL) { 402 mp = b->data; 403 nxt_mp_free(mp, b); 404 nxt_mp_release(mp); 405 } 406 407 nxt_thread_mutex_lock(&app->mutex); 408 409 app->pending_processes--; 410 411 nxt_thread_mutex_unlock(&app->mutex); 412 413 nxt_router_app_use(task, app, -1); 414} 415 416 417static void 418nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 419{ 420 app_joint->use_count += i; 421 422 if (app_joint->use_count == 0) { 423 nxt_assert(app_joint->app == NULL); 424 425 nxt_free(app_joint); 426 } 427} 428 429 430static nxt_int_t 431nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 432{ 433 nxt_int_t res; 434 nxt_port_t *router_port; 435 nxt_runtime_t *rt; 436 437 rt = task->thread->runtime; 438 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 439 440 nxt_router_app_use(task, app, 1); 441 442 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 443 app); 444 445 if (res == NXT_OK) { 446 return res; 447 } 448 449 nxt_thread_mutex_lock(&app->mutex); 450 451 app->pending_processes--; 452 453 nxt_thread_mutex_unlock(&app->mutex); 454 455 nxt_router_app_use(task, app, -1); 456 457 return NXT_ERROR; 458} 459 460 461nxt_inline void 462nxt_request_app_link_init(nxt_task_t *task, 463 nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) 464{ 465 nxt_buf_t *body; 466 nxt_event_engine_t *engine; 467 468 engine = task->thread->engine; 469 470 nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); 471 472 req_app_link->stream = req_rpc_data->stream; 473 req_app_link->use_count = 1; 474 req_app_link->req_rpc_data = req_rpc_data; 475 req_rpc_data->req_app_link = req_app_link; 476 req_app_link->reply_port = engine->port; 477 req_app_link->request = req_rpc_data->request; 478 req_app_link->apr_action = NXT_APR_GOT_RESPONSE; 479 480 req_app_link->work.handler = NULL; 481 req_app_link->work.task = &engine->task; 482 req_app_link->work.obj = req_app_link; 483 req_app_link->work.data = engine; 484 485 body = req_rpc_data->request->body; 486 487 if (body != NULL && nxt_buf_is_file(body)) { 488 req_app_link->body_fd = body->file->fd; 489 490 body->file->fd = -1; 491 492 } else { 493 req_app_link->body_fd = -1; 494 } 495} 496 497 498nxt_inline nxt_request_app_link_t * 499nxt_request_app_link_alloc(nxt_task_t *task, 500 nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) 501{ 502 nxt_mp_t *mp; 503 nxt_request_app_link_t *req_app_link; 504 505 if (ra_src != NULL && ra_src->mem_pool != NULL) { 506 return ra_src; 507 } 508 509 mp = req_rpc_data->request->mem_pool; 510 511 req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); 512 513 if (nxt_slow_path(req_app_link == NULL)) { 514 515 req_rpc_data->req_app_link = NULL; 516 517 if (ra_src != NULL) { 518 ra_src->req_rpc_data = NULL; 519 } 520 521 return NULL; 522 } 523 524 nxt_mp_retain(mp); 525 526 nxt_request_app_link_init(task, req_app_link, req_rpc_data); 527 528 if (ra_src != NULL) { 529 req_app_link->body_fd = ra_src->body_fd; 530 } 531 532 req_app_link->mem_pool = mp; 533 534 return req_app_link; 535} 536 537 538nxt_inline nxt_bool_t 539nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 540 uint32_t stream) 541{ 542 nxt_buf_t *b, *next; 543 nxt_bool_t cancelled; 544 545 if (msg_info->buf == NULL) { 546 return 0; 547 } 548 549 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 550 stream); 551 552 if (cancelled) { 553 nxt_debug(task, "stream #%uD: cancelled by router", stream); 554 } 555 556 for (b = msg_info->buf; b != NULL; b = next) { 557 next = b->next; 558 b->next = NULL; 559 560 b->completion_handler = msg_info->completion_handler; 561 562 if (b->is_port_mmap_sent) { 563 b->is_port_mmap_sent = cancelled == 0; 564 b->completion_handler(task, b, b->parent); 565 } 566 } 567 568 msg_info->buf = NULL; 569 570 return cancelled; 571} 572 573 574static void 575nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, 576 void *data) 577{ 578 nxt_request_app_link_t *req_app_link; 579 580 req_app_link = obj; 581 582 nxt_request_app_link_update_peer(task, req_app_link); 583 584 nxt_request_app_link_use(task, req_app_link, -1); 585} 586 587 588static void 589nxt_request_app_link_update_peer(nxt_task_t *task, 590 nxt_request_app_link_t *req_app_link) 591{ 592 nxt_event_engine_t *engine; 593 nxt_request_rpc_data_t *req_rpc_data; 594 595 engine = req_app_link->work.data; 596 597 if (task->thread->engine != engine) { 598 nxt_request_app_link_inc_use(req_app_link); 599 600 req_app_link->work.handler = nxt_request_app_link_update_peer_handler; 601 req_app_link->work.task = &engine->task; 602 req_app_link->work.next = NULL; 603 604 nxt_debug(task, "req_app_link stream #%uD post update peer to %p", 605 req_app_link->stream, engine); 606 607 nxt_event_engine_post(engine, &req_app_link->work); 608 609 return; 610 } 611 612 nxt_debug(task, "req_app_link stream #%uD update peer", 613 req_app_link->stream); 614 615 req_rpc_data = req_app_link->req_rpc_data; 616 617 if (req_rpc_data != NULL && req_app_link->app_port != NULL) { 618 nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, 619 req_app_link->app_port->pid); 620 } 621} 622 623 624static void 625nxt_request_app_link_release(nxt_task_t *task, 626 nxt_request_app_link_t *req_app_link) 627{ 628 nxt_mp_t *mp; 629 nxt_http_request_t *r; 630 nxt_request_rpc_data_t *req_rpc_data; 631 632 nxt_assert(task->thread->engine == req_app_link->work.data); 633 nxt_assert(req_app_link->use_count == 0); 634 635 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); 636 637 req_rpc_data = req_app_link->req_rpc_data; 638 639 if (req_rpc_data != NULL) { 640 if (nxt_slow_path(req_app_link->err_code != 0)) { 641 nxt_http_request_error(task, req_rpc_data->request, 642 req_app_link->err_code); 643 644 } else { 645 req_rpc_data->app_port = req_app_link->app_port; 646 req_rpc_data->apr_action = req_app_link->apr_action; 647 req_rpc_data->msg_info = req_app_link->msg_info; 648 649 if (req_rpc_data->app->timeout != 0) { 650 r = req_rpc_data->request; 651 652 r->timer.handler = nxt_router_app_timeout; 653 r->timer_data = req_rpc_data; 654 nxt_timer_add(task->thread->engine, &r->timer, 655 req_rpc_data->app->timeout); 656 } 657 658 req_app_link->app_port = NULL; 659 req_app_link->msg_info.buf = NULL; 660 } 661 662 req_rpc_data->req_app_link = NULL; 663 req_app_link->req_rpc_data = NULL; 664 } 665 666 if (req_app_link->app_port != NULL) { 667 nxt_router_app_port_release(task, req_app_link->app_port, 668 req_app_link->apr_action); 669 670 req_app_link->app_port = NULL; 671 } 672 673 if (req_app_link->body_fd != -1) { 674 nxt_fd_close(req_app_link->body_fd); 675 676 req_app_link->body_fd = -1; 677 } 678 679 nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); 680 681 mp = req_app_link->mem_pool; 682 683 if (mp != NULL) { 684 nxt_mp_free(mp, req_app_link); 685 nxt_mp_release(mp); 686 } 687} 688 689 690static void 691nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) 692{ 693 nxt_request_app_link_t *req_app_link; 694 695 req_app_link = obj; 696 697 nxt_assert(req_app_link->work.data == data); 698 699 nxt_atomic_fetch_add(&req_app_link->use_count, -1); 700 701 nxt_request_app_link_release(task, req_app_link); 702} 703 704 705static void 706nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, 707 int i) 708{ 709 int c; 710 nxt_event_engine_t *engine; 711 712 c = nxt_atomic_fetch_add(&req_app_link->use_count, i); 713 714 if (i < 0 && c == -i) { 715 engine = req_app_link->work.data; 716 717 if (task->thread->engine == engine) { 718 nxt_request_app_link_release(task, req_app_link); 719 720 return; 721 } 722 723 nxt_request_app_link_inc_use(req_app_link); 724 725 req_app_link->work.handler = nxt_request_app_link_release_handler; 726 req_app_link->work.task = &engine->task; 727 req_app_link->work.next = NULL; 728 729 nxt_debug(task, "req_app_link stream #%uD post release to %p", 730 req_app_link->stream, engine); 731 732 nxt_event_engine_post(engine, &req_app_link->work); 733 } 734} 735 736 737nxt_inline void
|
744} 745 746 747nxt_inline void 748nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, 749 nxt_request_app_link_t *req_app_link) 750{ 751 nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, 752 &req_app_link->link_port_pending); 753 nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); 754 755 nxt_request_app_link_inc_use(req_app_link); 756 757 req_app_link->res_time = nxt_thread_monotonic_time(task->thread) 758 + app->res_timeout; 759 760 nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", 761 req_app_link->stream); 762} 763 764 765nxt_inline nxt_bool_t 766nxt_queue_chk_remove(nxt_queue_link_t *lnk) 767{ 768 if (lnk->next != NULL) { 769 nxt_queue_remove(lnk); 770 771 lnk->next = NULL; 772 773 return 1; 774 } 775 776 return 0; 777} 778 779 780nxt_inline void 781nxt_request_rpc_data_unlink(nxt_task_t *task, 782 nxt_request_rpc_data_t *req_rpc_data) 783{ 784 int ra_use_delta; 785 nxt_request_app_link_t *req_app_link; 786 787 if (req_rpc_data->app_port != NULL) { 788 nxt_router_app_port_release(task, req_rpc_data->app_port, 789 req_rpc_data->apr_action); 790 791 req_rpc_data->app_port = NULL; 792 } 793 794 nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 795 796 req_app_link = req_rpc_data->req_app_link; 797 if (req_app_link != NULL) { 798 req_rpc_data->req_app_link = NULL; 799 req_app_link->req_rpc_data = NULL; 800 801 ra_use_delta = 0; 802 803 nxt_thread_mutex_lock(&req_rpc_data->app->mutex); 804 805 if (req_app_link->link_app_requests.next == NULL 806 && req_app_link->link_port_pending.next == NULL 807 && req_app_link->link_app_pending.next == NULL 808 && req_app_link->link_port_websockets.next == NULL) 809 { 810 req_app_link = NULL; 811 812 } else { 813 ra_use_delta -= 814 nxt_queue_chk_remove(&req_app_link->link_app_requests) 815 + nxt_queue_chk_remove(&req_app_link->link_port_pending) 816 + nxt_queue_chk_remove(&req_app_link->link_port_websockets); 817 818 nxt_queue_chk_remove(&req_app_link->link_app_pending); 819 } 820 821 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); 822 823 if (req_app_link != NULL) { 824 nxt_request_app_link_use(task, req_app_link, ra_use_delta); 825 } 826 } 827 828 if (req_rpc_data->app != NULL) { 829 nxt_router_app_use(task, req_rpc_data->app, -1); 830 831 req_rpc_data->app = NULL; 832 } 833 834 if (req_rpc_data->request != NULL) { 835 req_rpc_data->request->timer_data = NULL; 836 837 nxt_router_http_request_done(task, req_rpc_data->request); 838 839 req_rpc_data->request->req_rpc_data = NULL; 840 req_rpc_data->request = NULL; 841 } 842} 843 844 845void 846nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 847{ 848 nxt_port_new_port_handler(task, msg); 849 850 if (msg->u.new_port != NULL 851 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) 852 { 853 nxt_router_greet_controller(task, msg->u.new_port); 854 } 855 856 if (msg->port_msg.stream == 0) { 857 return; 858 } 859 860 if (msg->u.new_port == NULL 861 || msg->u.new_port->type != NXT_PROCESS_WORKER) 862 { 863 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 864 } 865 866 nxt_port_rpc_handler(task, msg); 867} 868 869 870void 871nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 872{ 873 nxt_int_t ret; 874 nxt_buf_t *b; 875 nxt_router_temp_conf_t *tmcf; 876 877 tmcf = nxt_router_temp_conf(task); 878 if (nxt_slow_path(tmcf == NULL)) { 879 return; 880 } 881 882 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 883 nxt_buf_used_size(msg->buf), 884 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 885 886 tmcf->router_conf->router = nxt_router; 887 tmcf->stream = msg->port_msg.stream; 888 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 889 msg->port_msg.pid, 890 msg->port_msg.reply_port); 891 892 if (nxt_slow_path(tmcf->port == NULL)) { 893 nxt_alert(task, "reply port not found"); 894 895 return; 896 } 897 898 nxt_port_use(task, tmcf->port, 1); 899 900 b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, 901 msg->buf, msg->size); 902 if (nxt_slow_path(b == NULL)) { 903 nxt_router_conf_error(task, tmcf); 904 905 return; 906 } 907 908 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 909 910 if (nxt_fast_path(ret == NXT_OK)) { 911 nxt_router_conf_apply(task, tmcf, NULL); 912 913 } else { 914 nxt_router_conf_error(task, tmcf); 915 } 916} 917 918 919static void 920nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 921 void *data) 922{ 923 union { 924 nxt_pid_t removed_pid; 925 void *data; 926 } u; 927 928 u.data = data; 929 930 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 931} 932 933 934void 935nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 936{ 937 nxt_event_engine_t *engine; 938 939 nxt_port_remove_pid_handler(task, msg); 940 941 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 942 { 943 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 944 msg->u.data); 945 } 946 nxt_queue_loop; 947 948 if (msg->port_msg.stream == 0) { 949 return; 950 } 951 952 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 953 954 nxt_port_rpc_handler(task, msg); 955} 956 957 958static nxt_router_temp_conf_t * 959nxt_router_temp_conf(nxt_task_t *task) 960{ 961 nxt_mp_t *mp, *tmp; 962 nxt_router_conf_t *rtcf; 963 nxt_router_temp_conf_t *tmcf; 964 965 mp = nxt_mp_create(1024, 128, 256, 32); 966 if (nxt_slow_path(mp == NULL)) { 967 return NULL; 968 } 969 970 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 971 if (nxt_slow_path(rtcf == NULL)) { 972 goto fail; 973 } 974 975 rtcf->mem_pool = mp; 976 977 tmp = nxt_mp_create(1024, 128, 256, 32); 978 if (nxt_slow_path(tmp == NULL)) { 979 goto fail; 980 } 981 982 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 983 if (nxt_slow_path(tmcf == NULL)) { 984 goto temp_fail; 985 } 986 987 tmcf->mem_pool = tmp; 988 tmcf->router_conf = rtcf; 989 tmcf->count = 1; 990 tmcf->engine = task->thread->engine; 991 992 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 993 sizeof(nxt_router_engine_conf_t)); 994 if (nxt_slow_path(tmcf->engines == NULL)) { 995 goto temp_fail; 996 } 997 998 nxt_queue_init(&tmcf->deleting); 999 nxt_queue_init(&tmcf->keeping); 1000 nxt_queue_init(&tmcf->updating); 1001 nxt_queue_init(&tmcf->pending); 1002 nxt_queue_init(&tmcf->creating); 1003 1004#if (NXT_TLS) 1005 nxt_queue_init(&tmcf->tls); 1006#endif 1007 1008 nxt_queue_init(&tmcf->apps); 1009 nxt_queue_init(&tmcf->previous); 1010 1011 return tmcf; 1012 1013temp_fail: 1014 1015 nxt_mp_destroy(tmp); 1016 1017fail: 1018 1019 nxt_mp_destroy(mp); 1020 1021 return NULL; 1022} 1023 1024 1025nxt_inline nxt_bool_t 1026nxt_router_app_can_start(nxt_app_t *app) 1027{ 1028 return app->processes + app->pending_processes < app->max_processes 1029 && app->pending_processes < app->max_pending_processes; 1030} 1031 1032 1033nxt_inline nxt_bool_t 1034nxt_router_app_need_start(nxt_app_t *app) 1035{ 1036 return app->idle_processes + app->pending_processes 1037 < app->spare_processes; 1038} 1039 1040 1041static void 1042nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1043{ 1044 nxt_int_t ret; 1045 nxt_app_t *app; 1046 nxt_router_t *router; 1047 nxt_runtime_t *rt; 1048 nxt_queue_link_t *qlk; 1049 nxt_socket_conf_t *skcf; 1050 nxt_router_conf_t *rtcf; 1051 nxt_router_temp_conf_t *tmcf; 1052 const nxt_event_interface_t *interface; 1053#if (NXT_TLS) 1054 nxt_router_tlssock_t *tls; 1055#endif 1056 1057 tmcf = obj; 1058 1059 qlk = nxt_queue_first(&tmcf->pending); 1060 1061 if (qlk != nxt_queue_tail(&tmcf->pending)) { 1062 nxt_queue_remove(qlk); 1063 nxt_queue_insert_tail(&tmcf->creating, qlk); 1064 1065 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1066 1067 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1068 1069 return; 1070 } 1071 1072#if (NXT_TLS) 1073 qlk = nxt_queue_first(&tmcf->tls); 1074 1075 if (qlk != nxt_queue_tail(&tmcf->tls)) { 1076 nxt_queue_remove(qlk); 1077 1078 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1079 1080 nxt_router_tls_rpc_create(task, tmcf, tls); 1081 return; 1082 } 1083#endif 1084 1085 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1086 1087 if (nxt_router_app_need_start(app)) { 1088 nxt_router_app_rpc_create(task, tmcf, app); 1089 return; 1090 } 1091 1092 } nxt_queue_loop; 1093 1094 rtcf = tmcf->router_conf; 1095 1096 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1097 nxt_router_access_log_open(task, tmcf); 1098 return; 1099 } 1100 1101 rt = task->thread->runtime; 1102 1103 interface = nxt_service_get(rt->services, "engine", NULL); 1104 1105 router = rtcf->router; 1106 1107 ret = nxt_router_engines_create(task, router, tmcf, interface); 1108 if (nxt_slow_path(ret != NXT_OK)) { 1109 goto fail; 1110 } 1111 1112 ret = nxt_router_threads_create(task, rt, tmcf); 1113 if (nxt_slow_path(ret != NXT_OK)) { 1114 goto fail; 1115 } 1116 1117 nxt_router_apps_sort(task, router, tmcf); 1118 1119 nxt_router_engines_post(router, tmcf); 1120 1121 nxt_queue_add(&router->sockets, &tmcf->updating); 1122 nxt_queue_add(&router->sockets, &tmcf->creating); 1123 1124 router->access_log = rtcf->access_log; 1125 1126 nxt_router_conf_ready(task, tmcf); 1127 1128 return; 1129 1130fail: 1131 1132 nxt_router_conf_error(task, tmcf); 1133 1134 return; 1135} 1136 1137 1138static void 1139nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1140{ 1141 nxt_joint_job_t *job; 1142 1143 job = obj; 1144 1145 nxt_router_conf_ready(task, job->tmcf); 1146} 1147 1148 1149static void 1150nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1151{ 1152 nxt_debug(task, "temp conf count:%D", tmcf->count); 1153 1154 if (--tmcf->count == 0) { 1155 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1156 } 1157} 1158 1159 1160static void 1161nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1162{ 1163 nxt_app_t *app; 1164 nxt_queue_t new_socket_confs; 1165 nxt_socket_t s; 1166 nxt_router_t *router; 1167 nxt_queue_link_t *qlk; 1168 nxt_socket_conf_t *skcf; 1169 nxt_router_conf_t *rtcf; 1170 1171 nxt_alert(task, "failed to apply new conf"); 1172 1173 for (qlk = nxt_queue_first(&tmcf->creating); 1174 qlk != nxt_queue_tail(&tmcf->creating); 1175 qlk = nxt_queue_next(qlk)) 1176 { 1177 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1178 s = skcf->listen->socket; 1179 1180 if (s != -1) { 1181 nxt_socket_close(task, s); 1182 } 1183 1184 nxt_free(skcf->listen); 1185 } 1186 1187 nxt_queue_init(&new_socket_confs); 1188 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1189 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1190 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1191 1192 rtcf = tmcf->router_conf; 1193 1194 nxt_http_routes_cleanup(task, rtcf->routes); 1195 1196 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1197 1198 if (skcf->action != NULL) { 1199 nxt_http_action_cleanup(task, skcf->action); 1200 } 1201 1202 } nxt_queue_loop; 1203 1204 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1205 1206 nxt_router_app_unlink(task, app); 1207 1208 } nxt_queue_loop; 1209 1210 router = rtcf->router; 1211 1212 nxt_queue_add(&router->sockets, &tmcf->keeping); 1213 nxt_queue_add(&router->sockets, &tmcf->deleting); 1214 1215 nxt_queue_add(&router->apps, &tmcf->previous); 1216 1217 // TODO: new engines and threads 1218 1219 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1220 1221 nxt_mp_destroy(rtcf->mem_pool); 1222 1223 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1224} 1225 1226 1227static void 1228nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1229 nxt_port_msg_type_t type) 1230{ 1231 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1232 1233 nxt_port_use(task, tmcf->port, -1); 1234 1235 tmcf->port = NULL; 1236} 1237 1238 1239static nxt_conf_map_t nxt_router_conf[] = { 1240 { 1241 nxt_string("listeners_threads"), 1242 NXT_CONF_MAP_INT32, 1243 offsetof(nxt_router_conf_t, threads), 1244 }, 1245}; 1246 1247 1248static nxt_conf_map_t nxt_router_app_conf[] = { 1249 { 1250 nxt_string("type"), 1251 NXT_CONF_MAP_STR, 1252 offsetof(nxt_router_app_conf_t, type), 1253 }, 1254 1255 { 1256 nxt_string("limits"), 1257 NXT_CONF_MAP_PTR, 1258 offsetof(nxt_router_app_conf_t, limits_value), 1259 }, 1260 1261 { 1262 nxt_string("processes"), 1263 NXT_CONF_MAP_INT32, 1264 offsetof(nxt_router_app_conf_t, processes), 1265 }, 1266 1267 { 1268 nxt_string("processes"), 1269 NXT_CONF_MAP_PTR, 1270 offsetof(nxt_router_app_conf_t, processes_value), 1271 }, 1272}; 1273 1274 1275static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1276 { 1277 nxt_string("timeout"), 1278 NXT_CONF_MAP_MSEC, 1279 offsetof(nxt_router_app_conf_t, timeout), 1280 }, 1281 1282 { 1283 nxt_string("reschedule_timeout"), 1284 NXT_CONF_MAP_MSEC, 1285 offsetof(nxt_router_app_conf_t, res_timeout), 1286 }, 1287 1288 { 1289 nxt_string("requests"), 1290 NXT_CONF_MAP_INT32, 1291 offsetof(nxt_router_app_conf_t, requests), 1292 }, 1293}; 1294 1295 1296static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1297 { 1298 nxt_string("spare"), 1299 NXT_CONF_MAP_INT32, 1300 offsetof(nxt_router_app_conf_t, spare_processes), 1301 }, 1302 1303 { 1304 nxt_string("max"), 1305 NXT_CONF_MAP_INT32, 1306 offsetof(nxt_router_app_conf_t, max_processes), 1307 }, 1308 1309 { 1310 nxt_string("idle_timeout"), 1311 NXT_CONF_MAP_MSEC, 1312 offsetof(nxt_router_app_conf_t, idle_timeout), 1313 }, 1314}; 1315 1316 1317static nxt_conf_map_t nxt_router_listener_conf[] = { 1318 { 1319 nxt_string("pass"), 1320 NXT_CONF_MAP_STR_COPY, 1321 offsetof(nxt_router_listener_conf_t, pass), 1322 }, 1323 1324 { 1325 nxt_string("application"), 1326 NXT_CONF_MAP_STR_COPY, 1327 offsetof(nxt_router_listener_conf_t, application), 1328 }, 1329}; 1330 1331 1332static nxt_conf_map_t nxt_router_http_conf[] = { 1333 { 1334 nxt_string("header_buffer_size"), 1335 NXT_CONF_MAP_SIZE, 1336 offsetof(nxt_socket_conf_t, header_buffer_size), 1337 }, 1338 1339 { 1340 nxt_string("large_header_buffer_size"), 1341 NXT_CONF_MAP_SIZE, 1342 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1343 }, 1344 1345 { 1346 nxt_string("large_header_buffers"), 1347 NXT_CONF_MAP_SIZE, 1348 offsetof(nxt_socket_conf_t, large_header_buffers), 1349 }, 1350 1351 { 1352 nxt_string("body_buffer_size"), 1353 NXT_CONF_MAP_SIZE, 1354 offsetof(nxt_socket_conf_t, body_buffer_size), 1355 }, 1356 1357 { 1358 nxt_string("max_body_size"), 1359 NXT_CONF_MAP_SIZE, 1360 offsetof(nxt_socket_conf_t, max_body_size), 1361 }, 1362 1363 { 1364 nxt_string("idle_timeout"), 1365 NXT_CONF_MAP_MSEC, 1366 offsetof(nxt_socket_conf_t, idle_timeout), 1367 }, 1368 1369 { 1370 nxt_string("header_read_timeout"), 1371 NXT_CONF_MAP_MSEC, 1372 offsetof(nxt_socket_conf_t, header_read_timeout), 1373 }, 1374 1375 { 1376 nxt_string("body_read_timeout"), 1377 NXT_CONF_MAP_MSEC, 1378 offsetof(nxt_socket_conf_t, body_read_timeout), 1379 }, 1380 1381 { 1382 nxt_string("send_timeout"), 1383 NXT_CONF_MAP_MSEC, 1384 offsetof(nxt_socket_conf_t, send_timeout), 1385 }, 1386 1387 { 1388 nxt_string("body_temp_path"), 1389 NXT_CONF_MAP_STR, 1390 offsetof(nxt_socket_conf_t, body_temp_path), 1391 }, 1392}; 1393 1394 1395static nxt_conf_map_t nxt_router_websocket_conf[] = { 1396 { 1397 nxt_string("max_frame_size"), 1398 NXT_CONF_MAP_SIZE, 1399 offsetof(nxt_websocket_conf_t, max_frame_size), 1400 }, 1401 1402 { 1403 nxt_string("read_timeout"), 1404 NXT_CONF_MAP_MSEC, 1405 offsetof(nxt_websocket_conf_t, read_timeout), 1406 }, 1407 1408 { 1409 nxt_string("keepalive_interval"), 1410 NXT_CONF_MAP_MSEC, 1411 offsetof(nxt_websocket_conf_t, keepalive_interval), 1412 }, 1413 1414}; 1415 1416 1417static nxt_int_t 1418nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1419 u_char *start, u_char *end) 1420{ 1421 u_char *p; 1422 size_t size; 1423 nxt_mp_t *mp; 1424 uint32_t next; 1425 nxt_int_t ret; 1426 nxt_str_t name, path; 1427 nxt_app_t *app, *prev; 1428 nxt_str_t *t; 1429 nxt_router_t *router; 1430 nxt_app_joint_t *app_joint; 1431 nxt_conf_value_t *conf, *http, *value, *websocket; 1432 nxt_conf_value_t *applications, *application; 1433 nxt_conf_value_t *listeners, *listener; 1434 nxt_conf_value_t *routes_conf, *static_conf; 1435 nxt_socket_conf_t *skcf; 1436 nxt_http_routes_t *routes; 1437 nxt_event_engine_t *engine; 1438 nxt_app_lang_module_t *lang; 1439 nxt_router_app_conf_t apcf; 1440 nxt_router_access_log_t *access_log; 1441 nxt_router_listener_conf_t lscf; 1442#if (NXT_TLS) 1443 nxt_router_tlssock_t *tls; 1444#endif 1445 1446 static nxt_str_t http_path = nxt_string("/settings/http"); 1447 static nxt_str_t applications_path = nxt_string("/applications"); 1448 static nxt_str_t listeners_path = nxt_string("/listeners"); 1449 static nxt_str_t routes_path = nxt_string("/routes"); 1450 static nxt_str_t access_log_path = nxt_string("/access_log"); 1451#if (NXT_TLS) 1452 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1453#endif 1454 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1455 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1456 1457 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1458 if (conf == NULL) { 1459 nxt_alert(task, "configuration parsing error"); 1460 return NXT_ERROR; 1461 } 1462 1463 mp = tmcf->router_conf->mem_pool; 1464 1465 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1466 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1467 if (ret != NXT_OK) { 1468 nxt_alert(task, "root map error"); 1469 return NXT_ERROR; 1470 } 1471 1472 if (tmcf->router_conf->threads == 0) { 1473 tmcf->router_conf->threads = nxt_ncpu; 1474 } 1475 1476 static_conf = nxt_conf_get_path(conf, &static_path); 1477 1478 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1479 if (nxt_slow_path(ret != NXT_OK)) { 1480 return NXT_ERROR; 1481 } 1482 1483 router = tmcf->router_conf->router; 1484 1485 applications = nxt_conf_get_path(conf, &applications_path); 1486 1487 if (applications != NULL) { 1488 next = 0; 1489 1490 for ( ;; ) { 1491 application = nxt_conf_next_object_member(applications, 1492 &name, &next); 1493 if (application == NULL) { 1494 break; 1495 } 1496 1497 nxt_debug(task, "application \"%V\"", &name); 1498 1499 size = nxt_conf_json_length(application, NULL); 1500 1501 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1502 if (app == NULL) { 1503 goto fail; 1504 } 1505 1506 nxt_memzero(app, sizeof(nxt_app_t)); 1507 1508 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1509 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1510 + name.length); 1511 1512 p = nxt_conf_json_print(app->conf.start, application, NULL); 1513 app->conf.length = p - app->conf.start; 1514 1515 nxt_assert(app->conf.length <= size); 1516 1517 nxt_debug(task, "application conf \"%V\"", &app->conf); 1518 1519 prev = nxt_router_app_find(&router->apps, &name); 1520 1521 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1522 nxt_free(app); 1523 1524 nxt_queue_remove(&prev->link); 1525 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1526 continue; 1527 } 1528 1529 apcf.processes = 1; 1530 apcf.max_processes = 1; 1531 apcf.spare_processes = 0; 1532 apcf.timeout = 0; 1533 apcf.res_timeout = 1000; 1534 apcf.idle_timeout = 15000; 1535 apcf.requests = 0; 1536 apcf.limits_value = NULL; 1537 apcf.processes_value = NULL; 1538 1539 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1540 if (nxt_slow_path(app_joint == NULL)) { 1541 goto app_fail; 1542 } 1543 1544 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1545 1546 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1547 nxt_nitems(nxt_router_app_conf), &apcf); 1548 if (ret != NXT_OK) { 1549 nxt_alert(task, "application map error"); 1550 goto app_fail; 1551 } 1552 1553 if (apcf.limits_value != NULL) { 1554 1555 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1556 nxt_alert(task, "application limits is not object"); 1557 goto app_fail; 1558 } 1559 1560 ret = nxt_conf_map_object(mp, apcf.limits_value, 1561 nxt_router_app_limits_conf, 1562 nxt_nitems(nxt_router_app_limits_conf), 1563 &apcf); 1564 if (ret != NXT_OK) { 1565 nxt_alert(task, "application limits map error"); 1566 goto app_fail; 1567 } 1568 } 1569 1570 if (apcf.processes_value != NULL 1571 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1572 { 1573 ret = nxt_conf_map_object(mp, apcf.processes_value, 1574 nxt_router_app_processes_conf, 1575 nxt_nitems(nxt_router_app_processes_conf), 1576 &apcf); 1577 if (ret != NXT_OK) { 1578 nxt_alert(task, "application processes map error"); 1579 goto app_fail; 1580 } 1581 1582 } else { 1583 apcf.max_processes = apcf.processes; 1584 apcf.spare_processes = apcf.processes; 1585 } 1586 1587 nxt_debug(task, "application type: %V", &apcf.type); 1588 nxt_debug(task, "application processes: %D", apcf.processes); 1589 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1590 nxt_debug(task, "application reschedule timeout: %M", 1591 apcf.res_timeout); 1592 nxt_debug(task, "application requests: %D", apcf.requests); 1593 1594 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1595 1596 if (lang == NULL) { 1597 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1598 goto app_fail; 1599 } 1600 1601 nxt_debug(task, "application language module: \"%s\"", lang->file); 1602 1603 ret = nxt_thread_mutex_create(&app->mutex); 1604 if (ret != NXT_OK) { 1605 goto app_fail; 1606 } 1607 1608 nxt_queue_init(&app->ports); 1609 nxt_queue_init(&app->spare_ports); 1610 nxt_queue_init(&app->idle_ports); 1611 nxt_queue_init(&app->requests); 1612 nxt_queue_init(&app->pending); 1613 1614 app->name.length = name.length; 1615 nxt_memcpy(app->name.start, name.start, name.length); 1616 1617 app->type = lang->type; 1618 app->max_processes = apcf.max_processes; 1619 app->spare_processes = apcf.spare_processes; 1620 app->max_pending_processes = apcf.spare_processes 1621 ? apcf.spare_processes : 1; 1622 app->timeout = apcf.timeout; 1623 app->res_timeout = apcf.res_timeout * 1000000; 1624 app->idle_timeout = apcf.idle_timeout; 1625 app->max_pending_responses = 2; 1626 app->max_requests = apcf.requests; 1627 1628 engine = task->thread->engine; 1629 1630 app->engine = engine; 1631 1632 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1633 app->adjust_idle_work.task = &engine->task; 1634 app->adjust_idle_work.obj = app; 1635 1636 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1637 1638 nxt_router_app_use(task, app, 1); 1639 1640 app->joint = app_joint; 1641 1642 app_joint->use_count = 1; 1643 app_joint->app = app; 1644 1645 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1646 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1647 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1648 app_joint->idle_timer.task = &engine->task; 1649 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1650 1651 app_joint->free_app_work.handler = nxt_router_free_app; 1652 app_joint->free_app_work.task = &engine->task; 1653 app_joint->free_app_work.obj = app_joint; 1654 } 1655 } 1656 1657 routes_conf = nxt_conf_get_path(conf, &routes_path); 1658 if (nxt_fast_path(routes_conf != NULL)) { 1659 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1660 if (nxt_slow_path(routes == NULL)) { 1661 return NXT_ERROR; 1662 } 1663 tmcf->router_conf->routes = routes; 1664 } 1665 1666 ret = nxt_upstreams_create(task, tmcf, conf); 1667 if (nxt_slow_path(ret != NXT_OK)) { 1668 return ret; 1669 } 1670 1671 http = nxt_conf_get_path(conf, &http_path); 1672#if 0 1673 if (http == NULL) { 1674 nxt_alert(task, "no \"http\" block"); 1675 return NXT_ERROR; 1676 } 1677#endif 1678 1679 websocket = nxt_conf_get_path(conf, &websocket_path); 1680 1681 listeners = nxt_conf_get_path(conf, &listeners_path); 1682 1683 if (listeners != NULL) { 1684 next = 0; 1685 1686 for ( ;; ) { 1687 listener = nxt_conf_next_object_member(listeners, &name, &next); 1688 if (listener == NULL) { 1689 break; 1690 } 1691 1692 skcf = nxt_router_socket_conf(task, tmcf, &name); 1693 if (skcf == NULL) { 1694 goto fail; 1695 } 1696 1697 nxt_memzero(&lscf, sizeof(lscf)); 1698 1699 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1700 nxt_nitems(nxt_router_listener_conf), 1701 &lscf); 1702 if (ret != NXT_OK) { 1703 nxt_alert(task, "listener map error"); 1704 goto fail; 1705 } 1706 1707 nxt_debug(task, "application: %V", &lscf.application); 1708 1709 // STUB, default values if http block is not defined. 1710 skcf->header_buffer_size = 2048; 1711 skcf->large_header_buffer_size = 8192; 1712 skcf->large_header_buffers = 4; 1713 skcf->body_buffer_size = 16 * 1024; 1714 skcf->max_body_size = 8 * 1024 * 1024; 1715 skcf->proxy_header_buffer_size = 64 * 1024; 1716 skcf->proxy_buffer_size = 4096; 1717 skcf->proxy_buffers = 256; 1718 skcf->idle_timeout = 180 * 1000; 1719 skcf->header_read_timeout = 30 * 1000; 1720 skcf->body_read_timeout = 30 * 1000; 1721 skcf->send_timeout = 30 * 1000; 1722 skcf->proxy_timeout = 60 * 1000; 1723 skcf->proxy_send_timeout = 30 * 1000; 1724 skcf->proxy_read_timeout = 30 * 1000; 1725 1726 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1727 skcf->websocket_conf.read_timeout = 60 * 1000; 1728 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1729 1730 nxt_str_null(&skcf->body_temp_path); 1731 1732 if (http != NULL) { 1733 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1734 nxt_nitems(nxt_router_http_conf), 1735 skcf); 1736 if (ret != NXT_OK) { 1737 nxt_alert(task, "http map error"); 1738 goto fail; 1739 } 1740 } 1741 1742 if (websocket != NULL) { 1743 ret = nxt_conf_map_object(mp, websocket, 1744 nxt_router_websocket_conf, 1745 nxt_nitems(nxt_router_websocket_conf), 1746 &skcf->websocket_conf); 1747 if (ret != NXT_OK) { 1748 nxt_alert(task, "websocket map error"); 1749 goto fail; 1750 } 1751 } 1752 1753 t = &skcf->body_temp_path; 1754 1755 if (t->length == 0) { 1756 t->start = (u_char *) task->thread->runtime->tmp; 1757 t->length = nxt_strlen(t->start); 1758 } 1759 1760#if (NXT_TLS) 1761 value = nxt_conf_get_path(listener, &certificate_path); 1762 1763 if (value != NULL) { 1764 nxt_conf_get_string(value, &name); 1765 1766 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1767 if (nxt_slow_path(tls == NULL)) { 1768 goto fail; 1769 } 1770 1771 tls->name = name; 1772 tls->conf = skcf; 1773 1774 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1775 } 1776#endif 1777 1778 skcf->listen->handler = nxt_http_conn_init; 1779 skcf->router_conf = tmcf->router_conf; 1780 skcf->router_conf->count++; 1781 1782 if (lscf.pass.length != 0) { 1783 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1784 1785 /* COMPATIBILITY: listener application. */ 1786 } else if (lscf.application.length > 0) { 1787 skcf->action = nxt_http_pass_application(task, tmcf, 1788 &lscf.application); 1789 } 1790 } 1791 } 1792 1793 value = nxt_conf_get_path(conf, &access_log_path); 1794 1795 if (value != NULL) { 1796 nxt_conf_get_string(value, &path); 1797 1798 access_log = router->access_log; 1799 1800 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1801 nxt_thread_spin_lock(&router->lock); 1802 access_log->count++; 1803 nxt_thread_spin_unlock(&router->lock); 1804 1805 } else { 1806 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1807 + path.length); 1808 if (access_log == NULL) { 1809 nxt_alert(task, "failed to allocate access log structure"); 1810 goto fail; 1811 } 1812 1813 access_log->fd = -1; 1814 access_log->handler = &nxt_router_access_log_writer; 1815 access_log->count = 1; 1816 1817 access_log->path.length = path.length; 1818 access_log->path.start = (u_char *) access_log 1819 + sizeof(nxt_router_access_log_t); 1820 1821 nxt_memcpy(access_log->path.start, path.start, path.length); 1822 } 1823 1824 tmcf->router_conf->access_log = access_log; 1825 } 1826 1827 nxt_http_routes_resolve(task, tmcf); 1828 1829 nxt_queue_add(&tmcf->deleting, &router->sockets); 1830 nxt_queue_init(&router->sockets); 1831 1832 return NXT_OK; 1833 1834app_fail: 1835 1836 nxt_free(app); 1837 1838fail: 1839 1840 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1841 1842 nxt_queue_remove(&app->link); 1843 nxt_thread_mutex_destroy(&app->mutex); 1844 nxt_free(app); 1845 1846 } nxt_queue_loop; 1847 1848 return NXT_ERROR; 1849} 1850 1851 1852static nxt_int_t 1853nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 1854 nxt_conf_value_t *conf) 1855{ 1856 uint32_t next, i; 1857 nxt_mp_t *mp; 1858 nxt_str_t *type, extension, str; 1859 nxt_int_t ret; 1860 nxt_uint_t exts; 1861 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 1862 1863 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 1864 1865 mp = rtcf->mem_pool; 1866 1867 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 1868 if (nxt_slow_path(ret != NXT_OK)) { 1869 return NXT_ERROR; 1870 } 1871 1872 if (conf == NULL) { 1873 return NXT_OK; 1874 } 1875 1876 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 1877 1878 if (mtypes_conf != NULL) { 1879 next = 0; 1880 1881 for ( ;; ) { 1882 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 1883 1884 if (ext_conf == NULL) { 1885 break; 1886 } 1887 1888 type = nxt_str_dup(mp, NULL, &str); 1889 if (nxt_slow_path(type == NULL)) { 1890 return NXT_ERROR; 1891 } 1892 1893 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 1894 nxt_conf_get_string(ext_conf, &str); 1895 1896 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1897 return NXT_ERROR; 1898 } 1899 1900 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1901 &extension, type); 1902 if (nxt_slow_path(ret != NXT_OK)) { 1903 return NXT_ERROR; 1904 } 1905 1906 continue; 1907 } 1908 1909 exts = nxt_conf_array_elements_count(ext_conf); 1910 1911 for (i = 0; i < exts; i++) { 1912 value = nxt_conf_get_array_element(ext_conf, i); 1913 1914 nxt_conf_get_string(value, &str); 1915 1916 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1917 return NXT_ERROR; 1918 } 1919 1920 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1921 &extension, type); 1922 if (nxt_slow_path(ret != NXT_OK)) { 1923 return NXT_ERROR; 1924 } 1925 } 1926 } 1927 } 1928 1929 return NXT_OK; 1930} 1931 1932 1933static nxt_app_t * 1934nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1935{ 1936 nxt_app_t *app; 1937 1938 nxt_queue_each(app, queue, nxt_app_t, link) { 1939 1940 if (nxt_strstr_eq(name, &app->name)) { 1941 return app; 1942 } 1943 1944 } nxt_queue_loop; 1945 1946 return NULL; 1947} 1948 1949 1950void 1951nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, 1952 nxt_http_action_t *action) 1953{ 1954 nxt_app_t *app; 1955 1956 app = nxt_router_app_find(&tmcf->apps, name); 1957 1958 if (app == NULL) { 1959 app = nxt_router_app_find(&tmcf->previous, name); 1960 } 1961 1962 action->u.application = app; 1963 action->handler = nxt_http_application_handler; 1964} 1965 1966 1967static nxt_socket_conf_t * 1968nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1969 nxt_str_t *name) 1970{ 1971 size_t size; 1972 nxt_int_t ret; 1973 nxt_bool_t wildcard; 1974 nxt_sockaddr_t *sa; 1975 nxt_socket_conf_t *skcf; 1976 nxt_listen_socket_t *ls; 1977 1978 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1979 if (nxt_slow_path(sa == NULL)) { 1980 nxt_alert(task, "invalid listener \"%V\"", name); 1981 return NULL; 1982 } 1983 1984 sa->type = SOCK_STREAM; 1985 1986 nxt_debug(task, "router listener: \"%*s\"", 1987 (size_t) sa->length, nxt_sockaddr_start(sa)); 1988 1989 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 1990 if (nxt_slow_path(skcf == NULL)) { 1991 return NULL; 1992 } 1993 1994 size = nxt_sockaddr_size(sa); 1995 1996 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1997 1998 if (ret != NXT_OK) { 1999 2000 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2001 if (nxt_slow_path(ls == NULL)) { 2002 return NULL; 2003 } 2004 2005 skcf->listen = ls; 2006 2007 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2008 nxt_memcpy(ls->sockaddr, sa, size); 2009 2010 nxt_listen_socket_remote_size(ls); 2011 2012 ls->socket = -1; 2013 ls->backlog = NXT_LISTEN_BACKLOG; 2014 ls->flags = NXT_NONBLOCK; 2015 ls->read_after_accept = 1; 2016 } 2017 2018 switch (sa->u.sockaddr.sa_family) { 2019#if (NXT_HAVE_UNIX_DOMAIN) 2020 case AF_UNIX: 2021 wildcard = 0; 2022 break; 2023#endif 2024#if (NXT_INET6) 2025 case AF_INET6: 2026 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2027 break; 2028#endif 2029 case AF_INET: 2030 default: 2031 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2032 break; 2033 } 2034 2035 if (!wildcard) { 2036 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2037 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2038 return NULL; 2039 } 2040 2041 nxt_memcpy(skcf->sockaddr, sa, size); 2042 } 2043 2044 return skcf; 2045} 2046 2047 2048static nxt_int_t 2049nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2050 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2051{ 2052 nxt_router_t *router; 2053 nxt_queue_link_t *qlk; 2054 nxt_socket_conf_t *skcf; 2055 2056 router = tmcf->router_conf->router; 2057 2058 for (qlk = nxt_queue_first(&router->sockets); 2059 qlk != nxt_queue_tail(&router->sockets); 2060 qlk = nxt_queue_next(qlk)) 2061 { 2062 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2063 2064 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2065 nskcf->listen = skcf->listen; 2066 2067 nxt_queue_remove(qlk); 2068 nxt_queue_insert_tail(&tmcf->keeping, qlk); 2069 2070 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 2071 2072 return NXT_OK; 2073 } 2074 } 2075 2076 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 2077 2078 return NXT_DECLINED; 2079} 2080 2081 2082static void 2083nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2084 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2085{ 2086 size_t size; 2087 uint32_t stream; 2088 nxt_int_t ret; 2089 nxt_buf_t *b; 2090 nxt_port_t *main_port, *router_port; 2091 nxt_runtime_t *rt; 2092 nxt_socket_rpc_t *rpc; 2093 2094 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2095 if (rpc == NULL) { 2096 goto fail; 2097 } 2098 2099 rpc->socket_conf = skcf; 2100 rpc->temp_conf = tmcf; 2101 2102 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2103 2104 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2105 if (b == NULL) { 2106 goto fail; 2107 } 2108 2109 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2110 2111 rt = task->thread->runtime; 2112 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2113 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2114 2115 stream = nxt_port_rpc_register_handler(task, router_port, 2116 nxt_router_listen_socket_ready, 2117 nxt_router_listen_socket_error, 2118 main_port->pid, rpc); 2119 if (nxt_slow_path(stream == 0)) { 2120 goto fail; 2121 } 2122 2123 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2124 stream, router_port->id, b); 2125 2126 if (nxt_slow_path(ret != NXT_OK)) { 2127 nxt_port_rpc_cancel(task, router_port, stream); 2128 goto fail; 2129 } 2130 2131 return; 2132 2133fail: 2134 2135 nxt_router_conf_error(task, tmcf); 2136} 2137 2138 2139static void 2140nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2141 void *data) 2142{ 2143 nxt_int_t ret; 2144 nxt_socket_t s; 2145 nxt_socket_rpc_t *rpc; 2146 2147 rpc = data; 2148 2149 s = msg->fd; 2150 2151 ret = nxt_socket_nonblocking(task, s); 2152 if (nxt_slow_path(ret != NXT_OK)) { 2153 goto fail; 2154 } 2155 2156 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2157 2158 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2159 if (nxt_slow_path(ret != NXT_OK)) { 2160 goto fail; 2161 } 2162 2163 rpc->socket_conf->listen->socket = s; 2164 2165 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2166 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2167 2168 return; 2169 2170fail: 2171 2172 nxt_socket_close(task, s); 2173 2174 nxt_router_conf_error(task, rpc->temp_conf); 2175} 2176 2177 2178static void 2179nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2180 void *data) 2181{ 2182 nxt_socket_rpc_t *rpc; 2183 nxt_router_temp_conf_t *tmcf; 2184 2185 rpc = data; 2186 tmcf = rpc->temp_conf; 2187 2188#if 0 2189 u_char *p; 2190 size_t size; 2191 uint8_t error; 2192 nxt_buf_t *in, *out; 2193 nxt_sockaddr_t *sa; 2194 2195 static nxt_str_t socket_errors[] = { 2196 nxt_string("ListenerSystem"), 2197 nxt_string("ListenerNoIPv6"), 2198 nxt_string("ListenerPort"), 2199 nxt_string("ListenerInUse"), 2200 nxt_string("ListenerNoAddress"), 2201 nxt_string("ListenerNoAccess"), 2202 nxt_string("ListenerPath"), 2203 }; 2204 2205 sa = rpc->socket_conf->listen->sockaddr; 2206 2207 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2208 2209 if (nxt_slow_path(in == NULL)) { 2210 return; 2211 } 2212 2213 p = in->mem.pos; 2214 2215 error = *p++; 2216 2217 size = nxt_length("listen socket error: ") 2218 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2219 + sa->length + socket_errors[error].length + (in->mem.free - p); 2220 2221 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2222 if (nxt_slow_path(out == NULL)) { 2223 return; 2224 } 2225 2226 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2227 "listen socket error: " 2228 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2229 (size_t) sa->length, nxt_sockaddr_start(sa), 2230 &socket_errors[error], in->mem.free - p, p); 2231 2232 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2233#endif 2234 2235 nxt_router_conf_error(task, tmcf); 2236} 2237 2238 2239#if (NXT_TLS) 2240 2241static void 2242nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2243 nxt_router_tlssock_t *tls) 2244{ 2245 nxt_socket_rpc_t *rpc; 2246 2247 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2248 if (rpc == NULL) { 2249 nxt_router_conf_error(task, tmcf); 2250 return; 2251 } 2252 2253 rpc->socket_conf = tls->conf; 2254 rpc->temp_conf = tmcf; 2255 2256 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2257 nxt_router_tls_rpc_handler, rpc); 2258} 2259 2260 2261static void 2262nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2263 void *data) 2264{ 2265 nxt_mp_t *mp; 2266 nxt_int_t ret; 2267 nxt_tls_conf_t *tlscf; 2268 nxt_socket_rpc_t *rpc; 2269 nxt_router_temp_conf_t *tmcf; 2270 2271 nxt_debug(task, "tls rpc handler"); 2272 2273 rpc = data; 2274 tmcf = rpc->temp_conf; 2275 2276 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2277 goto fail; 2278 } 2279 2280 mp = tmcf->router_conf->mem_pool; 2281 2282 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2283 if (nxt_slow_path(tlscf == NULL)) { 2284 goto fail; 2285 } 2286 2287 tlscf->chain_file = msg->fd; 2288 2289 ret = task->thread->runtime->tls->server_init(task, tlscf); 2290 if (nxt_slow_path(ret != NXT_OK)) { 2291 goto fail; 2292 } 2293 2294 rpc->socket_conf->tls = tlscf; 2295 2296 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2297 nxt_router_conf_apply, task, tmcf, NULL); 2298 return; 2299 2300fail: 2301 2302 nxt_router_conf_error(task, tmcf); 2303} 2304 2305#endif 2306 2307 2308static void 2309nxt_router_app_rpc_create(nxt_task_t *task, 2310 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2311{ 2312 size_t size; 2313 uint32_t stream; 2314 nxt_int_t ret; 2315 nxt_buf_t *b; 2316 nxt_port_t *main_port, *router_port; 2317 nxt_runtime_t *rt; 2318 nxt_app_rpc_t *rpc; 2319 2320 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2321 if (rpc == NULL) { 2322 goto fail; 2323 } 2324 2325 rpc->app = app; 2326 rpc->temp_conf = tmcf; 2327 2328 nxt_debug(task, "app '%V' prefork", &app->name); 2329 2330 size = app->name.length + 1 + app->conf.length; 2331 2332 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2333 if (nxt_slow_path(b == NULL)) { 2334 goto fail; 2335 } 2336 2337 nxt_buf_cpystr(b, &app->name); 2338 *b->mem.free++ = '\0'; 2339 nxt_buf_cpystr(b, &app->conf); 2340 2341 rt = task->thread->runtime; 2342 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2343 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2344 2345 stream = nxt_port_rpc_register_handler(task, router_port, 2346 nxt_router_app_prefork_ready, 2347 nxt_router_app_prefork_error, 2348 -1, rpc); 2349 if (nxt_slow_path(stream == 0)) { 2350 goto fail; 2351 } 2352 2353 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 2354 stream, router_port->id, b); 2355 2356 if (nxt_slow_path(ret != NXT_OK)) { 2357 nxt_port_rpc_cancel(task, router_port, stream); 2358 goto fail; 2359 } 2360 2361 app->pending_processes++; 2362 2363 return; 2364 2365fail: 2366 2367 nxt_router_conf_error(task, tmcf); 2368} 2369 2370 2371static void 2372nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2373 void *data) 2374{ 2375 nxt_app_t *app; 2376 nxt_port_t *port; 2377 nxt_app_rpc_t *rpc; 2378 nxt_event_engine_t *engine; 2379 2380 rpc = data; 2381 app = rpc->app; 2382 2383 port = msg->u.new_port; 2384 port->app = app; 2385 2386 app->pending_processes--; 2387 app->processes++; 2388 app->idle_processes++; 2389 2390 engine = task->thread->engine; 2391 2392 nxt_queue_insert_tail(&app->ports, &port->app_link); 2393 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2394 2395 port->idle_start = 0; 2396 2397 nxt_port_inc_use(port); 2398 2399 nxt_work_queue_add(&engine->fast_work_queue, 2400 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2401} 2402 2403 2404static void 2405nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2406 void *data) 2407{ 2408 nxt_app_t *app; 2409 nxt_app_rpc_t *rpc; 2410 nxt_router_temp_conf_t *tmcf; 2411 2412 rpc = data; 2413 app = rpc->app; 2414 tmcf = rpc->temp_conf; 2415 2416 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2417 &app->name); 2418 2419 app->pending_processes--; 2420 2421 nxt_router_conf_error(task, tmcf); 2422} 2423 2424 2425static nxt_int_t 2426nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2427 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2428{ 2429 nxt_int_t ret; 2430 nxt_uint_t n, threads; 2431 nxt_queue_link_t *qlk; 2432 nxt_router_engine_conf_t *recf; 2433 2434 threads = tmcf->router_conf->threads; 2435 2436 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2437 sizeof(nxt_router_engine_conf_t)); 2438 if (nxt_slow_path(tmcf->engines == NULL)) { 2439 return NXT_ERROR; 2440 } 2441 2442 n = 0; 2443 2444 for (qlk = nxt_queue_first(&router->engines); 2445 qlk != nxt_queue_tail(&router->engines); 2446 qlk = nxt_queue_next(qlk)) 2447 { 2448 recf = nxt_array_zero_add(tmcf->engines); 2449 if (nxt_slow_path(recf == NULL)) { 2450 return NXT_ERROR; 2451 } 2452 2453 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2454 2455 if (n < threads) { 2456 recf->action = NXT_ROUTER_ENGINE_KEEP; 2457 ret = nxt_router_engine_conf_update(tmcf, recf); 2458 2459 } else { 2460 recf->action = NXT_ROUTER_ENGINE_DELETE; 2461 ret = nxt_router_engine_conf_delete(tmcf, recf); 2462 } 2463 2464 if (nxt_slow_path(ret != NXT_OK)) { 2465 return ret; 2466 } 2467 2468 n++; 2469 } 2470 2471 tmcf->new_threads = n; 2472 2473 while (n < threads) { 2474 recf = nxt_array_zero_add(tmcf->engines); 2475 if (nxt_slow_path(recf == NULL)) { 2476 return NXT_ERROR; 2477 } 2478 2479 recf->action = NXT_ROUTER_ENGINE_ADD; 2480 2481 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2482 if (nxt_slow_path(recf->engine == NULL)) { 2483 return NXT_ERROR; 2484 } 2485 2486 ret = nxt_router_engine_conf_create(tmcf, recf); 2487 if (nxt_slow_path(ret != NXT_OK)) { 2488 return ret; 2489 } 2490 2491 n++; 2492 } 2493 2494 return NXT_OK; 2495} 2496 2497 2498static nxt_int_t 2499nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2500 nxt_router_engine_conf_t *recf) 2501{ 2502 nxt_int_t ret; 2503 2504 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2505 nxt_router_listen_socket_create); 2506 if (nxt_slow_path(ret != NXT_OK)) { 2507 return ret; 2508 } 2509 2510 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2511 nxt_router_listen_socket_create); 2512 if (nxt_slow_path(ret != NXT_OK)) { 2513 return ret; 2514 } 2515 2516 return ret; 2517} 2518 2519 2520static nxt_int_t 2521nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2522 nxt_router_engine_conf_t *recf) 2523{ 2524 nxt_int_t ret; 2525 2526 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2527 nxt_router_listen_socket_create); 2528 if (nxt_slow_path(ret != NXT_OK)) { 2529 return ret; 2530 } 2531 2532 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2533 nxt_router_listen_socket_update); 2534 if (nxt_slow_path(ret != NXT_OK)) { 2535 return ret; 2536 } 2537 2538 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2539 if (nxt_slow_path(ret != NXT_OK)) { 2540 return ret; 2541 } 2542 2543 return ret; 2544} 2545 2546 2547static nxt_int_t 2548nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2549 nxt_router_engine_conf_t *recf) 2550{ 2551 nxt_int_t ret; 2552 2553 ret = nxt_router_engine_quit(tmcf, recf); 2554 if (nxt_slow_path(ret != NXT_OK)) { 2555 return ret; 2556 } 2557 2558 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2559 if (nxt_slow_path(ret != NXT_OK)) { 2560 return ret; 2561 } 2562 2563 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2564} 2565 2566 2567static nxt_int_t 2568nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2569 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2570 nxt_work_handler_t handler) 2571{ 2572 nxt_int_t ret; 2573 nxt_joint_job_t *job; 2574 nxt_queue_link_t *qlk; 2575 nxt_socket_conf_t *skcf; 2576 nxt_socket_conf_joint_t *joint; 2577 2578 for (qlk = nxt_queue_first(sockets); 2579 qlk != nxt_queue_tail(sockets); 2580 qlk = nxt_queue_next(qlk)) 2581 { 2582 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2583 if (nxt_slow_path(job == NULL)) { 2584 return NXT_ERROR; 2585 } 2586 2587 job->work.next = recf->jobs; 2588 recf->jobs = &job->work; 2589 2590 job->task = tmcf->engine->task; 2591 job->work.handler = handler; 2592 job->work.task = &job->task; 2593 job->work.obj = job; 2594 job->tmcf = tmcf; 2595 2596 tmcf->count++; 2597 2598 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2599 sizeof(nxt_socket_conf_joint_t)); 2600 if (nxt_slow_path(joint == NULL)) { 2601 return NXT_ERROR; 2602 } 2603 2604 job->work.data = joint; 2605 2606 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 2607 if (nxt_slow_path(ret != NXT_OK)) { 2608 return ret; 2609 } 2610 2611 joint->count = 1; 2612 2613 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2614 skcf->count++; 2615 joint->socket_conf = skcf; 2616 2617 joint->engine = recf->engine; 2618 } 2619 2620 return NXT_OK; 2621} 2622 2623 2624static nxt_int_t 2625nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2626 nxt_router_engine_conf_t *recf) 2627{ 2628 nxt_joint_job_t *job; 2629 2630 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2631 if (nxt_slow_path(job == NULL)) { 2632 return NXT_ERROR; 2633 } 2634 2635 job->work.next = recf->jobs; 2636 recf->jobs = &job->work; 2637 2638 job->task = tmcf->engine->task; 2639 job->work.handler = nxt_router_worker_thread_quit; 2640 job->work.task = &job->task; 2641 job->work.obj = NULL; 2642 job->work.data = NULL; 2643 job->tmcf = NULL; 2644 2645 return NXT_OK; 2646} 2647 2648 2649static nxt_int_t 2650nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2651 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2652{ 2653 nxt_joint_job_t *job; 2654 nxt_queue_link_t *qlk; 2655 2656 for (qlk = nxt_queue_first(sockets); 2657 qlk != nxt_queue_tail(sockets); 2658 qlk = nxt_queue_next(qlk)) 2659 { 2660 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2661 if (nxt_slow_path(job == NULL)) { 2662 return NXT_ERROR; 2663 } 2664 2665 job->work.next = recf->jobs; 2666 recf->jobs = &job->work; 2667 2668 job->task = tmcf->engine->task; 2669 job->work.handler = nxt_router_listen_socket_delete; 2670 job->work.task = &job->task; 2671 job->work.obj = job; 2672 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2673 job->tmcf = tmcf; 2674 2675 tmcf->count++; 2676 } 2677 2678 return NXT_OK; 2679} 2680 2681 2682static nxt_int_t 2683nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2684 nxt_router_temp_conf_t *tmcf) 2685{ 2686 nxt_int_t ret; 2687 nxt_uint_t i, threads; 2688 nxt_router_engine_conf_t *recf; 2689 2690 recf = tmcf->engines->elts; 2691 threads = tmcf->router_conf->threads; 2692 2693 for (i = tmcf->new_threads; i < threads; i++) { 2694 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2695 if (nxt_slow_path(ret != NXT_OK)) { 2696 return ret; 2697 } 2698 } 2699 2700 return NXT_OK; 2701} 2702 2703 2704static nxt_int_t 2705nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2706 nxt_event_engine_t *engine) 2707{ 2708 nxt_int_t ret; 2709 nxt_thread_link_t *link; 2710 nxt_thread_handle_t handle; 2711 2712 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2713 2714 if (nxt_slow_path(link == NULL)) { 2715 return NXT_ERROR; 2716 } 2717 2718 link->start = nxt_router_thread_start; 2719 link->engine = engine; 2720 link->work.handler = nxt_router_thread_exit_handler; 2721 link->work.task = task; 2722 link->work.data = link; 2723 2724 nxt_queue_insert_tail(&rt->engines, &engine->link); 2725 2726 ret = nxt_thread_create(&handle, link); 2727 2728 if (nxt_slow_path(ret != NXT_OK)) { 2729 nxt_queue_remove(&engine->link); 2730 } 2731 2732 return ret; 2733} 2734 2735 2736static void 2737nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2738 nxt_router_temp_conf_t *tmcf) 2739{ 2740 nxt_app_t *app; 2741 2742 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2743 2744 nxt_router_app_unlink(task, app); 2745 2746 } nxt_queue_loop; 2747 2748 nxt_queue_add(&router->apps, &tmcf->previous); 2749 nxt_queue_add(&router->apps, &tmcf->apps); 2750} 2751 2752 2753static void 2754nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2755{ 2756 nxt_uint_t n; 2757 nxt_event_engine_t *engine; 2758 nxt_router_engine_conf_t *recf; 2759 2760 recf = tmcf->engines->elts; 2761 2762 for (n = tmcf->engines->nelts; n != 0; n--) { 2763 engine = recf->engine; 2764 2765 switch (recf->action) { 2766 2767 case NXT_ROUTER_ENGINE_KEEP: 2768 break; 2769 2770 case NXT_ROUTER_ENGINE_ADD: 2771 nxt_queue_insert_tail(&router->engines, &engine->link0); 2772 break; 2773 2774 case NXT_ROUTER_ENGINE_DELETE: 2775 nxt_queue_remove(&engine->link0); 2776 break; 2777 } 2778 2779 nxt_router_engine_post(engine, recf->jobs); 2780 2781 recf++; 2782 } 2783} 2784 2785 2786static void 2787nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2788{ 2789 nxt_work_t *work, *next; 2790 2791 for (work = jobs; work != NULL; work = next) { 2792 next = work->next; 2793 work->next = NULL; 2794 2795 nxt_event_engine_post(engine, work); 2796 } 2797} 2798 2799 2800static nxt_port_handlers_t nxt_router_app_port_handlers = { 2801 .rpc_error = nxt_port_rpc_handler, 2802 .mmap = nxt_port_mmap_handler, 2803 .data = nxt_port_rpc_handler, 2804 .oosm = nxt_router_oosm_handler, 2805}; 2806 2807 2808static void 2809nxt_router_thread_start(void *data) 2810{ 2811 nxt_int_t ret; 2812 nxt_port_t *port; 2813 nxt_task_t *task; 2814 nxt_thread_t *thread; 2815 nxt_thread_link_t *link; 2816 nxt_event_engine_t *engine; 2817 2818 link = data; 2819 engine = link->engine; 2820 task = &engine->task; 2821 2822 thread = nxt_thread(); 2823 2824 nxt_event_engine_thread_adopt(engine); 2825 2826 /* STUB */ 2827 thread->runtime = engine->task.thread->runtime; 2828 2829 engine->task.thread = thread; 2830 engine->task.log = thread->log; 2831 thread->engine = engine; 2832 thread->task = &engine->task; 2833#if 0 2834 thread->fiber = &engine->fibers->fiber; 2835#endif 2836 2837 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2838 if (nxt_slow_path(engine->mem_pool == NULL)) { 2839 return; 2840 } 2841 2842 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2843 NXT_PROCESS_ROUTER); 2844 if (nxt_slow_path(port == NULL)) { 2845 return; 2846 } 2847 2848 ret = nxt_port_socket_init(task, port, 0); 2849 if (nxt_slow_path(ret != NXT_OK)) { 2850 nxt_port_use(task, port, -1); 2851 return; 2852 } 2853 2854 engine->port = port; 2855 2856 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2857 2858 nxt_event_engine_start(engine); 2859} 2860 2861 2862static void 2863nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2864{ 2865 nxt_joint_job_t *job; 2866 nxt_socket_conf_t *skcf; 2867 nxt_listen_event_t *lev; 2868 nxt_listen_socket_t *ls; 2869 nxt_thread_spinlock_t *lock; 2870 nxt_socket_conf_joint_t *joint; 2871 2872 job = obj; 2873 joint = data; 2874 2875 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2876 2877 skcf = joint->socket_conf; 2878 ls = skcf->listen; 2879 2880 lev = nxt_listen_event(task, ls); 2881 if (nxt_slow_path(lev == NULL)) { 2882 nxt_router_listen_socket_release(task, skcf); 2883 return; 2884 } 2885 2886 lev->socket.data = joint; 2887 2888 lock = &skcf->router_conf->router->lock; 2889 2890 nxt_thread_spin_lock(lock); 2891 ls->count++; 2892 nxt_thread_spin_unlock(lock); 2893 2894 job->work.next = NULL; 2895 job->work.handler = nxt_router_conf_wait; 2896 2897 nxt_event_engine_post(job->tmcf->engine, &job->work); 2898} 2899 2900 2901nxt_inline nxt_listen_event_t * 2902nxt_router_listen_event(nxt_queue_t *listen_connections, 2903 nxt_socket_conf_t *skcf) 2904{ 2905 nxt_socket_t fd; 2906 nxt_queue_link_t *qlk; 2907 nxt_listen_event_t *lev; 2908 2909 fd = skcf->listen->socket; 2910 2911 for (qlk = nxt_queue_first(listen_connections); 2912 qlk != nxt_queue_tail(listen_connections); 2913 qlk = nxt_queue_next(qlk)) 2914 { 2915 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2916 2917 if (fd == lev->socket.fd) { 2918 return lev; 2919 } 2920 } 2921 2922 return NULL; 2923} 2924 2925 2926static void 2927nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2928{ 2929 nxt_joint_job_t *job; 2930 nxt_event_engine_t *engine; 2931 nxt_listen_event_t *lev; 2932 nxt_socket_conf_joint_t *joint, *old; 2933 2934 job = obj; 2935 joint = data; 2936 2937 engine = task->thread->engine; 2938 2939 nxt_queue_insert_tail(&engine->joints, &joint->link); 2940 2941 lev = nxt_router_listen_event(&engine->listen_connections, 2942 joint->socket_conf); 2943 2944 old = lev->socket.data; 2945 lev->socket.data = joint; 2946 lev->listen = joint->socket_conf->listen; 2947 2948 job->work.next = NULL; 2949 job->work.handler = nxt_router_conf_wait; 2950 2951 nxt_event_engine_post(job->tmcf->engine, &job->work); 2952 2953 /* 2954 * The task is allocated from configuration temporary 2955 * memory pool so it can be freed after engine post operation. 2956 */ 2957 2958 nxt_router_conf_release(&engine->task, old); 2959} 2960 2961 2962static void 2963nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2964{ 2965 nxt_joint_job_t *job; 2966 nxt_socket_conf_t *skcf; 2967 nxt_listen_event_t *lev; 2968 nxt_event_engine_t *engine; 2969 2970 job = obj; 2971 skcf = data; 2972 2973 engine = task->thread->engine; 2974 2975 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2976 2977 nxt_fd_event_delete(engine, &lev->socket); 2978 2979 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2980 lev->socket.fd); 2981 2982 lev->timer.handler = nxt_router_listen_socket_close; 2983 lev->timer.work_queue = &engine->fast_work_queue; 2984 2985 nxt_timer_add(engine, &lev->timer, 0); 2986 2987 job->work.next = NULL; 2988 job->work.handler = nxt_router_conf_wait; 2989 2990 nxt_event_engine_post(job->tmcf->engine, &job->work); 2991} 2992 2993 2994static void 2995nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2996{ 2997 nxt_event_engine_t *engine; 2998 2999 nxt_debug(task, "router worker thread quit"); 3000 3001 engine = task->thread->engine; 3002 3003 engine->shutdown = 1; 3004 3005 if (nxt_queue_is_empty(&engine->joints)) { 3006 nxt_thread_exit(task->thread); 3007 } 3008} 3009 3010 3011static void 3012nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3013{ 3014 nxt_timer_t *timer; 3015 nxt_listen_event_t *lev; 3016 nxt_socket_conf_joint_t *joint; 3017 3018 timer = obj; 3019 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3020 3021 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3022 lev->socket.fd); 3023 3024 nxt_queue_remove(&lev->link); 3025 3026 joint = lev->socket.data; 3027 lev->socket.data = NULL; 3028 3029 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3030 task = &task->thread->engine->task; 3031 3032 nxt_router_listen_socket_release(task, joint->socket_conf); 3033 3034 nxt_router_listen_event_release(task, lev, joint); 3035} 3036 3037 3038static void 3039nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3040{ 3041 nxt_listen_socket_t *ls; 3042 nxt_thread_spinlock_t *lock; 3043 3044 ls = skcf->listen; 3045 lock = &skcf->router_conf->router->lock; 3046 3047 nxt_thread_spin_lock(lock); 3048 3049 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3050 task->thread->engine, ls->count); 3051 3052 if (--ls->count != 0) { 3053 ls = NULL; 3054 } 3055 3056 nxt_thread_spin_unlock(lock); 3057 3058 if (ls != NULL) { 3059 nxt_socket_close(task, ls->socket); 3060 nxt_free(ls); 3061 } 3062} 3063 3064 3065void 3066nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3067 nxt_socket_conf_joint_t *joint) 3068{ 3069 nxt_event_engine_t *engine; 3070 3071 nxt_debug(task, "listen event count: %D", lev->count); 3072 3073 if (--lev->count == 0) { 3074 nxt_free(lev); 3075 } 3076 3077 if (joint != NULL) { 3078 nxt_router_conf_release(task, joint); 3079 } 3080 3081 engine = task->thread->engine; 3082 3083 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3084 nxt_thread_exit(task->thread); 3085 } 3086} 3087 3088 3089void 3090nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3091{ 3092 nxt_socket_conf_t *skcf; 3093 nxt_router_conf_t *rtcf; 3094 nxt_thread_spinlock_t *lock; 3095 3096 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3097 3098 if (--joint->count != 0) { 3099 return; 3100 } 3101 3102 nxt_queue_remove(&joint->link); 3103 3104 /* 3105 * The joint content can not be safely used after the critical 3106 * section protected by the spinlock because its memory pool may 3107 * be already destroyed by another thread. 3108 */ 3109 skcf = joint->socket_conf; 3110 rtcf = skcf->router_conf; 3111 lock = &rtcf->router->lock; 3112 3113 nxt_thread_spin_lock(lock); 3114 3115 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3116 rtcf, rtcf->count); 3117 3118 if (--skcf->count != 0) { 3119 skcf = NULL; 3120 rtcf = NULL; 3121 3122 } else { 3123 nxt_queue_remove(&skcf->link); 3124 3125 if (--rtcf->count != 0) { 3126 rtcf = NULL; 3127 } 3128 } 3129 3130 nxt_thread_spin_unlock(lock); 3131 3132 if (skcf != NULL) { 3133 if (skcf->action != NULL) { 3134 nxt_http_action_cleanup(task, skcf->action); 3135 } 3136 3137#if (NXT_TLS) 3138 if (skcf->tls != NULL) { 3139 task->thread->runtime->tls->server_free(task, skcf->tls); 3140 } 3141#endif 3142 } 3143 3144 /* TODO remove engine->port */ 3145 /* TODO excude from connected ports */ 3146 3147 if (rtcf != NULL) { 3148 nxt_debug(task, "old router conf is destroyed"); 3149 3150 nxt_http_routes_cleanup(task, rtcf->routes); 3151 3152 nxt_router_access_log_release(task, lock, rtcf->access_log); 3153 3154 nxt_mp_thread_adopt(rtcf->mem_pool); 3155 3156 nxt_mp_destroy(rtcf->mem_pool); 3157 } 3158} 3159 3160 3161static void 3162nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3163 nxt_router_access_log_t *access_log) 3164{ 3165 size_t size; 3166 u_char *buf, *p; 3167 nxt_off_t bytes; 3168 3169 static nxt_time_string_t date_cache = { 3170 (nxt_atomic_uint_t) -1, 3171 nxt_router_access_log_date, 3172 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3173 nxt_length("31/Dec/1986:19:40:00 +0300"), 3174 NXT_THREAD_TIME_LOCAL, 3175 NXT_THREAD_TIME_SEC, 3176 }; 3177 3178 size = r->remote->address_length 3179 + 6 /* ' - - [' */ 3180 + date_cache.size 3181 + 3 /* '] "' */ 3182 + r->method->length 3183 + 1 /* space */ 3184 + r->target.length 3185 + 1 /* space */ 3186 + r->version.length 3187 + 2 /* '" ' */ 3188 + 3 /* status */ 3189 + 1 /* space */ 3190 + NXT_OFF_T_LEN 3191 + 2 /* ' "' */ 3192 + (r->referer != NULL ? r->referer->value_length : 1) 3193 + 3 /* '" "' */ 3194 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3195 + 2 /* '"\n' */ 3196 ; 3197 3198 buf = nxt_mp_nget(r->mem_pool, size); 3199 if (nxt_slow_path(buf == NULL)) { 3200 return; 3201 } 3202 3203 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3204 r->remote->address_length); 3205 3206 p = nxt_cpymem(p, " - - [", 6); 3207 3208 p = nxt_thread_time_string(task->thread, &date_cache, p); 3209 3210 p = nxt_cpymem(p, "] \"", 3); 3211 3212 if (r->method->length != 0) { 3213 p = nxt_cpymem(p, r->method->start, r->method->length); 3214 3215 if (r->target.length != 0) { 3216 *p++ = ' '; 3217 p = nxt_cpymem(p, r->target.start, r->target.length); 3218 3219 if (r->version.length != 0) { 3220 *p++ = ' '; 3221 p = nxt_cpymem(p, r->version.start, r->version.length); 3222 } 3223 } 3224 3225 } else { 3226 *p++ = '-'; 3227 } 3228 3229 p = nxt_cpymem(p, "\" ", 2); 3230 3231 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3232 3233 *p++ = ' '; 3234 3235 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3236 3237 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3238 3239 p = nxt_cpymem(p, " \"", 2); 3240 3241 if (r->referer != NULL) { 3242 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3243 3244 } else { 3245 *p++ = '-'; 3246 } 3247 3248 p = nxt_cpymem(p, "\" \"", 3); 3249 3250 if (r->user_agent != NULL) { 3251 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3252 3253 } else { 3254 *p++ = '-'; 3255 } 3256 3257 p = nxt_cpymem(p, "\"\n", 2); 3258 3259 nxt_fd_write(access_log->fd, buf, p - buf); 3260} 3261 3262 3263static u_char * 3264nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3265 size_t size, const char *format) 3266{ 3267 u_char sign; 3268 time_t gmtoff; 3269 3270 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3271 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3272 3273 gmtoff = nxt_timezone(tm) / 60; 3274 3275 if (gmtoff < 0) { 3276 gmtoff = -gmtoff; 3277 sign = '-'; 3278 3279 } else { 3280 sign = '+'; 3281 } 3282 3283 return nxt_sprintf(buf, buf + size, format, 3284 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3285 tm->tm_hour, tm->tm_min, tm->tm_sec, 3286 sign, gmtoff / 60, gmtoff % 60); 3287} 3288 3289 3290static void 3291nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3292{ 3293 uint32_t stream; 3294 nxt_int_t ret; 3295 nxt_buf_t *b; 3296 nxt_port_t *main_port, *router_port; 3297 nxt_runtime_t *rt; 3298 nxt_router_access_log_t *access_log; 3299 3300 access_log = tmcf->router_conf->access_log; 3301 3302 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3303 if (nxt_slow_path(b == NULL)) { 3304 goto fail; 3305 } 3306 3307 nxt_buf_cpystr(b, &access_log->path); 3308 *b->mem.free++ = '\0'; 3309 3310 rt = task->thread->runtime; 3311 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3312 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3313 3314 stream = nxt_port_rpc_register_handler(task, router_port, 3315 nxt_router_access_log_ready, 3316 nxt_router_access_log_error, 3317 -1, tmcf); 3318 if (nxt_slow_path(stream == 0)) { 3319 goto fail; 3320 } 3321 3322 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3323 stream, router_port->id, b); 3324 3325 if (nxt_slow_path(ret != NXT_OK)) { 3326 nxt_port_rpc_cancel(task, router_port, stream); 3327 goto fail; 3328 } 3329 3330 return; 3331 3332fail: 3333 3334 nxt_router_conf_error(task, tmcf); 3335} 3336 3337 3338static void 3339nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3340 void *data) 3341{ 3342 nxt_router_temp_conf_t *tmcf; 3343 nxt_router_access_log_t *access_log; 3344 3345 tmcf = data; 3346 3347 access_log = tmcf->router_conf->access_log; 3348 3349 access_log->fd = msg->fd; 3350 3351 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3352 nxt_router_conf_apply, task, tmcf, NULL); 3353} 3354 3355 3356static void 3357nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3358 void *data) 3359{ 3360 nxt_router_temp_conf_t *tmcf; 3361 3362 tmcf = data; 3363 3364 nxt_router_conf_error(task, tmcf); 3365} 3366 3367 3368static void 3369nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3370 nxt_router_access_log_t *access_log) 3371{ 3372 if (access_log == NULL) { 3373 return; 3374 } 3375 3376 nxt_thread_spin_lock(lock); 3377 3378 if (--access_log->count != 0) { 3379 access_log = NULL; 3380 } 3381 3382 nxt_thread_spin_unlock(lock); 3383 3384 if (access_log != NULL) { 3385 3386 if (access_log->fd != -1) { 3387 nxt_fd_close(access_log->fd); 3388 } 3389 3390 nxt_free(access_log); 3391 } 3392} 3393 3394 3395typedef struct { 3396 nxt_mp_t *mem_pool; 3397 nxt_router_access_log_t *access_log; 3398} nxt_router_access_log_reopen_t; 3399 3400 3401void 3402nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3403{ 3404 nxt_mp_t *mp; 3405 uint32_t stream; 3406 nxt_int_t ret; 3407 nxt_buf_t *b; 3408 nxt_port_t *main_port, *router_port; 3409 nxt_runtime_t *rt; 3410 nxt_router_access_log_t *access_log; 3411 nxt_router_access_log_reopen_t *reopen; 3412 3413 access_log = nxt_router->access_log; 3414 3415 if (access_log == NULL) { 3416 return; 3417 } 3418 3419 mp = nxt_mp_create(1024, 128, 256, 32); 3420 if (nxt_slow_path(mp == NULL)) { 3421 return; 3422 } 3423 3424 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3425 if (nxt_slow_path(reopen == NULL)) { 3426 goto fail; 3427 } 3428 3429 reopen->mem_pool = mp; 3430 reopen->access_log = access_log; 3431 3432 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3433 if (nxt_slow_path(b == NULL)) { 3434 goto fail; 3435 } 3436 3437 b->completion_handler = nxt_router_access_log_reopen_completion; 3438 3439 nxt_buf_cpystr(b, &access_log->path); 3440 *b->mem.free++ = '\0'; 3441 3442 rt = task->thread->runtime; 3443 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3444 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3445 3446 stream = nxt_port_rpc_register_handler(task, router_port, 3447 nxt_router_access_log_reopen_ready, 3448 nxt_router_access_log_reopen_error, 3449 -1, reopen); 3450 if (nxt_slow_path(stream == 0)) { 3451 goto fail; 3452 } 3453 3454 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3455 stream, router_port->id, b); 3456 3457 if (nxt_slow_path(ret != NXT_OK)) { 3458 nxt_port_rpc_cancel(task, router_port, stream); 3459 goto fail; 3460 } 3461 3462 nxt_mp_retain(mp); 3463 3464 return; 3465 3466fail: 3467 3468 nxt_mp_destroy(mp); 3469} 3470 3471 3472static void 3473nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3474{ 3475 nxt_mp_t *mp; 3476 nxt_buf_t *b; 3477 3478 b = obj; 3479 mp = b->data; 3480 3481 nxt_mp_release(mp); 3482} 3483 3484 3485static void 3486nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3487 void *data) 3488{ 3489 nxt_router_access_log_t *access_log; 3490 nxt_router_access_log_reopen_t *reopen; 3491 3492 reopen = data; 3493 3494 access_log = reopen->access_log; 3495 3496 if (access_log == nxt_router->access_log) { 3497 3498 if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) { 3499 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3500 msg->fd, access_log->fd, nxt_errno); 3501 } 3502 } 3503 3504 nxt_fd_close(msg->fd); 3505 nxt_mp_release(reopen->mem_pool); 3506} 3507 3508 3509static void 3510nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3511 void *data) 3512{ 3513 nxt_router_access_log_reopen_t *reopen; 3514 3515 reopen = data; 3516 3517 nxt_mp_release(reopen->mem_pool); 3518} 3519 3520 3521static void 3522nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3523{ 3524 nxt_port_t *port; 3525 nxt_thread_link_t *link; 3526 nxt_event_engine_t *engine; 3527 nxt_thread_handle_t handle; 3528 3529 handle = (nxt_thread_handle_t) obj; 3530 link = data; 3531 3532 nxt_thread_wait(handle); 3533 3534 engine = link->engine; 3535 3536 nxt_queue_remove(&engine->link); 3537 3538 port = engine->port; 3539 3540 // TODO notify all apps 3541 3542 port->engine = task->thread->engine; 3543 nxt_mp_thread_adopt(port->mem_pool); 3544 nxt_port_use(task, port, -1); 3545 3546 nxt_mp_thread_adopt(engine->mem_pool); 3547 nxt_mp_destroy(engine->mem_pool); 3548 3549 nxt_event_engine_free(engine); 3550 3551 nxt_free(link); 3552} 3553 3554 3555static void 3556nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3557 void *data) 3558{ 3559 nxt_int_t ret; 3560 nxt_buf_t *b, *next; 3561 nxt_port_t *app_port; 3562 nxt_unit_field_t *f; 3563 nxt_http_field_t *field; 3564 nxt_http_request_t *r; 3565 nxt_unit_response_t *resp; 3566 nxt_request_app_link_t *req_app_link; 3567 nxt_request_rpc_data_t *req_rpc_data; 3568 3569 b = msg->buf; 3570 req_rpc_data = data; 3571 3572 if (msg->size == 0) { 3573 b = NULL; 3574 } 3575 3576 r = req_rpc_data->request; 3577 if (nxt_slow_path(r == NULL)) { 3578 return; 3579 } 3580 3581 if (r->error) { 3582 nxt_request_rpc_data_unlink(task, req_rpc_data); 3583 return; 3584 } 3585 3586 if (msg->port_msg.last != 0) { 3587 nxt_debug(task, "router data create last buf"); 3588 3589 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 3590 3591 nxt_request_rpc_data_unlink(task, req_rpc_data); 3592 3593 } else { 3594 if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { 3595 r->timer.handler = nxt_router_app_timeout; 3596 r->timer_data = req_rpc_data; 3597 nxt_timer_add(task->thread->engine, &r->timer, 3598 req_rpc_data->app->timeout); 3599 } 3600 } 3601 3602 if (b == NULL) { 3603 return; 3604 } 3605 3606 if (msg->buf == b) { 3607 /* Disable instant buffer completion/re-using by port. */ 3608 msg->buf = NULL; 3609 } 3610 3611 if (r->header_sent) { 3612 nxt_buf_chain_add(&r->out, b); 3613 nxt_http_request_send_body(task, r, NULL); 3614 3615 } else { 3616 size_t b_size = nxt_buf_mem_used_size(&b->mem); 3617 3618 if (nxt_slow_path(b_size < sizeof(*resp))) { 3619 goto fail; 3620 } 3621 3622 resp = (void *) b->mem.pos; 3623 if (nxt_slow_path(b_size < sizeof(*resp) 3624 + resp->fields_count * sizeof(nxt_unit_field_t))) { 3625 goto fail; 3626 } 3627 3628 field = NULL; 3629 3630 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 3631 if (f->skip) { 3632 continue; 3633 } 3634 3635 field = nxt_list_add(r->resp.fields); 3636 3637 if (nxt_slow_path(field == NULL)) { 3638 goto fail; 3639 } 3640 3641 field->hash = f->hash; 3642 field->skip = 0; 3643 field->hopbyhop = 0; 3644 3645 field->name_length = f->name_length; 3646 field->value_length = f->value_length; 3647 field->name = nxt_unit_sptr_get(&f->name); 3648 field->value = nxt_unit_sptr_get(&f->value); 3649 3650 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 3651 if (nxt_slow_path(ret != NXT_OK)) { 3652 goto fail; 3653 } 3654 3655 nxt_debug(task, "header%s: %*s: %*s", 3656 (field->skip ? " skipped" : ""), 3657 (size_t) field->name_length, field->name, 3658 (size_t) field->value_length, field->value); 3659 3660 if (field->skip) { 3661 r->resp.fields->last->nelts--; 3662 } 3663 } 3664 3665 r->status = resp->status; 3666 3667 if (resp->piggyback_content_length != 0) { 3668 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 3669 b->mem.free = b->mem.pos + resp->piggyback_content_length; 3670 3671 } else { 3672 b->mem.pos = b->mem.free; 3673 } 3674 3675 if (nxt_buf_mem_used_size(&b->mem) == 0) { 3676 next = b->next; 3677 b->next = NULL; 3678 3679 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3680 b->completion_handler, task, b, b->parent); 3681 3682 b = next; 3683 } 3684 3685 if (b != NULL) { 3686 nxt_buf_chain_add(&r->out, b); 3687 } 3688 3689 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 3690 3691 if (r->websocket_handshake 3692 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 3693 { 3694 req_app_link = nxt_request_app_link_alloc(task, 3695 req_rpc_data->req_app_link, 3696 req_rpc_data); 3697 if (nxt_slow_path(req_app_link == NULL)) { 3698 goto fail; 3699 } 3700 3701 app_port = req_app_link->app_port; 3702 3703 if (app_port == NULL && req_rpc_data->app_port != NULL) { 3704 req_app_link->app_port = req_rpc_data->app_port; 3705 app_port = req_app_link->app_port; 3706 req_app_link->apr_action = req_rpc_data->apr_action; 3707 3708 req_rpc_data->app_port = NULL; 3709 } 3710 3711 if (nxt_slow_path(app_port == NULL)) { 3712 goto fail; 3713 } 3714 3715 nxt_thread_mutex_lock(&req_rpc_data->app->mutex); 3716 3717 nxt_queue_insert_tail(&app_port->active_websockets, 3718 &req_app_link->link_port_websockets); 3719 3720 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); 3721 3722 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); 3723 req_app_link->apr_action = NXT_APR_CLOSE; 3724 3725 nxt_debug(task, "req_app_link stream #%uD upgrade", 3726 req_app_link->stream); 3727 3728 r->state = &nxt_http_websocket; 3729 3730 } else { 3731 r->state = &nxt_http_request_send_state; 3732 } 3733 } 3734 3735 return; 3736 3737fail: 3738 3739 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 3740 3741 nxt_request_rpc_data_unlink(task, req_rpc_data); 3742} 3743 3744 3745static const nxt_http_request_state_t nxt_http_request_send_state 3746 nxt_aligned(64) = 3747{ 3748 .error_handler = nxt_http_request_error_handler, 3749}; 3750 3751 3752static void 3753nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 3754{ 3755 nxt_buf_t *out; 3756 nxt_http_request_t *r; 3757 3758 r = obj; 3759 3760 out = r->out; 3761 3762 if (out != NULL) { 3763 r->out = NULL; 3764 nxt_http_request_send(task, r, out); 3765 } 3766} 3767 3768 3769static void 3770nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3771 void *data) 3772{ 3773 nxt_int_t res; 3774 nxt_port_t *port; 3775 nxt_bool_t cancelled; 3776 nxt_request_app_link_t *req_app_link; 3777 nxt_request_rpc_data_t *req_rpc_data; 3778 3779 req_rpc_data = data; 3780 3781 req_app_link = req_rpc_data->req_app_link; 3782 3783 if (req_app_link != NULL) { 3784 cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, 3785 req_app_link->stream); 3786 if (cancelled) { 3787 res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); 3788 3789 if (res == NXT_OK) { 3790 port = req_app_link->app_port; 3791 3792 if (nxt_slow_path(port == NULL)) { 3793 nxt_log(task, NXT_LOG_ERR, 3794 "port is NULL in cancelled req_app_link"); 3795 return; 3796 } 3797 3798 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, 3799 req_rpc_data, port->pid); 3800 3801 nxt_router_app_prepare_request(task, req_app_link); 3802 } 3803 3804 msg->port_msg.last = 0; 3805 3806 return; 3807 } 3808 } 3809 3810 if (req_rpc_data->request != NULL) { 3811 nxt_http_request_error(task, req_rpc_data->request, 3812 NXT_HTTP_SERVICE_UNAVAILABLE); 3813 } 3814 3815 nxt_request_rpc_data_unlink(task, req_rpc_data); 3816} 3817 3818 3819static void 3820nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3821 void *data) 3822{ 3823 nxt_app_t *app; 3824 nxt_port_t *port; 3825 nxt_app_joint_t *app_joint; 3826 3827 app_joint = data; 3828 port = msg->u.new_port; 3829 3830 nxt_assert(app_joint != NULL); 3831 nxt_assert(port != NULL); 3832 3833 app = app_joint->app; 3834 3835 nxt_router_app_joint_use(task, app_joint, -1); 3836 3837 if (nxt_slow_path(app == NULL)) { 3838 nxt_debug(task, "new port ready for released app, send QUIT"); 3839 3840 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 3841 3842 return; 3843 } 3844 3845 port->app = app; 3846 3847 nxt_thread_mutex_lock(&app->mutex); 3848 3849 nxt_assert(app->pending_processes != 0); 3850 3851 app->pending_processes--; 3852 app->processes++; 3853 3854 nxt_thread_mutex_unlock(&app->mutex); 3855 3856 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 3857 &app->name, port->pid, app->processes, app->pending_processes); 3858 3859 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); 3860} 3861 3862 3863static void 3864nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3865 void *data) 3866{ 3867 nxt_app_t *app; 3868 nxt_app_joint_t *app_joint; 3869 nxt_queue_link_t *lnk; 3870 nxt_request_app_link_t *req_app_link; 3871 3872 app_joint = data; 3873 3874 nxt_assert(app_joint != NULL); 3875 3876 app = app_joint->app; 3877 3878 nxt_router_app_joint_use(task, app_joint, -1); 3879 3880 if (nxt_slow_path(app == NULL)) { 3881 nxt_debug(task, "start error for released app"); 3882 3883 return; 3884 } 3885 3886 nxt_debug(task, "app '%V' %p start error", &app->name, app); 3887 3888 nxt_thread_mutex_lock(&app->mutex); 3889 3890 nxt_assert(app->pending_processes != 0); 3891 3892 app->pending_processes--; 3893 3894 if (!nxt_queue_is_empty(&app->requests)) { 3895 lnk = nxt_queue_last(&app->requests); 3896 nxt_queue_remove(lnk); 3897 lnk->next = NULL; 3898 3899 req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, 3900 link_app_requests); 3901 3902 } else { 3903 req_app_link = NULL; 3904 } 3905 3906 nxt_thread_mutex_unlock(&app->mutex); 3907 3908 if (req_app_link != NULL) { 3909 nxt_debug(task, "app '%V' %p abort next stream #%uD", 3910 &app->name, app, req_app_link->stream); 3911
| 747} 748 749 750nxt_inline void 751nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, 752 nxt_request_app_link_t *req_app_link) 753{ 754 nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, 755 &req_app_link->link_port_pending); 756 nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); 757 758 nxt_request_app_link_inc_use(req_app_link); 759 760 req_app_link->res_time = nxt_thread_monotonic_time(task->thread) 761 + app->res_timeout; 762 763 nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", 764 req_app_link->stream); 765} 766 767 768nxt_inline nxt_bool_t 769nxt_queue_chk_remove(nxt_queue_link_t *lnk) 770{ 771 if (lnk->next != NULL) { 772 nxt_queue_remove(lnk); 773 774 lnk->next = NULL; 775 776 return 1; 777 } 778 779 return 0; 780} 781 782 783nxt_inline void 784nxt_request_rpc_data_unlink(nxt_task_t *task, 785 nxt_request_rpc_data_t *req_rpc_data) 786{ 787 int ra_use_delta; 788 nxt_request_app_link_t *req_app_link; 789 790 if (req_rpc_data->app_port != NULL) { 791 nxt_router_app_port_release(task, req_rpc_data->app_port, 792 req_rpc_data->apr_action); 793 794 req_rpc_data->app_port = NULL; 795 } 796 797 nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 798 799 req_app_link = req_rpc_data->req_app_link; 800 if (req_app_link != NULL) { 801 req_rpc_data->req_app_link = NULL; 802 req_app_link->req_rpc_data = NULL; 803 804 ra_use_delta = 0; 805 806 nxt_thread_mutex_lock(&req_rpc_data->app->mutex); 807 808 if (req_app_link->link_app_requests.next == NULL 809 && req_app_link->link_port_pending.next == NULL 810 && req_app_link->link_app_pending.next == NULL 811 && req_app_link->link_port_websockets.next == NULL) 812 { 813 req_app_link = NULL; 814 815 } else { 816 ra_use_delta -= 817 nxt_queue_chk_remove(&req_app_link->link_app_requests) 818 + nxt_queue_chk_remove(&req_app_link->link_port_pending) 819 + nxt_queue_chk_remove(&req_app_link->link_port_websockets); 820 821 nxt_queue_chk_remove(&req_app_link->link_app_pending); 822 } 823 824 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); 825 826 if (req_app_link != NULL) { 827 nxt_request_app_link_use(task, req_app_link, ra_use_delta); 828 } 829 } 830 831 if (req_rpc_data->app != NULL) { 832 nxt_router_app_use(task, req_rpc_data->app, -1); 833 834 req_rpc_data->app = NULL; 835 } 836 837 if (req_rpc_data->request != NULL) { 838 req_rpc_data->request->timer_data = NULL; 839 840 nxt_router_http_request_done(task, req_rpc_data->request); 841 842 req_rpc_data->request->req_rpc_data = NULL; 843 req_rpc_data->request = NULL; 844 } 845} 846 847 848void 849nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 850{ 851 nxt_port_new_port_handler(task, msg); 852 853 if (msg->u.new_port != NULL 854 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) 855 { 856 nxt_router_greet_controller(task, msg->u.new_port); 857 } 858 859 if (msg->port_msg.stream == 0) { 860 return; 861 } 862 863 if (msg->u.new_port == NULL 864 || msg->u.new_port->type != NXT_PROCESS_WORKER) 865 { 866 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 867 } 868 869 nxt_port_rpc_handler(task, msg); 870} 871 872 873void 874nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 875{ 876 nxt_int_t ret; 877 nxt_buf_t *b; 878 nxt_router_temp_conf_t *tmcf; 879 880 tmcf = nxt_router_temp_conf(task); 881 if (nxt_slow_path(tmcf == NULL)) { 882 return; 883 } 884 885 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 886 nxt_buf_used_size(msg->buf), 887 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 888 889 tmcf->router_conf->router = nxt_router; 890 tmcf->stream = msg->port_msg.stream; 891 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 892 msg->port_msg.pid, 893 msg->port_msg.reply_port); 894 895 if (nxt_slow_path(tmcf->port == NULL)) { 896 nxt_alert(task, "reply port not found"); 897 898 return; 899 } 900 901 nxt_port_use(task, tmcf->port, 1); 902 903 b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, 904 msg->buf, msg->size); 905 if (nxt_slow_path(b == NULL)) { 906 nxt_router_conf_error(task, tmcf); 907 908 return; 909 } 910 911 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 912 913 if (nxt_fast_path(ret == NXT_OK)) { 914 nxt_router_conf_apply(task, tmcf, NULL); 915 916 } else { 917 nxt_router_conf_error(task, tmcf); 918 } 919} 920 921 922static void 923nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 924 void *data) 925{ 926 union { 927 nxt_pid_t removed_pid; 928 void *data; 929 } u; 930 931 u.data = data; 932 933 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 934} 935 936 937void 938nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 939{ 940 nxt_event_engine_t *engine; 941 942 nxt_port_remove_pid_handler(task, msg); 943 944 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 945 { 946 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 947 msg->u.data); 948 } 949 nxt_queue_loop; 950 951 if (msg->port_msg.stream == 0) { 952 return; 953 } 954 955 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 956 957 nxt_port_rpc_handler(task, msg); 958} 959 960 961static nxt_router_temp_conf_t * 962nxt_router_temp_conf(nxt_task_t *task) 963{ 964 nxt_mp_t *mp, *tmp; 965 nxt_router_conf_t *rtcf; 966 nxt_router_temp_conf_t *tmcf; 967 968 mp = nxt_mp_create(1024, 128, 256, 32); 969 if (nxt_slow_path(mp == NULL)) { 970 return NULL; 971 } 972 973 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 974 if (nxt_slow_path(rtcf == NULL)) { 975 goto fail; 976 } 977 978 rtcf->mem_pool = mp; 979 980 tmp = nxt_mp_create(1024, 128, 256, 32); 981 if (nxt_slow_path(tmp == NULL)) { 982 goto fail; 983 } 984 985 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 986 if (nxt_slow_path(tmcf == NULL)) { 987 goto temp_fail; 988 } 989 990 tmcf->mem_pool = tmp; 991 tmcf->router_conf = rtcf; 992 tmcf->count = 1; 993 tmcf->engine = task->thread->engine; 994 995 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 996 sizeof(nxt_router_engine_conf_t)); 997 if (nxt_slow_path(tmcf->engines == NULL)) { 998 goto temp_fail; 999 } 1000 1001 nxt_queue_init(&tmcf->deleting); 1002 nxt_queue_init(&tmcf->keeping); 1003 nxt_queue_init(&tmcf->updating); 1004 nxt_queue_init(&tmcf->pending); 1005 nxt_queue_init(&tmcf->creating); 1006 1007#if (NXT_TLS) 1008 nxt_queue_init(&tmcf->tls); 1009#endif 1010 1011 nxt_queue_init(&tmcf->apps); 1012 nxt_queue_init(&tmcf->previous); 1013 1014 return tmcf; 1015 1016temp_fail: 1017 1018 nxt_mp_destroy(tmp); 1019 1020fail: 1021 1022 nxt_mp_destroy(mp); 1023 1024 return NULL; 1025} 1026 1027 1028nxt_inline nxt_bool_t 1029nxt_router_app_can_start(nxt_app_t *app) 1030{ 1031 return app->processes + app->pending_processes < app->max_processes 1032 && app->pending_processes < app->max_pending_processes; 1033} 1034 1035 1036nxt_inline nxt_bool_t 1037nxt_router_app_need_start(nxt_app_t *app) 1038{ 1039 return app->idle_processes + app->pending_processes 1040 < app->spare_processes; 1041} 1042 1043 1044static void 1045nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1046{ 1047 nxt_int_t ret; 1048 nxt_app_t *app; 1049 nxt_router_t *router; 1050 nxt_runtime_t *rt; 1051 nxt_queue_link_t *qlk; 1052 nxt_socket_conf_t *skcf; 1053 nxt_router_conf_t *rtcf; 1054 nxt_router_temp_conf_t *tmcf; 1055 const nxt_event_interface_t *interface; 1056#if (NXT_TLS) 1057 nxt_router_tlssock_t *tls; 1058#endif 1059 1060 tmcf = obj; 1061 1062 qlk = nxt_queue_first(&tmcf->pending); 1063 1064 if (qlk != nxt_queue_tail(&tmcf->pending)) { 1065 nxt_queue_remove(qlk); 1066 nxt_queue_insert_tail(&tmcf->creating, qlk); 1067 1068 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1069 1070 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1071 1072 return; 1073 } 1074 1075#if (NXT_TLS) 1076 qlk = nxt_queue_first(&tmcf->tls); 1077 1078 if (qlk != nxt_queue_tail(&tmcf->tls)) { 1079 nxt_queue_remove(qlk); 1080 1081 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1082 1083 nxt_router_tls_rpc_create(task, tmcf, tls); 1084 return; 1085 } 1086#endif 1087 1088 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1089 1090 if (nxt_router_app_need_start(app)) { 1091 nxt_router_app_rpc_create(task, tmcf, app); 1092 return; 1093 } 1094 1095 } nxt_queue_loop; 1096 1097 rtcf = tmcf->router_conf; 1098 1099 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1100 nxt_router_access_log_open(task, tmcf); 1101 return; 1102 } 1103 1104 rt = task->thread->runtime; 1105 1106 interface = nxt_service_get(rt->services, "engine", NULL); 1107 1108 router = rtcf->router; 1109 1110 ret = nxt_router_engines_create(task, router, tmcf, interface); 1111 if (nxt_slow_path(ret != NXT_OK)) { 1112 goto fail; 1113 } 1114 1115 ret = nxt_router_threads_create(task, rt, tmcf); 1116 if (nxt_slow_path(ret != NXT_OK)) { 1117 goto fail; 1118 } 1119 1120 nxt_router_apps_sort(task, router, tmcf); 1121 1122 nxt_router_engines_post(router, tmcf); 1123 1124 nxt_queue_add(&router->sockets, &tmcf->updating); 1125 nxt_queue_add(&router->sockets, &tmcf->creating); 1126 1127 router->access_log = rtcf->access_log; 1128 1129 nxt_router_conf_ready(task, tmcf); 1130 1131 return; 1132 1133fail: 1134 1135 nxt_router_conf_error(task, tmcf); 1136 1137 return; 1138} 1139 1140 1141static void 1142nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1143{ 1144 nxt_joint_job_t *job; 1145 1146 job = obj; 1147 1148 nxt_router_conf_ready(task, job->tmcf); 1149} 1150 1151 1152static void 1153nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1154{ 1155 nxt_debug(task, "temp conf count:%D", tmcf->count); 1156 1157 if (--tmcf->count == 0) { 1158 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1159 } 1160} 1161 1162 1163static void 1164nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1165{ 1166 nxt_app_t *app; 1167 nxt_queue_t new_socket_confs; 1168 nxt_socket_t s; 1169 nxt_router_t *router; 1170 nxt_queue_link_t *qlk; 1171 nxt_socket_conf_t *skcf; 1172 nxt_router_conf_t *rtcf; 1173 1174 nxt_alert(task, "failed to apply new conf"); 1175 1176 for (qlk = nxt_queue_first(&tmcf->creating); 1177 qlk != nxt_queue_tail(&tmcf->creating); 1178 qlk = nxt_queue_next(qlk)) 1179 { 1180 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1181 s = skcf->listen->socket; 1182 1183 if (s != -1) { 1184 nxt_socket_close(task, s); 1185 } 1186 1187 nxt_free(skcf->listen); 1188 } 1189 1190 nxt_queue_init(&new_socket_confs); 1191 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1192 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1193 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1194 1195 rtcf = tmcf->router_conf; 1196 1197 nxt_http_routes_cleanup(task, rtcf->routes); 1198 1199 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1200 1201 if (skcf->action != NULL) { 1202 nxt_http_action_cleanup(task, skcf->action); 1203 } 1204 1205 } nxt_queue_loop; 1206 1207 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1208 1209 nxt_router_app_unlink(task, app); 1210 1211 } nxt_queue_loop; 1212 1213 router = rtcf->router; 1214 1215 nxt_queue_add(&router->sockets, &tmcf->keeping); 1216 nxt_queue_add(&router->sockets, &tmcf->deleting); 1217 1218 nxt_queue_add(&router->apps, &tmcf->previous); 1219 1220 // TODO: new engines and threads 1221 1222 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1223 1224 nxt_mp_destroy(rtcf->mem_pool); 1225 1226 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1227} 1228 1229 1230static void 1231nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1232 nxt_port_msg_type_t type) 1233{ 1234 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1235 1236 nxt_port_use(task, tmcf->port, -1); 1237 1238 tmcf->port = NULL; 1239} 1240 1241 1242static nxt_conf_map_t nxt_router_conf[] = { 1243 { 1244 nxt_string("listeners_threads"), 1245 NXT_CONF_MAP_INT32, 1246 offsetof(nxt_router_conf_t, threads), 1247 }, 1248}; 1249 1250 1251static nxt_conf_map_t nxt_router_app_conf[] = { 1252 { 1253 nxt_string("type"), 1254 NXT_CONF_MAP_STR, 1255 offsetof(nxt_router_app_conf_t, type), 1256 }, 1257 1258 { 1259 nxt_string("limits"), 1260 NXT_CONF_MAP_PTR, 1261 offsetof(nxt_router_app_conf_t, limits_value), 1262 }, 1263 1264 { 1265 nxt_string("processes"), 1266 NXT_CONF_MAP_INT32, 1267 offsetof(nxt_router_app_conf_t, processes), 1268 }, 1269 1270 { 1271 nxt_string("processes"), 1272 NXT_CONF_MAP_PTR, 1273 offsetof(nxt_router_app_conf_t, processes_value), 1274 }, 1275}; 1276 1277 1278static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1279 { 1280 nxt_string("timeout"), 1281 NXT_CONF_MAP_MSEC, 1282 offsetof(nxt_router_app_conf_t, timeout), 1283 }, 1284 1285 { 1286 nxt_string("reschedule_timeout"), 1287 NXT_CONF_MAP_MSEC, 1288 offsetof(nxt_router_app_conf_t, res_timeout), 1289 }, 1290 1291 { 1292 nxt_string("requests"), 1293 NXT_CONF_MAP_INT32, 1294 offsetof(nxt_router_app_conf_t, requests), 1295 }, 1296}; 1297 1298 1299static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1300 { 1301 nxt_string("spare"), 1302 NXT_CONF_MAP_INT32, 1303 offsetof(nxt_router_app_conf_t, spare_processes), 1304 }, 1305 1306 { 1307 nxt_string("max"), 1308 NXT_CONF_MAP_INT32, 1309 offsetof(nxt_router_app_conf_t, max_processes), 1310 }, 1311 1312 { 1313 nxt_string("idle_timeout"), 1314 NXT_CONF_MAP_MSEC, 1315 offsetof(nxt_router_app_conf_t, idle_timeout), 1316 }, 1317}; 1318 1319 1320static nxt_conf_map_t nxt_router_listener_conf[] = { 1321 { 1322 nxt_string("pass"), 1323 NXT_CONF_MAP_STR_COPY, 1324 offsetof(nxt_router_listener_conf_t, pass), 1325 }, 1326 1327 { 1328 nxt_string("application"), 1329 NXT_CONF_MAP_STR_COPY, 1330 offsetof(nxt_router_listener_conf_t, application), 1331 }, 1332}; 1333 1334 1335static nxt_conf_map_t nxt_router_http_conf[] = { 1336 { 1337 nxt_string("header_buffer_size"), 1338 NXT_CONF_MAP_SIZE, 1339 offsetof(nxt_socket_conf_t, header_buffer_size), 1340 }, 1341 1342 { 1343 nxt_string("large_header_buffer_size"), 1344 NXT_CONF_MAP_SIZE, 1345 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1346 }, 1347 1348 { 1349 nxt_string("large_header_buffers"), 1350 NXT_CONF_MAP_SIZE, 1351 offsetof(nxt_socket_conf_t, large_header_buffers), 1352 }, 1353 1354 { 1355 nxt_string("body_buffer_size"), 1356 NXT_CONF_MAP_SIZE, 1357 offsetof(nxt_socket_conf_t, body_buffer_size), 1358 }, 1359 1360 { 1361 nxt_string("max_body_size"), 1362 NXT_CONF_MAP_SIZE, 1363 offsetof(nxt_socket_conf_t, max_body_size), 1364 }, 1365 1366 { 1367 nxt_string("idle_timeout"), 1368 NXT_CONF_MAP_MSEC, 1369 offsetof(nxt_socket_conf_t, idle_timeout), 1370 }, 1371 1372 { 1373 nxt_string("header_read_timeout"), 1374 NXT_CONF_MAP_MSEC, 1375 offsetof(nxt_socket_conf_t, header_read_timeout), 1376 }, 1377 1378 { 1379 nxt_string("body_read_timeout"), 1380 NXT_CONF_MAP_MSEC, 1381 offsetof(nxt_socket_conf_t, body_read_timeout), 1382 }, 1383 1384 { 1385 nxt_string("send_timeout"), 1386 NXT_CONF_MAP_MSEC, 1387 offsetof(nxt_socket_conf_t, send_timeout), 1388 }, 1389 1390 { 1391 nxt_string("body_temp_path"), 1392 NXT_CONF_MAP_STR, 1393 offsetof(nxt_socket_conf_t, body_temp_path), 1394 }, 1395}; 1396 1397 1398static nxt_conf_map_t nxt_router_websocket_conf[] = { 1399 { 1400 nxt_string("max_frame_size"), 1401 NXT_CONF_MAP_SIZE, 1402 offsetof(nxt_websocket_conf_t, max_frame_size), 1403 }, 1404 1405 { 1406 nxt_string("read_timeout"), 1407 NXT_CONF_MAP_MSEC, 1408 offsetof(nxt_websocket_conf_t, read_timeout), 1409 }, 1410 1411 { 1412 nxt_string("keepalive_interval"), 1413 NXT_CONF_MAP_MSEC, 1414 offsetof(nxt_websocket_conf_t, keepalive_interval), 1415 }, 1416 1417}; 1418 1419 1420static nxt_int_t 1421nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1422 u_char *start, u_char *end) 1423{ 1424 u_char *p; 1425 size_t size; 1426 nxt_mp_t *mp; 1427 uint32_t next; 1428 nxt_int_t ret; 1429 nxt_str_t name, path; 1430 nxt_app_t *app, *prev; 1431 nxt_str_t *t; 1432 nxt_router_t *router; 1433 nxt_app_joint_t *app_joint; 1434 nxt_conf_value_t *conf, *http, *value, *websocket; 1435 nxt_conf_value_t *applications, *application; 1436 nxt_conf_value_t *listeners, *listener; 1437 nxt_conf_value_t *routes_conf, *static_conf; 1438 nxt_socket_conf_t *skcf; 1439 nxt_http_routes_t *routes; 1440 nxt_event_engine_t *engine; 1441 nxt_app_lang_module_t *lang; 1442 nxt_router_app_conf_t apcf; 1443 nxt_router_access_log_t *access_log; 1444 nxt_router_listener_conf_t lscf; 1445#if (NXT_TLS) 1446 nxt_router_tlssock_t *tls; 1447#endif 1448 1449 static nxt_str_t http_path = nxt_string("/settings/http"); 1450 static nxt_str_t applications_path = nxt_string("/applications"); 1451 static nxt_str_t listeners_path = nxt_string("/listeners"); 1452 static nxt_str_t routes_path = nxt_string("/routes"); 1453 static nxt_str_t access_log_path = nxt_string("/access_log"); 1454#if (NXT_TLS) 1455 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1456#endif 1457 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1458 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1459 1460 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1461 if (conf == NULL) { 1462 nxt_alert(task, "configuration parsing error"); 1463 return NXT_ERROR; 1464 } 1465 1466 mp = tmcf->router_conf->mem_pool; 1467 1468 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1469 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1470 if (ret != NXT_OK) { 1471 nxt_alert(task, "root map error"); 1472 return NXT_ERROR; 1473 } 1474 1475 if (tmcf->router_conf->threads == 0) { 1476 tmcf->router_conf->threads = nxt_ncpu; 1477 } 1478 1479 static_conf = nxt_conf_get_path(conf, &static_path); 1480 1481 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1482 if (nxt_slow_path(ret != NXT_OK)) { 1483 return NXT_ERROR; 1484 } 1485 1486 router = tmcf->router_conf->router; 1487 1488 applications = nxt_conf_get_path(conf, &applications_path); 1489 1490 if (applications != NULL) { 1491 next = 0; 1492 1493 for ( ;; ) { 1494 application = nxt_conf_next_object_member(applications, 1495 &name, &next); 1496 if (application == NULL) { 1497 break; 1498 } 1499 1500 nxt_debug(task, "application \"%V\"", &name); 1501 1502 size = nxt_conf_json_length(application, NULL); 1503 1504 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1505 if (app == NULL) { 1506 goto fail; 1507 } 1508 1509 nxt_memzero(app, sizeof(nxt_app_t)); 1510 1511 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1512 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1513 + name.length); 1514 1515 p = nxt_conf_json_print(app->conf.start, application, NULL); 1516 app->conf.length = p - app->conf.start; 1517 1518 nxt_assert(app->conf.length <= size); 1519 1520 nxt_debug(task, "application conf \"%V\"", &app->conf); 1521 1522 prev = nxt_router_app_find(&router->apps, &name); 1523 1524 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1525 nxt_free(app); 1526 1527 nxt_queue_remove(&prev->link); 1528 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1529 continue; 1530 } 1531 1532 apcf.processes = 1; 1533 apcf.max_processes = 1; 1534 apcf.spare_processes = 0; 1535 apcf.timeout = 0; 1536 apcf.res_timeout = 1000; 1537 apcf.idle_timeout = 15000; 1538 apcf.requests = 0; 1539 apcf.limits_value = NULL; 1540 apcf.processes_value = NULL; 1541 1542 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1543 if (nxt_slow_path(app_joint == NULL)) { 1544 goto app_fail; 1545 } 1546 1547 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1548 1549 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1550 nxt_nitems(nxt_router_app_conf), &apcf); 1551 if (ret != NXT_OK) { 1552 nxt_alert(task, "application map error"); 1553 goto app_fail; 1554 } 1555 1556 if (apcf.limits_value != NULL) { 1557 1558 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1559 nxt_alert(task, "application limits is not object"); 1560 goto app_fail; 1561 } 1562 1563 ret = nxt_conf_map_object(mp, apcf.limits_value, 1564 nxt_router_app_limits_conf, 1565 nxt_nitems(nxt_router_app_limits_conf), 1566 &apcf); 1567 if (ret != NXT_OK) { 1568 nxt_alert(task, "application limits map error"); 1569 goto app_fail; 1570 } 1571 } 1572 1573 if (apcf.processes_value != NULL 1574 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1575 { 1576 ret = nxt_conf_map_object(mp, apcf.processes_value, 1577 nxt_router_app_processes_conf, 1578 nxt_nitems(nxt_router_app_processes_conf), 1579 &apcf); 1580 if (ret != NXT_OK) { 1581 nxt_alert(task, "application processes map error"); 1582 goto app_fail; 1583 } 1584 1585 } else { 1586 apcf.max_processes = apcf.processes; 1587 apcf.spare_processes = apcf.processes; 1588 } 1589 1590 nxt_debug(task, "application type: %V", &apcf.type); 1591 nxt_debug(task, "application processes: %D", apcf.processes); 1592 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1593 nxt_debug(task, "application reschedule timeout: %M", 1594 apcf.res_timeout); 1595 nxt_debug(task, "application requests: %D", apcf.requests); 1596 1597 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1598 1599 if (lang == NULL) { 1600 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1601 goto app_fail; 1602 } 1603 1604 nxt_debug(task, "application language module: \"%s\"", lang->file); 1605 1606 ret = nxt_thread_mutex_create(&app->mutex); 1607 if (ret != NXT_OK) { 1608 goto app_fail; 1609 } 1610 1611 nxt_queue_init(&app->ports); 1612 nxt_queue_init(&app->spare_ports); 1613 nxt_queue_init(&app->idle_ports); 1614 nxt_queue_init(&app->requests); 1615 nxt_queue_init(&app->pending); 1616 1617 app->name.length = name.length; 1618 nxt_memcpy(app->name.start, name.start, name.length); 1619 1620 app->type = lang->type; 1621 app->max_processes = apcf.max_processes; 1622 app->spare_processes = apcf.spare_processes; 1623 app->max_pending_processes = apcf.spare_processes 1624 ? apcf.spare_processes : 1; 1625 app->timeout = apcf.timeout; 1626 app->res_timeout = apcf.res_timeout * 1000000; 1627 app->idle_timeout = apcf.idle_timeout; 1628 app->max_pending_responses = 2; 1629 app->max_requests = apcf.requests; 1630 1631 engine = task->thread->engine; 1632 1633 app->engine = engine; 1634 1635 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1636 app->adjust_idle_work.task = &engine->task; 1637 app->adjust_idle_work.obj = app; 1638 1639 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1640 1641 nxt_router_app_use(task, app, 1); 1642 1643 app->joint = app_joint; 1644 1645 app_joint->use_count = 1; 1646 app_joint->app = app; 1647 1648 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1649 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1650 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1651 app_joint->idle_timer.task = &engine->task; 1652 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1653 1654 app_joint->free_app_work.handler = nxt_router_free_app; 1655 app_joint->free_app_work.task = &engine->task; 1656 app_joint->free_app_work.obj = app_joint; 1657 } 1658 } 1659 1660 routes_conf = nxt_conf_get_path(conf, &routes_path); 1661 if (nxt_fast_path(routes_conf != NULL)) { 1662 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1663 if (nxt_slow_path(routes == NULL)) { 1664 return NXT_ERROR; 1665 } 1666 tmcf->router_conf->routes = routes; 1667 } 1668 1669 ret = nxt_upstreams_create(task, tmcf, conf); 1670 if (nxt_slow_path(ret != NXT_OK)) { 1671 return ret; 1672 } 1673 1674 http = nxt_conf_get_path(conf, &http_path); 1675#if 0 1676 if (http == NULL) { 1677 nxt_alert(task, "no \"http\" block"); 1678 return NXT_ERROR; 1679 } 1680#endif 1681 1682 websocket = nxt_conf_get_path(conf, &websocket_path); 1683 1684 listeners = nxt_conf_get_path(conf, &listeners_path); 1685 1686 if (listeners != NULL) { 1687 next = 0; 1688 1689 for ( ;; ) { 1690 listener = nxt_conf_next_object_member(listeners, &name, &next); 1691 if (listener == NULL) { 1692 break; 1693 } 1694 1695 skcf = nxt_router_socket_conf(task, tmcf, &name); 1696 if (skcf == NULL) { 1697 goto fail; 1698 } 1699 1700 nxt_memzero(&lscf, sizeof(lscf)); 1701 1702 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1703 nxt_nitems(nxt_router_listener_conf), 1704 &lscf); 1705 if (ret != NXT_OK) { 1706 nxt_alert(task, "listener map error"); 1707 goto fail; 1708 } 1709 1710 nxt_debug(task, "application: %V", &lscf.application); 1711 1712 // STUB, default values if http block is not defined. 1713 skcf->header_buffer_size = 2048; 1714 skcf->large_header_buffer_size = 8192; 1715 skcf->large_header_buffers = 4; 1716 skcf->body_buffer_size = 16 * 1024; 1717 skcf->max_body_size = 8 * 1024 * 1024; 1718 skcf->proxy_header_buffer_size = 64 * 1024; 1719 skcf->proxy_buffer_size = 4096; 1720 skcf->proxy_buffers = 256; 1721 skcf->idle_timeout = 180 * 1000; 1722 skcf->header_read_timeout = 30 * 1000; 1723 skcf->body_read_timeout = 30 * 1000; 1724 skcf->send_timeout = 30 * 1000; 1725 skcf->proxy_timeout = 60 * 1000; 1726 skcf->proxy_send_timeout = 30 * 1000; 1727 skcf->proxy_read_timeout = 30 * 1000; 1728 1729 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1730 skcf->websocket_conf.read_timeout = 60 * 1000; 1731 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1732 1733 nxt_str_null(&skcf->body_temp_path); 1734 1735 if (http != NULL) { 1736 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1737 nxt_nitems(nxt_router_http_conf), 1738 skcf); 1739 if (ret != NXT_OK) { 1740 nxt_alert(task, "http map error"); 1741 goto fail; 1742 } 1743 } 1744 1745 if (websocket != NULL) { 1746 ret = nxt_conf_map_object(mp, websocket, 1747 nxt_router_websocket_conf, 1748 nxt_nitems(nxt_router_websocket_conf), 1749 &skcf->websocket_conf); 1750 if (ret != NXT_OK) { 1751 nxt_alert(task, "websocket map error"); 1752 goto fail; 1753 } 1754 } 1755 1756 t = &skcf->body_temp_path; 1757 1758 if (t->length == 0) { 1759 t->start = (u_char *) task->thread->runtime->tmp; 1760 t->length = nxt_strlen(t->start); 1761 } 1762 1763#if (NXT_TLS) 1764 value = nxt_conf_get_path(listener, &certificate_path); 1765 1766 if (value != NULL) { 1767 nxt_conf_get_string(value, &name); 1768 1769 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1770 if (nxt_slow_path(tls == NULL)) { 1771 goto fail; 1772 } 1773 1774 tls->name = name; 1775 tls->conf = skcf; 1776 1777 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1778 } 1779#endif 1780 1781 skcf->listen->handler = nxt_http_conn_init; 1782 skcf->router_conf = tmcf->router_conf; 1783 skcf->router_conf->count++; 1784 1785 if (lscf.pass.length != 0) { 1786 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1787 1788 /* COMPATIBILITY: listener application. */ 1789 } else if (lscf.application.length > 0) { 1790 skcf->action = nxt_http_pass_application(task, tmcf, 1791 &lscf.application); 1792 } 1793 } 1794 } 1795 1796 value = nxt_conf_get_path(conf, &access_log_path); 1797 1798 if (value != NULL) { 1799 nxt_conf_get_string(value, &path); 1800 1801 access_log = router->access_log; 1802 1803 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1804 nxt_thread_spin_lock(&router->lock); 1805 access_log->count++; 1806 nxt_thread_spin_unlock(&router->lock); 1807 1808 } else { 1809 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1810 + path.length); 1811 if (access_log == NULL) { 1812 nxt_alert(task, "failed to allocate access log structure"); 1813 goto fail; 1814 } 1815 1816 access_log->fd = -1; 1817 access_log->handler = &nxt_router_access_log_writer; 1818 access_log->count = 1; 1819 1820 access_log->path.length = path.length; 1821 access_log->path.start = (u_char *) access_log 1822 + sizeof(nxt_router_access_log_t); 1823 1824 nxt_memcpy(access_log->path.start, path.start, path.length); 1825 } 1826 1827 tmcf->router_conf->access_log = access_log; 1828 } 1829 1830 nxt_http_routes_resolve(task, tmcf); 1831 1832 nxt_queue_add(&tmcf->deleting, &router->sockets); 1833 nxt_queue_init(&router->sockets); 1834 1835 return NXT_OK; 1836 1837app_fail: 1838 1839 nxt_free(app); 1840 1841fail: 1842 1843 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1844 1845 nxt_queue_remove(&app->link); 1846 nxt_thread_mutex_destroy(&app->mutex); 1847 nxt_free(app); 1848 1849 } nxt_queue_loop; 1850 1851 return NXT_ERROR; 1852} 1853 1854 1855static nxt_int_t 1856nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 1857 nxt_conf_value_t *conf) 1858{ 1859 uint32_t next, i; 1860 nxt_mp_t *mp; 1861 nxt_str_t *type, extension, str; 1862 nxt_int_t ret; 1863 nxt_uint_t exts; 1864 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 1865 1866 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 1867 1868 mp = rtcf->mem_pool; 1869 1870 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 1871 if (nxt_slow_path(ret != NXT_OK)) { 1872 return NXT_ERROR; 1873 } 1874 1875 if (conf == NULL) { 1876 return NXT_OK; 1877 } 1878 1879 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 1880 1881 if (mtypes_conf != NULL) { 1882 next = 0; 1883 1884 for ( ;; ) { 1885 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 1886 1887 if (ext_conf == NULL) { 1888 break; 1889 } 1890 1891 type = nxt_str_dup(mp, NULL, &str); 1892 if (nxt_slow_path(type == NULL)) { 1893 return NXT_ERROR; 1894 } 1895 1896 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 1897 nxt_conf_get_string(ext_conf, &str); 1898 1899 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1900 return NXT_ERROR; 1901 } 1902 1903 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1904 &extension, type); 1905 if (nxt_slow_path(ret != NXT_OK)) { 1906 return NXT_ERROR; 1907 } 1908 1909 continue; 1910 } 1911 1912 exts = nxt_conf_array_elements_count(ext_conf); 1913 1914 for (i = 0; i < exts; i++) { 1915 value = nxt_conf_get_array_element(ext_conf, i); 1916 1917 nxt_conf_get_string(value, &str); 1918 1919 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1920 return NXT_ERROR; 1921 } 1922 1923 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1924 &extension, type); 1925 if (nxt_slow_path(ret != NXT_OK)) { 1926 return NXT_ERROR; 1927 } 1928 } 1929 } 1930 } 1931 1932 return NXT_OK; 1933} 1934 1935 1936static nxt_app_t * 1937nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1938{ 1939 nxt_app_t *app; 1940 1941 nxt_queue_each(app, queue, nxt_app_t, link) { 1942 1943 if (nxt_strstr_eq(name, &app->name)) { 1944 return app; 1945 } 1946 1947 } nxt_queue_loop; 1948 1949 return NULL; 1950} 1951 1952 1953void 1954nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, 1955 nxt_http_action_t *action) 1956{ 1957 nxt_app_t *app; 1958 1959 app = nxt_router_app_find(&tmcf->apps, name); 1960 1961 if (app == NULL) { 1962 app = nxt_router_app_find(&tmcf->previous, name); 1963 } 1964 1965 action->u.application = app; 1966 action->handler = nxt_http_application_handler; 1967} 1968 1969 1970static nxt_socket_conf_t * 1971nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1972 nxt_str_t *name) 1973{ 1974 size_t size; 1975 nxt_int_t ret; 1976 nxt_bool_t wildcard; 1977 nxt_sockaddr_t *sa; 1978 nxt_socket_conf_t *skcf; 1979 nxt_listen_socket_t *ls; 1980 1981 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1982 if (nxt_slow_path(sa == NULL)) { 1983 nxt_alert(task, "invalid listener \"%V\"", name); 1984 return NULL; 1985 } 1986 1987 sa->type = SOCK_STREAM; 1988 1989 nxt_debug(task, "router listener: \"%*s\"", 1990 (size_t) sa->length, nxt_sockaddr_start(sa)); 1991 1992 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 1993 if (nxt_slow_path(skcf == NULL)) { 1994 return NULL; 1995 } 1996 1997 size = nxt_sockaddr_size(sa); 1998 1999 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2000 2001 if (ret != NXT_OK) { 2002 2003 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2004 if (nxt_slow_path(ls == NULL)) { 2005 return NULL; 2006 } 2007 2008 skcf->listen = ls; 2009 2010 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2011 nxt_memcpy(ls->sockaddr, sa, size); 2012 2013 nxt_listen_socket_remote_size(ls); 2014 2015 ls->socket = -1; 2016 ls->backlog = NXT_LISTEN_BACKLOG; 2017 ls->flags = NXT_NONBLOCK; 2018 ls->read_after_accept = 1; 2019 } 2020 2021 switch (sa->u.sockaddr.sa_family) { 2022#if (NXT_HAVE_UNIX_DOMAIN) 2023 case AF_UNIX: 2024 wildcard = 0; 2025 break; 2026#endif 2027#if (NXT_INET6) 2028 case AF_INET6: 2029 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2030 break; 2031#endif 2032 case AF_INET: 2033 default: 2034 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2035 break; 2036 } 2037 2038 if (!wildcard) { 2039 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2040 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2041 return NULL; 2042 } 2043 2044 nxt_memcpy(skcf->sockaddr, sa, size); 2045 } 2046 2047 return skcf; 2048} 2049 2050 2051static nxt_int_t 2052nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2053 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2054{ 2055 nxt_router_t *router; 2056 nxt_queue_link_t *qlk; 2057 nxt_socket_conf_t *skcf; 2058 2059 router = tmcf->router_conf->router; 2060 2061 for (qlk = nxt_queue_first(&router->sockets); 2062 qlk != nxt_queue_tail(&router->sockets); 2063 qlk = nxt_queue_next(qlk)) 2064 { 2065 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2066 2067 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2068 nskcf->listen = skcf->listen; 2069 2070 nxt_queue_remove(qlk); 2071 nxt_queue_insert_tail(&tmcf->keeping, qlk); 2072 2073 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 2074 2075 return NXT_OK; 2076 } 2077 } 2078 2079 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 2080 2081 return NXT_DECLINED; 2082} 2083 2084 2085static void 2086nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2087 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2088{ 2089 size_t size; 2090 uint32_t stream; 2091 nxt_int_t ret; 2092 nxt_buf_t *b; 2093 nxt_port_t *main_port, *router_port; 2094 nxt_runtime_t *rt; 2095 nxt_socket_rpc_t *rpc; 2096 2097 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2098 if (rpc == NULL) { 2099 goto fail; 2100 } 2101 2102 rpc->socket_conf = skcf; 2103 rpc->temp_conf = tmcf; 2104 2105 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2106 2107 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2108 if (b == NULL) { 2109 goto fail; 2110 } 2111 2112 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2113 2114 rt = task->thread->runtime; 2115 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2116 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2117 2118 stream = nxt_port_rpc_register_handler(task, router_port, 2119 nxt_router_listen_socket_ready, 2120 nxt_router_listen_socket_error, 2121 main_port->pid, rpc); 2122 if (nxt_slow_path(stream == 0)) { 2123 goto fail; 2124 } 2125 2126 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2127 stream, router_port->id, b); 2128 2129 if (nxt_slow_path(ret != NXT_OK)) { 2130 nxt_port_rpc_cancel(task, router_port, stream); 2131 goto fail; 2132 } 2133 2134 return; 2135 2136fail: 2137 2138 nxt_router_conf_error(task, tmcf); 2139} 2140 2141 2142static void 2143nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2144 void *data) 2145{ 2146 nxt_int_t ret; 2147 nxt_socket_t s; 2148 nxt_socket_rpc_t *rpc; 2149 2150 rpc = data; 2151 2152 s = msg->fd; 2153 2154 ret = nxt_socket_nonblocking(task, s); 2155 if (nxt_slow_path(ret != NXT_OK)) { 2156 goto fail; 2157 } 2158 2159 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2160 2161 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2162 if (nxt_slow_path(ret != NXT_OK)) { 2163 goto fail; 2164 } 2165 2166 rpc->socket_conf->listen->socket = s; 2167 2168 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2169 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2170 2171 return; 2172 2173fail: 2174 2175 nxt_socket_close(task, s); 2176 2177 nxt_router_conf_error(task, rpc->temp_conf); 2178} 2179 2180 2181static void 2182nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2183 void *data) 2184{ 2185 nxt_socket_rpc_t *rpc; 2186 nxt_router_temp_conf_t *tmcf; 2187 2188 rpc = data; 2189 tmcf = rpc->temp_conf; 2190 2191#if 0 2192 u_char *p; 2193 size_t size; 2194 uint8_t error; 2195 nxt_buf_t *in, *out; 2196 nxt_sockaddr_t *sa; 2197 2198 static nxt_str_t socket_errors[] = { 2199 nxt_string("ListenerSystem"), 2200 nxt_string("ListenerNoIPv6"), 2201 nxt_string("ListenerPort"), 2202 nxt_string("ListenerInUse"), 2203 nxt_string("ListenerNoAddress"), 2204 nxt_string("ListenerNoAccess"), 2205 nxt_string("ListenerPath"), 2206 }; 2207 2208 sa = rpc->socket_conf->listen->sockaddr; 2209 2210 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2211 2212 if (nxt_slow_path(in == NULL)) { 2213 return; 2214 } 2215 2216 p = in->mem.pos; 2217 2218 error = *p++; 2219 2220 size = nxt_length("listen socket error: ") 2221 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2222 + sa->length + socket_errors[error].length + (in->mem.free - p); 2223 2224 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2225 if (nxt_slow_path(out == NULL)) { 2226 return; 2227 } 2228 2229 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2230 "listen socket error: " 2231 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2232 (size_t) sa->length, nxt_sockaddr_start(sa), 2233 &socket_errors[error], in->mem.free - p, p); 2234 2235 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2236#endif 2237 2238 nxt_router_conf_error(task, tmcf); 2239} 2240 2241 2242#if (NXT_TLS) 2243 2244static void 2245nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2246 nxt_router_tlssock_t *tls) 2247{ 2248 nxt_socket_rpc_t *rpc; 2249 2250 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2251 if (rpc == NULL) { 2252 nxt_router_conf_error(task, tmcf); 2253 return; 2254 } 2255 2256 rpc->socket_conf = tls->conf; 2257 rpc->temp_conf = tmcf; 2258 2259 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2260 nxt_router_tls_rpc_handler, rpc); 2261} 2262 2263 2264static void 2265nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2266 void *data) 2267{ 2268 nxt_mp_t *mp; 2269 nxt_int_t ret; 2270 nxt_tls_conf_t *tlscf; 2271 nxt_socket_rpc_t *rpc; 2272 nxt_router_temp_conf_t *tmcf; 2273 2274 nxt_debug(task, "tls rpc handler"); 2275 2276 rpc = data; 2277 tmcf = rpc->temp_conf; 2278 2279 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2280 goto fail; 2281 } 2282 2283 mp = tmcf->router_conf->mem_pool; 2284 2285 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2286 if (nxt_slow_path(tlscf == NULL)) { 2287 goto fail; 2288 } 2289 2290 tlscf->chain_file = msg->fd; 2291 2292 ret = task->thread->runtime->tls->server_init(task, tlscf); 2293 if (nxt_slow_path(ret != NXT_OK)) { 2294 goto fail; 2295 } 2296 2297 rpc->socket_conf->tls = tlscf; 2298 2299 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2300 nxt_router_conf_apply, task, tmcf, NULL); 2301 return; 2302 2303fail: 2304 2305 nxt_router_conf_error(task, tmcf); 2306} 2307 2308#endif 2309 2310 2311static void 2312nxt_router_app_rpc_create(nxt_task_t *task, 2313 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2314{ 2315 size_t size; 2316 uint32_t stream; 2317 nxt_int_t ret; 2318 nxt_buf_t *b; 2319 nxt_port_t *main_port, *router_port; 2320 nxt_runtime_t *rt; 2321 nxt_app_rpc_t *rpc; 2322 2323 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2324 if (rpc == NULL) { 2325 goto fail; 2326 } 2327 2328 rpc->app = app; 2329 rpc->temp_conf = tmcf; 2330 2331 nxt_debug(task, "app '%V' prefork", &app->name); 2332 2333 size = app->name.length + 1 + app->conf.length; 2334 2335 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2336 if (nxt_slow_path(b == NULL)) { 2337 goto fail; 2338 } 2339 2340 nxt_buf_cpystr(b, &app->name); 2341 *b->mem.free++ = '\0'; 2342 nxt_buf_cpystr(b, &app->conf); 2343 2344 rt = task->thread->runtime; 2345 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2346 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2347 2348 stream = nxt_port_rpc_register_handler(task, router_port, 2349 nxt_router_app_prefork_ready, 2350 nxt_router_app_prefork_error, 2351 -1, rpc); 2352 if (nxt_slow_path(stream == 0)) { 2353 goto fail; 2354 } 2355 2356 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 2357 stream, router_port->id, b); 2358 2359 if (nxt_slow_path(ret != NXT_OK)) { 2360 nxt_port_rpc_cancel(task, router_port, stream); 2361 goto fail; 2362 } 2363 2364 app->pending_processes++; 2365 2366 return; 2367 2368fail: 2369 2370 nxt_router_conf_error(task, tmcf); 2371} 2372 2373 2374static void 2375nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2376 void *data) 2377{ 2378 nxt_app_t *app; 2379 nxt_port_t *port; 2380 nxt_app_rpc_t *rpc; 2381 nxt_event_engine_t *engine; 2382 2383 rpc = data; 2384 app = rpc->app; 2385 2386 port = msg->u.new_port; 2387 port->app = app; 2388 2389 app->pending_processes--; 2390 app->processes++; 2391 app->idle_processes++; 2392 2393 engine = task->thread->engine; 2394 2395 nxt_queue_insert_tail(&app->ports, &port->app_link); 2396 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2397 2398 port->idle_start = 0; 2399 2400 nxt_port_inc_use(port); 2401 2402 nxt_work_queue_add(&engine->fast_work_queue, 2403 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2404} 2405 2406 2407static void 2408nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2409 void *data) 2410{ 2411 nxt_app_t *app; 2412 nxt_app_rpc_t *rpc; 2413 nxt_router_temp_conf_t *tmcf; 2414 2415 rpc = data; 2416 app = rpc->app; 2417 tmcf = rpc->temp_conf; 2418 2419 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2420 &app->name); 2421 2422 app->pending_processes--; 2423 2424 nxt_router_conf_error(task, tmcf); 2425} 2426 2427 2428static nxt_int_t 2429nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2430 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2431{ 2432 nxt_int_t ret; 2433 nxt_uint_t n, threads; 2434 nxt_queue_link_t *qlk; 2435 nxt_router_engine_conf_t *recf; 2436 2437 threads = tmcf->router_conf->threads; 2438 2439 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2440 sizeof(nxt_router_engine_conf_t)); 2441 if (nxt_slow_path(tmcf->engines == NULL)) { 2442 return NXT_ERROR; 2443 } 2444 2445 n = 0; 2446 2447 for (qlk = nxt_queue_first(&router->engines); 2448 qlk != nxt_queue_tail(&router->engines); 2449 qlk = nxt_queue_next(qlk)) 2450 { 2451 recf = nxt_array_zero_add(tmcf->engines); 2452 if (nxt_slow_path(recf == NULL)) { 2453 return NXT_ERROR; 2454 } 2455 2456 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2457 2458 if (n < threads) { 2459 recf->action = NXT_ROUTER_ENGINE_KEEP; 2460 ret = nxt_router_engine_conf_update(tmcf, recf); 2461 2462 } else { 2463 recf->action = NXT_ROUTER_ENGINE_DELETE; 2464 ret = nxt_router_engine_conf_delete(tmcf, recf); 2465 } 2466 2467 if (nxt_slow_path(ret != NXT_OK)) { 2468 return ret; 2469 } 2470 2471 n++; 2472 } 2473 2474 tmcf->new_threads = n; 2475 2476 while (n < threads) { 2477 recf = nxt_array_zero_add(tmcf->engines); 2478 if (nxt_slow_path(recf == NULL)) { 2479 return NXT_ERROR; 2480 } 2481 2482 recf->action = NXT_ROUTER_ENGINE_ADD; 2483 2484 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2485 if (nxt_slow_path(recf->engine == NULL)) { 2486 return NXT_ERROR; 2487 } 2488 2489 ret = nxt_router_engine_conf_create(tmcf, recf); 2490 if (nxt_slow_path(ret != NXT_OK)) { 2491 return ret; 2492 } 2493 2494 n++; 2495 } 2496 2497 return NXT_OK; 2498} 2499 2500 2501static nxt_int_t 2502nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2503 nxt_router_engine_conf_t *recf) 2504{ 2505 nxt_int_t ret; 2506 2507 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2508 nxt_router_listen_socket_create); 2509 if (nxt_slow_path(ret != NXT_OK)) { 2510 return ret; 2511 } 2512 2513 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2514 nxt_router_listen_socket_create); 2515 if (nxt_slow_path(ret != NXT_OK)) { 2516 return ret; 2517 } 2518 2519 return ret; 2520} 2521 2522 2523static nxt_int_t 2524nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2525 nxt_router_engine_conf_t *recf) 2526{ 2527 nxt_int_t ret; 2528 2529 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2530 nxt_router_listen_socket_create); 2531 if (nxt_slow_path(ret != NXT_OK)) { 2532 return ret; 2533 } 2534 2535 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2536 nxt_router_listen_socket_update); 2537 if (nxt_slow_path(ret != NXT_OK)) { 2538 return ret; 2539 } 2540 2541 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2542 if (nxt_slow_path(ret != NXT_OK)) { 2543 return ret; 2544 } 2545 2546 return ret; 2547} 2548 2549 2550static nxt_int_t 2551nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2552 nxt_router_engine_conf_t *recf) 2553{ 2554 nxt_int_t ret; 2555 2556 ret = nxt_router_engine_quit(tmcf, recf); 2557 if (nxt_slow_path(ret != NXT_OK)) { 2558 return ret; 2559 } 2560 2561 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2562 if (nxt_slow_path(ret != NXT_OK)) { 2563 return ret; 2564 } 2565 2566 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2567} 2568 2569 2570static nxt_int_t 2571nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2572 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2573 nxt_work_handler_t handler) 2574{ 2575 nxt_int_t ret; 2576 nxt_joint_job_t *job; 2577 nxt_queue_link_t *qlk; 2578 nxt_socket_conf_t *skcf; 2579 nxt_socket_conf_joint_t *joint; 2580 2581 for (qlk = nxt_queue_first(sockets); 2582 qlk != nxt_queue_tail(sockets); 2583 qlk = nxt_queue_next(qlk)) 2584 { 2585 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2586 if (nxt_slow_path(job == NULL)) { 2587 return NXT_ERROR; 2588 } 2589 2590 job->work.next = recf->jobs; 2591 recf->jobs = &job->work; 2592 2593 job->task = tmcf->engine->task; 2594 job->work.handler = handler; 2595 job->work.task = &job->task; 2596 job->work.obj = job; 2597 job->tmcf = tmcf; 2598 2599 tmcf->count++; 2600 2601 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2602 sizeof(nxt_socket_conf_joint_t)); 2603 if (nxt_slow_path(joint == NULL)) { 2604 return NXT_ERROR; 2605 } 2606 2607 job->work.data = joint; 2608 2609 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 2610 if (nxt_slow_path(ret != NXT_OK)) { 2611 return ret; 2612 } 2613 2614 joint->count = 1; 2615 2616 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2617 skcf->count++; 2618 joint->socket_conf = skcf; 2619 2620 joint->engine = recf->engine; 2621 } 2622 2623 return NXT_OK; 2624} 2625 2626 2627static nxt_int_t 2628nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2629 nxt_router_engine_conf_t *recf) 2630{ 2631 nxt_joint_job_t *job; 2632 2633 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2634 if (nxt_slow_path(job == NULL)) { 2635 return NXT_ERROR; 2636 } 2637 2638 job->work.next = recf->jobs; 2639 recf->jobs = &job->work; 2640 2641 job->task = tmcf->engine->task; 2642 job->work.handler = nxt_router_worker_thread_quit; 2643 job->work.task = &job->task; 2644 job->work.obj = NULL; 2645 job->work.data = NULL; 2646 job->tmcf = NULL; 2647 2648 return NXT_OK; 2649} 2650 2651 2652static nxt_int_t 2653nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2654 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2655{ 2656 nxt_joint_job_t *job; 2657 nxt_queue_link_t *qlk; 2658 2659 for (qlk = nxt_queue_first(sockets); 2660 qlk != nxt_queue_tail(sockets); 2661 qlk = nxt_queue_next(qlk)) 2662 { 2663 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2664 if (nxt_slow_path(job == NULL)) { 2665 return NXT_ERROR; 2666 } 2667 2668 job->work.next = recf->jobs; 2669 recf->jobs = &job->work; 2670 2671 job->task = tmcf->engine->task; 2672 job->work.handler = nxt_router_listen_socket_delete; 2673 job->work.task = &job->task; 2674 job->work.obj = job; 2675 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2676 job->tmcf = tmcf; 2677 2678 tmcf->count++; 2679 } 2680 2681 return NXT_OK; 2682} 2683 2684 2685static nxt_int_t 2686nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2687 nxt_router_temp_conf_t *tmcf) 2688{ 2689 nxt_int_t ret; 2690 nxt_uint_t i, threads; 2691 nxt_router_engine_conf_t *recf; 2692 2693 recf = tmcf->engines->elts; 2694 threads = tmcf->router_conf->threads; 2695 2696 for (i = tmcf->new_threads; i < threads; i++) { 2697 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2698 if (nxt_slow_path(ret != NXT_OK)) { 2699 return ret; 2700 } 2701 } 2702 2703 return NXT_OK; 2704} 2705 2706 2707static nxt_int_t 2708nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2709 nxt_event_engine_t *engine) 2710{ 2711 nxt_int_t ret; 2712 nxt_thread_link_t *link; 2713 nxt_thread_handle_t handle; 2714 2715 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2716 2717 if (nxt_slow_path(link == NULL)) { 2718 return NXT_ERROR; 2719 } 2720 2721 link->start = nxt_router_thread_start; 2722 link->engine = engine; 2723 link->work.handler = nxt_router_thread_exit_handler; 2724 link->work.task = task; 2725 link->work.data = link; 2726 2727 nxt_queue_insert_tail(&rt->engines, &engine->link); 2728 2729 ret = nxt_thread_create(&handle, link); 2730 2731 if (nxt_slow_path(ret != NXT_OK)) { 2732 nxt_queue_remove(&engine->link); 2733 } 2734 2735 return ret; 2736} 2737 2738 2739static void 2740nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2741 nxt_router_temp_conf_t *tmcf) 2742{ 2743 nxt_app_t *app; 2744 2745 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2746 2747 nxt_router_app_unlink(task, app); 2748 2749 } nxt_queue_loop; 2750 2751 nxt_queue_add(&router->apps, &tmcf->previous); 2752 nxt_queue_add(&router->apps, &tmcf->apps); 2753} 2754 2755 2756static void 2757nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2758{ 2759 nxt_uint_t n; 2760 nxt_event_engine_t *engine; 2761 nxt_router_engine_conf_t *recf; 2762 2763 recf = tmcf->engines->elts; 2764 2765 for (n = tmcf->engines->nelts; n != 0; n--) { 2766 engine = recf->engine; 2767 2768 switch (recf->action) { 2769 2770 case NXT_ROUTER_ENGINE_KEEP: 2771 break; 2772 2773 case NXT_ROUTER_ENGINE_ADD: 2774 nxt_queue_insert_tail(&router->engines, &engine->link0); 2775 break; 2776 2777 case NXT_ROUTER_ENGINE_DELETE: 2778 nxt_queue_remove(&engine->link0); 2779 break; 2780 } 2781 2782 nxt_router_engine_post(engine, recf->jobs); 2783 2784 recf++; 2785 } 2786} 2787 2788 2789static void 2790nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2791{ 2792 nxt_work_t *work, *next; 2793 2794 for (work = jobs; work != NULL; work = next) { 2795 next = work->next; 2796 work->next = NULL; 2797 2798 nxt_event_engine_post(engine, work); 2799 } 2800} 2801 2802 2803static nxt_port_handlers_t nxt_router_app_port_handlers = { 2804 .rpc_error = nxt_port_rpc_handler, 2805 .mmap = nxt_port_mmap_handler, 2806 .data = nxt_port_rpc_handler, 2807 .oosm = nxt_router_oosm_handler, 2808}; 2809 2810 2811static void 2812nxt_router_thread_start(void *data) 2813{ 2814 nxt_int_t ret; 2815 nxt_port_t *port; 2816 nxt_task_t *task; 2817 nxt_thread_t *thread; 2818 nxt_thread_link_t *link; 2819 nxt_event_engine_t *engine; 2820 2821 link = data; 2822 engine = link->engine; 2823 task = &engine->task; 2824 2825 thread = nxt_thread(); 2826 2827 nxt_event_engine_thread_adopt(engine); 2828 2829 /* STUB */ 2830 thread->runtime = engine->task.thread->runtime; 2831 2832 engine->task.thread = thread; 2833 engine->task.log = thread->log; 2834 thread->engine = engine; 2835 thread->task = &engine->task; 2836#if 0 2837 thread->fiber = &engine->fibers->fiber; 2838#endif 2839 2840 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2841 if (nxt_slow_path(engine->mem_pool == NULL)) { 2842 return; 2843 } 2844 2845 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2846 NXT_PROCESS_ROUTER); 2847 if (nxt_slow_path(port == NULL)) { 2848 return; 2849 } 2850 2851 ret = nxt_port_socket_init(task, port, 0); 2852 if (nxt_slow_path(ret != NXT_OK)) { 2853 nxt_port_use(task, port, -1); 2854 return; 2855 } 2856 2857 engine->port = port; 2858 2859 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2860 2861 nxt_event_engine_start(engine); 2862} 2863 2864 2865static void 2866nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2867{ 2868 nxt_joint_job_t *job; 2869 nxt_socket_conf_t *skcf; 2870 nxt_listen_event_t *lev; 2871 nxt_listen_socket_t *ls; 2872 nxt_thread_spinlock_t *lock; 2873 nxt_socket_conf_joint_t *joint; 2874 2875 job = obj; 2876 joint = data; 2877 2878 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2879 2880 skcf = joint->socket_conf; 2881 ls = skcf->listen; 2882 2883 lev = nxt_listen_event(task, ls); 2884 if (nxt_slow_path(lev == NULL)) { 2885 nxt_router_listen_socket_release(task, skcf); 2886 return; 2887 } 2888 2889 lev->socket.data = joint; 2890 2891 lock = &skcf->router_conf->router->lock; 2892 2893 nxt_thread_spin_lock(lock); 2894 ls->count++; 2895 nxt_thread_spin_unlock(lock); 2896 2897 job->work.next = NULL; 2898 job->work.handler = nxt_router_conf_wait; 2899 2900 nxt_event_engine_post(job->tmcf->engine, &job->work); 2901} 2902 2903 2904nxt_inline nxt_listen_event_t * 2905nxt_router_listen_event(nxt_queue_t *listen_connections, 2906 nxt_socket_conf_t *skcf) 2907{ 2908 nxt_socket_t fd; 2909 nxt_queue_link_t *qlk; 2910 nxt_listen_event_t *lev; 2911 2912 fd = skcf->listen->socket; 2913 2914 for (qlk = nxt_queue_first(listen_connections); 2915 qlk != nxt_queue_tail(listen_connections); 2916 qlk = nxt_queue_next(qlk)) 2917 { 2918 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2919 2920 if (fd == lev->socket.fd) { 2921 return lev; 2922 } 2923 } 2924 2925 return NULL; 2926} 2927 2928 2929static void 2930nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2931{ 2932 nxt_joint_job_t *job; 2933 nxt_event_engine_t *engine; 2934 nxt_listen_event_t *lev; 2935 nxt_socket_conf_joint_t *joint, *old; 2936 2937 job = obj; 2938 joint = data; 2939 2940 engine = task->thread->engine; 2941 2942 nxt_queue_insert_tail(&engine->joints, &joint->link); 2943 2944 lev = nxt_router_listen_event(&engine->listen_connections, 2945 joint->socket_conf); 2946 2947 old = lev->socket.data; 2948 lev->socket.data = joint; 2949 lev->listen = joint->socket_conf->listen; 2950 2951 job->work.next = NULL; 2952 job->work.handler = nxt_router_conf_wait; 2953 2954 nxt_event_engine_post(job->tmcf->engine, &job->work); 2955 2956 /* 2957 * The task is allocated from configuration temporary 2958 * memory pool so it can be freed after engine post operation. 2959 */ 2960 2961 nxt_router_conf_release(&engine->task, old); 2962} 2963 2964 2965static void 2966nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2967{ 2968 nxt_joint_job_t *job; 2969 nxt_socket_conf_t *skcf; 2970 nxt_listen_event_t *lev; 2971 nxt_event_engine_t *engine; 2972 2973 job = obj; 2974 skcf = data; 2975 2976 engine = task->thread->engine; 2977 2978 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2979 2980 nxt_fd_event_delete(engine, &lev->socket); 2981 2982 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2983 lev->socket.fd); 2984 2985 lev->timer.handler = nxt_router_listen_socket_close; 2986 lev->timer.work_queue = &engine->fast_work_queue; 2987 2988 nxt_timer_add(engine, &lev->timer, 0); 2989 2990 job->work.next = NULL; 2991 job->work.handler = nxt_router_conf_wait; 2992 2993 nxt_event_engine_post(job->tmcf->engine, &job->work); 2994} 2995 2996 2997static void 2998nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2999{ 3000 nxt_event_engine_t *engine; 3001 3002 nxt_debug(task, "router worker thread quit"); 3003 3004 engine = task->thread->engine; 3005 3006 engine->shutdown = 1; 3007 3008 if (nxt_queue_is_empty(&engine->joints)) { 3009 nxt_thread_exit(task->thread); 3010 } 3011} 3012 3013 3014static void 3015nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3016{ 3017 nxt_timer_t *timer; 3018 nxt_listen_event_t *lev; 3019 nxt_socket_conf_joint_t *joint; 3020 3021 timer = obj; 3022 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3023 3024 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3025 lev->socket.fd); 3026 3027 nxt_queue_remove(&lev->link); 3028 3029 joint = lev->socket.data; 3030 lev->socket.data = NULL; 3031 3032 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3033 task = &task->thread->engine->task; 3034 3035 nxt_router_listen_socket_release(task, joint->socket_conf); 3036 3037 nxt_router_listen_event_release(task, lev, joint); 3038} 3039 3040 3041static void 3042nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3043{ 3044 nxt_listen_socket_t *ls; 3045 nxt_thread_spinlock_t *lock; 3046 3047 ls = skcf->listen; 3048 lock = &skcf->router_conf->router->lock; 3049 3050 nxt_thread_spin_lock(lock); 3051 3052 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3053 task->thread->engine, ls->count); 3054 3055 if (--ls->count != 0) { 3056 ls = NULL; 3057 } 3058 3059 nxt_thread_spin_unlock(lock); 3060 3061 if (ls != NULL) { 3062 nxt_socket_close(task, ls->socket); 3063 nxt_free(ls); 3064 } 3065} 3066 3067 3068void 3069nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3070 nxt_socket_conf_joint_t *joint) 3071{ 3072 nxt_event_engine_t *engine; 3073 3074 nxt_debug(task, "listen event count: %D", lev->count); 3075 3076 if (--lev->count == 0) { 3077 nxt_free(lev); 3078 } 3079 3080 if (joint != NULL) { 3081 nxt_router_conf_release(task, joint); 3082 } 3083 3084 engine = task->thread->engine; 3085 3086 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3087 nxt_thread_exit(task->thread); 3088 } 3089} 3090 3091 3092void 3093nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3094{ 3095 nxt_socket_conf_t *skcf; 3096 nxt_router_conf_t *rtcf; 3097 nxt_thread_spinlock_t *lock; 3098 3099 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3100 3101 if (--joint->count != 0) { 3102 return; 3103 } 3104 3105 nxt_queue_remove(&joint->link); 3106 3107 /* 3108 * The joint content can not be safely used after the critical 3109 * section protected by the spinlock because its memory pool may 3110 * be already destroyed by another thread. 3111 */ 3112 skcf = joint->socket_conf; 3113 rtcf = skcf->router_conf; 3114 lock = &rtcf->router->lock; 3115 3116 nxt_thread_spin_lock(lock); 3117 3118 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3119 rtcf, rtcf->count); 3120 3121 if (--skcf->count != 0) { 3122 skcf = NULL; 3123 rtcf = NULL; 3124 3125 } else { 3126 nxt_queue_remove(&skcf->link); 3127 3128 if (--rtcf->count != 0) { 3129 rtcf = NULL; 3130 } 3131 } 3132 3133 nxt_thread_spin_unlock(lock); 3134 3135 if (skcf != NULL) { 3136 if (skcf->action != NULL) { 3137 nxt_http_action_cleanup(task, skcf->action); 3138 } 3139 3140#if (NXT_TLS) 3141 if (skcf->tls != NULL) { 3142 task->thread->runtime->tls->server_free(task, skcf->tls); 3143 } 3144#endif 3145 } 3146 3147 /* TODO remove engine->port */ 3148 /* TODO excude from connected ports */ 3149 3150 if (rtcf != NULL) { 3151 nxt_debug(task, "old router conf is destroyed"); 3152 3153 nxt_http_routes_cleanup(task, rtcf->routes); 3154 3155 nxt_router_access_log_release(task, lock, rtcf->access_log); 3156 3157 nxt_mp_thread_adopt(rtcf->mem_pool); 3158 3159 nxt_mp_destroy(rtcf->mem_pool); 3160 } 3161} 3162 3163 3164static void 3165nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3166 nxt_router_access_log_t *access_log) 3167{ 3168 size_t size; 3169 u_char *buf, *p; 3170 nxt_off_t bytes; 3171 3172 static nxt_time_string_t date_cache = { 3173 (nxt_atomic_uint_t) -1, 3174 nxt_router_access_log_date, 3175 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3176 nxt_length("31/Dec/1986:19:40:00 +0300"), 3177 NXT_THREAD_TIME_LOCAL, 3178 NXT_THREAD_TIME_SEC, 3179 }; 3180 3181 size = r->remote->address_length 3182 + 6 /* ' - - [' */ 3183 + date_cache.size 3184 + 3 /* '] "' */ 3185 + r->method->length 3186 + 1 /* space */ 3187 + r->target.length 3188 + 1 /* space */ 3189 + r->version.length 3190 + 2 /* '" ' */ 3191 + 3 /* status */ 3192 + 1 /* space */ 3193 + NXT_OFF_T_LEN 3194 + 2 /* ' "' */ 3195 + (r->referer != NULL ? r->referer->value_length : 1) 3196 + 3 /* '" "' */ 3197 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3198 + 2 /* '"\n' */ 3199 ; 3200 3201 buf = nxt_mp_nget(r->mem_pool, size); 3202 if (nxt_slow_path(buf == NULL)) { 3203 return; 3204 } 3205 3206 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3207 r->remote->address_length); 3208 3209 p = nxt_cpymem(p, " - - [", 6); 3210 3211 p = nxt_thread_time_string(task->thread, &date_cache, p); 3212 3213 p = nxt_cpymem(p, "] \"", 3); 3214 3215 if (r->method->length != 0) { 3216 p = nxt_cpymem(p, r->method->start, r->method->length); 3217 3218 if (r->target.length != 0) { 3219 *p++ = ' '; 3220 p = nxt_cpymem(p, r->target.start, r->target.length); 3221 3222 if (r->version.length != 0) { 3223 *p++ = ' '; 3224 p = nxt_cpymem(p, r->version.start, r->version.length); 3225 } 3226 } 3227 3228 } else { 3229 *p++ = '-'; 3230 } 3231 3232 p = nxt_cpymem(p, "\" ", 2); 3233 3234 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3235 3236 *p++ = ' '; 3237 3238 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3239 3240 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3241 3242 p = nxt_cpymem(p, " \"", 2); 3243 3244 if (r->referer != NULL) { 3245 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3246 3247 } else { 3248 *p++ = '-'; 3249 } 3250 3251 p = nxt_cpymem(p, "\" \"", 3); 3252 3253 if (r->user_agent != NULL) { 3254 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3255 3256 } else { 3257 *p++ = '-'; 3258 } 3259 3260 p = nxt_cpymem(p, "\"\n", 2); 3261 3262 nxt_fd_write(access_log->fd, buf, p - buf); 3263} 3264 3265 3266static u_char * 3267nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3268 size_t size, const char *format) 3269{ 3270 u_char sign; 3271 time_t gmtoff; 3272 3273 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3274 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3275 3276 gmtoff = nxt_timezone(tm) / 60; 3277 3278 if (gmtoff < 0) { 3279 gmtoff = -gmtoff; 3280 sign = '-'; 3281 3282 } else { 3283 sign = '+'; 3284 } 3285 3286 return nxt_sprintf(buf, buf + size, format, 3287 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3288 tm->tm_hour, tm->tm_min, tm->tm_sec, 3289 sign, gmtoff / 60, gmtoff % 60); 3290} 3291 3292 3293static void 3294nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3295{ 3296 uint32_t stream; 3297 nxt_int_t ret; 3298 nxt_buf_t *b; 3299 nxt_port_t *main_port, *router_port; 3300 nxt_runtime_t *rt; 3301 nxt_router_access_log_t *access_log; 3302 3303 access_log = tmcf->router_conf->access_log; 3304 3305 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3306 if (nxt_slow_path(b == NULL)) { 3307 goto fail; 3308 } 3309 3310 nxt_buf_cpystr(b, &access_log->path); 3311 *b->mem.free++ = '\0'; 3312 3313 rt = task->thread->runtime; 3314 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3315 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3316 3317 stream = nxt_port_rpc_register_handler(task, router_port, 3318 nxt_router_access_log_ready, 3319 nxt_router_access_log_error, 3320 -1, tmcf); 3321 if (nxt_slow_path(stream == 0)) { 3322 goto fail; 3323 } 3324 3325 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3326 stream, router_port->id, b); 3327 3328 if (nxt_slow_path(ret != NXT_OK)) { 3329 nxt_port_rpc_cancel(task, router_port, stream); 3330 goto fail; 3331 } 3332 3333 return; 3334 3335fail: 3336 3337 nxt_router_conf_error(task, tmcf); 3338} 3339 3340 3341static void 3342nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3343 void *data) 3344{ 3345 nxt_router_temp_conf_t *tmcf; 3346 nxt_router_access_log_t *access_log; 3347 3348 tmcf = data; 3349 3350 access_log = tmcf->router_conf->access_log; 3351 3352 access_log->fd = msg->fd; 3353 3354 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3355 nxt_router_conf_apply, task, tmcf, NULL); 3356} 3357 3358 3359static void 3360nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3361 void *data) 3362{ 3363 nxt_router_temp_conf_t *tmcf; 3364 3365 tmcf = data; 3366 3367 nxt_router_conf_error(task, tmcf); 3368} 3369 3370 3371static void 3372nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3373 nxt_router_access_log_t *access_log) 3374{ 3375 if (access_log == NULL) { 3376 return; 3377 } 3378 3379 nxt_thread_spin_lock(lock); 3380 3381 if (--access_log->count != 0) { 3382 access_log = NULL; 3383 } 3384 3385 nxt_thread_spin_unlock(lock); 3386 3387 if (access_log != NULL) { 3388 3389 if (access_log->fd != -1) { 3390 nxt_fd_close(access_log->fd); 3391 } 3392 3393 nxt_free(access_log); 3394 } 3395} 3396 3397 3398typedef struct { 3399 nxt_mp_t *mem_pool; 3400 nxt_router_access_log_t *access_log; 3401} nxt_router_access_log_reopen_t; 3402 3403 3404void 3405nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3406{ 3407 nxt_mp_t *mp; 3408 uint32_t stream; 3409 nxt_int_t ret; 3410 nxt_buf_t *b; 3411 nxt_port_t *main_port, *router_port; 3412 nxt_runtime_t *rt; 3413 nxt_router_access_log_t *access_log; 3414 nxt_router_access_log_reopen_t *reopen; 3415 3416 access_log = nxt_router->access_log; 3417 3418 if (access_log == NULL) { 3419 return; 3420 } 3421 3422 mp = nxt_mp_create(1024, 128, 256, 32); 3423 if (nxt_slow_path(mp == NULL)) { 3424 return; 3425 } 3426 3427 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3428 if (nxt_slow_path(reopen == NULL)) { 3429 goto fail; 3430 } 3431 3432 reopen->mem_pool = mp; 3433 reopen->access_log = access_log; 3434 3435 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3436 if (nxt_slow_path(b == NULL)) { 3437 goto fail; 3438 } 3439 3440 b->completion_handler = nxt_router_access_log_reopen_completion; 3441 3442 nxt_buf_cpystr(b, &access_log->path); 3443 *b->mem.free++ = '\0'; 3444 3445 rt = task->thread->runtime; 3446 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3447 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3448 3449 stream = nxt_port_rpc_register_handler(task, router_port, 3450 nxt_router_access_log_reopen_ready, 3451 nxt_router_access_log_reopen_error, 3452 -1, reopen); 3453 if (nxt_slow_path(stream == 0)) { 3454 goto fail; 3455 } 3456 3457 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3458 stream, router_port->id, b); 3459 3460 if (nxt_slow_path(ret != NXT_OK)) { 3461 nxt_port_rpc_cancel(task, router_port, stream); 3462 goto fail; 3463 } 3464 3465 nxt_mp_retain(mp); 3466 3467 return; 3468 3469fail: 3470 3471 nxt_mp_destroy(mp); 3472} 3473 3474 3475static void 3476nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3477{ 3478 nxt_mp_t *mp; 3479 nxt_buf_t *b; 3480 3481 b = obj; 3482 mp = b->data; 3483 3484 nxt_mp_release(mp); 3485} 3486 3487 3488static void 3489nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3490 void *data) 3491{ 3492 nxt_router_access_log_t *access_log; 3493 nxt_router_access_log_reopen_t *reopen; 3494 3495 reopen = data; 3496 3497 access_log = reopen->access_log; 3498 3499 if (access_log == nxt_router->access_log) { 3500 3501 if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) { 3502 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3503 msg->fd, access_log->fd, nxt_errno); 3504 } 3505 } 3506 3507 nxt_fd_close(msg->fd); 3508 nxt_mp_release(reopen->mem_pool); 3509} 3510 3511 3512static void 3513nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3514 void *data) 3515{ 3516 nxt_router_access_log_reopen_t *reopen; 3517 3518 reopen = data; 3519 3520 nxt_mp_release(reopen->mem_pool); 3521} 3522 3523 3524static void 3525nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3526{ 3527 nxt_port_t *port; 3528 nxt_thread_link_t *link; 3529 nxt_event_engine_t *engine; 3530 nxt_thread_handle_t handle; 3531 3532 handle = (nxt_thread_handle_t) obj; 3533 link = data; 3534 3535 nxt_thread_wait(handle); 3536 3537 engine = link->engine; 3538 3539 nxt_queue_remove(&engine->link); 3540 3541 port = engine->port; 3542 3543 // TODO notify all apps 3544 3545 port->engine = task->thread->engine; 3546 nxt_mp_thread_adopt(port->mem_pool); 3547 nxt_port_use(task, port, -1); 3548 3549 nxt_mp_thread_adopt(engine->mem_pool); 3550 nxt_mp_destroy(engine->mem_pool); 3551 3552 nxt_event_engine_free(engine); 3553 3554 nxt_free(link); 3555} 3556 3557 3558static void 3559nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3560 void *data) 3561{ 3562 nxt_int_t ret; 3563 nxt_buf_t *b, *next; 3564 nxt_port_t *app_port; 3565 nxt_unit_field_t *f; 3566 nxt_http_field_t *field; 3567 nxt_http_request_t *r; 3568 nxt_unit_response_t *resp; 3569 nxt_request_app_link_t *req_app_link; 3570 nxt_request_rpc_data_t *req_rpc_data; 3571 3572 b = msg->buf; 3573 req_rpc_data = data; 3574 3575 if (msg->size == 0) { 3576 b = NULL; 3577 } 3578 3579 r = req_rpc_data->request; 3580 if (nxt_slow_path(r == NULL)) { 3581 return; 3582 } 3583 3584 if (r->error) { 3585 nxt_request_rpc_data_unlink(task, req_rpc_data); 3586 return; 3587 } 3588 3589 if (msg->port_msg.last != 0) { 3590 nxt_debug(task, "router data create last buf"); 3591 3592 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 3593 3594 nxt_request_rpc_data_unlink(task, req_rpc_data); 3595 3596 } else { 3597 if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { 3598 r->timer.handler = nxt_router_app_timeout; 3599 r->timer_data = req_rpc_data; 3600 nxt_timer_add(task->thread->engine, &r->timer, 3601 req_rpc_data->app->timeout); 3602 } 3603 } 3604 3605 if (b == NULL) { 3606 return; 3607 } 3608 3609 if (msg->buf == b) { 3610 /* Disable instant buffer completion/re-using by port. */ 3611 msg->buf = NULL; 3612 } 3613 3614 if (r->header_sent) { 3615 nxt_buf_chain_add(&r->out, b); 3616 nxt_http_request_send_body(task, r, NULL); 3617 3618 } else { 3619 size_t b_size = nxt_buf_mem_used_size(&b->mem); 3620 3621 if (nxt_slow_path(b_size < sizeof(*resp))) { 3622 goto fail; 3623 } 3624 3625 resp = (void *) b->mem.pos; 3626 if (nxt_slow_path(b_size < sizeof(*resp) 3627 + resp->fields_count * sizeof(nxt_unit_field_t))) { 3628 goto fail; 3629 } 3630 3631 field = NULL; 3632 3633 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 3634 if (f->skip) { 3635 continue; 3636 } 3637 3638 field = nxt_list_add(r->resp.fields); 3639 3640 if (nxt_slow_path(field == NULL)) { 3641 goto fail; 3642 } 3643 3644 field->hash = f->hash; 3645 field->skip = 0; 3646 field->hopbyhop = 0; 3647 3648 field->name_length = f->name_length; 3649 field->value_length = f->value_length; 3650 field->name = nxt_unit_sptr_get(&f->name); 3651 field->value = nxt_unit_sptr_get(&f->value); 3652 3653 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 3654 if (nxt_slow_path(ret != NXT_OK)) { 3655 goto fail; 3656 } 3657 3658 nxt_debug(task, "header%s: %*s: %*s", 3659 (field->skip ? " skipped" : ""), 3660 (size_t) field->name_length, field->name, 3661 (size_t) field->value_length, field->value); 3662 3663 if (field->skip) { 3664 r->resp.fields->last->nelts--; 3665 } 3666 } 3667 3668 r->status = resp->status; 3669 3670 if (resp->piggyback_content_length != 0) { 3671 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 3672 b->mem.free = b->mem.pos + resp->piggyback_content_length; 3673 3674 } else { 3675 b->mem.pos = b->mem.free; 3676 } 3677 3678 if (nxt_buf_mem_used_size(&b->mem) == 0) { 3679 next = b->next; 3680 b->next = NULL; 3681 3682 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3683 b->completion_handler, task, b, b->parent); 3684 3685 b = next; 3686 } 3687 3688 if (b != NULL) { 3689 nxt_buf_chain_add(&r->out, b); 3690 } 3691 3692 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 3693 3694 if (r->websocket_handshake 3695 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 3696 { 3697 req_app_link = nxt_request_app_link_alloc(task, 3698 req_rpc_data->req_app_link, 3699 req_rpc_data); 3700 if (nxt_slow_path(req_app_link == NULL)) { 3701 goto fail; 3702 } 3703 3704 app_port = req_app_link->app_port; 3705 3706 if (app_port == NULL && req_rpc_data->app_port != NULL) { 3707 req_app_link->app_port = req_rpc_data->app_port; 3708 app_port = req_app_link->app_port; 3709 req_app_link->apr_action = req_rpc_data->apr_action; 3710 3711 req_rpc_data->app_port = NULL; 3712 } 3713 3714 if (nxt_slow_path(app_port == NULL)) { 3715 goto fail; 3716 } 3717 3718 nxt_thread_mutex_lock(&req_rpc_data->app->mutex); 3719 3720 nxt_queue_insert_tail(&app_port->active_websockets, 3721 &req_app_link->link_port_websockets); 3722 3723 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); 3724 3725 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); 3726 req_app_link->apr_action = NXT_APR_CLOSE; 3727 3728 nxt_debug(task, "req_app_link stream #%uD upgrade", 3729 req_app_link->stream); 3730 3731 r->state = &nxt_http_websocket; 3732 3733 } else { 3734 r->state = &nxt_http_request_send_state; 3735 } 3736 } 3737 3738 return; 3739 3740fail: 3741 3742 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 3743 3744 nxt_request_rpc_data_unlink(task, req_rpc_data); 3745} 3746 3747 3748static const nxt_http_request_state_t nxt_http_request_send_state 3749 nxt_aligned(64) = 3750{ 3751 .error_handler = nxt_http_request_error_handler, 3752}; 3753 3754 3755static void 3756nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 3757{ 3758 nxt_buf_t *out; 3759 nxt_http_request_t *r; 3760 3761 r = obj; 3762 3763 out = r->out; 3764 3765 if (out != NULL) { 3766 r->out = NULL; 3767 nxt_http_request_send(task, r, out); 3768 } 3769} 3770 3771 3772static void 3773nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3774 void *data) 3775{ 3776 nxt_int_t res; 3777 nxt_port_t *port; 3778 nxt_bool_t cancelled; 3779 nxt_request_app_link_t *req_app_link; 3780 nxt_request_rpc_data_t *req_rpc_data; 3781 3782 req_rpc_data = data; 3783 3784 req_app_link = req_rpc_data->req_app_link; 3785 3786 if (req_app_link != NULL) { 3787 cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, 3788 req_app_link->stream); 3789 if (cancelled) { 3790 res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); 3791 3792 if (res == NXT_OK) { 3793 port = req_app_link->app_port; 3794 3795 if (nxt_slow_path(port == NULL)) { 3796 nxt_log(task, NXT_LOG_ERR, 3797 "port is NULL in cancelled req_app_link"); 3798 return; 3799 } 3800 3801 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, 3802 req_rpc_data, port->pid); 3803 3804 nxt_router_app_prepare_request(task, req_app_link); 3805 } 3806 3807 msg->port_msg.last = 0; 3808 3809 return; 3810 } 3811 } 3812 3813 if (req_rpc_data->request != NULL) { 3814 nxt_http_request_error(task, req_rpc_data->request, 3815 NXT_HTTP_SERVICE_UNAVAILABLE); 3816 } 3817 3818 nxt_request_rpc_data_unlink(task, req_rpc_data); 3819} 3820 3821 3822static void 3823nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3824 void *data) 3825{ 3826 nxt_app_t *app; 3827 nxt_port_t *port; 3828 nxt_app_joint_t *app_joint; 3829 3830 app_joint = data; 3831 port = msg->u.new_port; 3832 3833 nxt_assert(app_joint != NULL); 3834 nxt_assert(port != NULL); 3835 3836 app = app_joint->app; 3837 3838 nxt_router_app_joint_use(task, app_joint, -1); 3839 3840 if (nxt_slow_path(app == NULL)) { 3841 nxt_debug(task, "new port ready for released app, send QUIT"); 3842 3843 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 3844 3845 return; 3846 } 3847 3848 port->app = app; 3849 3850 nxt_thread_mutex_lock(&app->mutex); 3851 3852 nxt_assert(app->pending_processes != 0); 3853 3854 app->pending_processes--; 3855 app->processes++; 3856 3857 nxt_thread_mutex_unlock(&app->mutex); 3858 3859 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 3860 &app->name, port->pid, app->processes, app->pending_processes); 3861 3862 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); 3863} 3864 3865 3866static void 3867nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3868 void *data) 3869{ 3870 nxt_app_t *app; 3871 nxt_app_joint_t *app_joint; 3872 nxt_queue_link_t *lnk; 3873 nxt_request_app_link_t *req_app_link; 3874 3875 app_joint = data; 3876 3877 nxt_assert(app_joint != NULL); 3878 3879 app = app_joint->app; 3880 3881 nxt_router_app_joint_use(task, app_joint, -1); 3882 3883 if (nxt_slow_path(app == NULL)) { 3884 nxt_debug(task, "start error for released app"); 3885 3886 return; 3887 } 3888 3889 nxt_debug(task, "app '%V' %p start error", &app->name, app); 3890 3891 nxt_thread_mutex_lock(&app->mutex); 3892 3893 nxt_assert(app->pending_processes != 0); 3894 3895 app->pending_processes--; 3896 3897 if (!nxt_queue_is_empty(&app->requests)) { 3898 lnk = nxt_queue_last(&app->requests); 3899 nxt_queue_remove(lnk); 3900 lnk->next = NULL; 3901 3902 req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, 3903 link_app_requests); 3904 3905 } else { 3906 req_app_link = NULL; 3907 } 3908 3909 nxt_thread_mutex_unlock(&app->mutex); 3910 3911 if (req_app_link != NULL) { 3912 nxt_debug(task, "app '%V' %p abort next stream #%uD", 3913 &app->name, app, req_app_link->stream); 3914
|