1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #include <nxt_http.h> 11 #include <nxt_port_memory_int.h> 12 #include <nxt_unit_request.h> 13 #include <nxt_unit_response.h> 14 15 16 typedef struct { 17 nxt_str_t type; 18 uint32_t processes; 19 uint32_t max_processes; 20 uint32_t spare_processes; 21 nxt_msec_t timeout; 22 nxt_msec_t res_timeout; 23 nxt_msec_t idle_timeout; 24 uint32_t requests; 25 nxt_conf_value_t *limits_value; 26 nxt_conf_value_t *processes_value; 27 } nxt_router_app_conf_t; 28 29 30 typedef struct { 31 nxt_str_t application; 32 } nxt_router_listener_conf_t; 33 34 35 typedef struct nxt_msg_info_s { 36 nxt_buf_t *buf; 37 nxt_port_mmap_tracking_t tracking; 38 nxt_work_handler_t completion_handler; 39 } nxt_msg_info_t; 40 41 42 typedef struct nxt_req_app_link_s nxt_req_app_link_t; 43 44 45 typedef struct { 46 uint32_t stream; 47 nxt_app_t *app; 48 nxt_port_t *app_port; 49 nxt_app_parse_ctx_t *ap; 50 nxt_msg_info_t msg_info; 51 nxt_req_app_link_t *ra; 52 53 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 54 } nxt_req_conn_link_t; 55 56 57 struct nxt_req_app_link_s { 58 uint32_t stream; 59 nxt_atomic_t use_count; 60 nxt_port_t *app_port; 61 nxt_port_t *reply_port; 62 nxt_app_parse_ctx_t *ap; 63 nxt_msg_info_t msg_info; 64 nxt_req_conn_link_t *rc; 65 66 nxt_nsec_t res_time; 67 68 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ 69 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ 70 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ 71 72 nxt_mp_t *mem_pool; 73 nxt_work_t work; 74 75 int err_code; 76 const char *err_str; 77 }; 78 79 80 typedef struct { 81 nxt_socket_conf_t *socket_conf; 82 nxt_router_temp_conf_t *temp_conf; 83 } nxt_socket_rpc_t; 84 85 86 typedef struct { 87 nxt_app_t *app; 88 nxt_router_temp_conf_t *temp_conf; 89 } nxt_app_rpc_t; 90 91 92 struct nxt_port_select_state_s { 93 nxt_app_t *app; 94 nxt_req_app_link_t *ra; 95 96 nxt_port_t *failed_port; 97 int failed_port_use_delta; 98 99 uint8_t start_process; /* 1 bit */ 100 nxt_req_app_link_t *shared_ra; 101 nxt_port_t *port; 102 }; 103 104 typedef struct nxt_port_select_state_s nxt_port_select_state_t; 105 106 static void nxt_router_greet_controller(nxt_task_t *task, 107 nxt_port_t *controller_port); 108 109 static void nxt_router_port_select(nxt_task_t *task, 110 nxt_port_select_state_t *state); 111 112 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 113 nxt_port_select_state_t *state); 114 115 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 116 117 nxt_inline void 118 nxt_router_ra_inc_use(nxt_req_app_link_t *ra) 119 { 120 nxt_atomic_fetch_add(&ra->use_count, 1); 121 } 122 123 nxt_inline void 124 nxt_router_ra_dec_use(nxt_req_app_link_t *ra) 125 { 126 #if (NXT_DEBUG) 127 int c; 128 129 c = nxt_atomic_fetch_add(&ra->use_count, -1); 130 131 nxt_assert(c > 1); 132 #else 133 (void) nxt_atomic_fetch_add(&ra->use_count, -1); 134 #endif 135 } 136 137 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); 138 139 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 140 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 141 static void nxt_router_conf_ready(nxt_task_t *task, 142 nxt_router_temp_conf_t *tmcf); 143 static void nxt_router_conf_error(nxt_task_t *task, 144 nxt_router_temp_conf_t *tmcf); 145 static void nxt_router_conf_send(nxt_task_t *task, 146 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 147 148 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 149 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 150 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 151 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, 152 nxt_str_t *name); 153 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 154 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 155 static void nxt_router_listen_socket_ready(nxt_task_t *task, 156 nxt_port_recv_msg_t *msg, void *data); 157 static void nxt_router_listen_socket_error(nxt_task_t *task, 158 nxt_port_recv_msg_t *msg, void *data); 159 static void nxt_router_app_rpc_create(nxt_task_t *task, 160 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 161 static void nxt_router_app_prefork_ready(nxt_task_t *task, 162 nxt_port_recv_msg_t *msg, void *data); 163 static void nxt_router_app_prefork_error(nxt_task_t *task, 164 nxt_port_recv_msg_t *msg, void *data); 165 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 166 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 167 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 168 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 169 170 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 171 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 172 const nxt_event_interface_t *interface); 173 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 174 nxt_router_engine_conf_t *recf); 175 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 176 nxt_router_engine_conf_t *recf); 177 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 178 nxt_router_engine_conf_t *recf); 179 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 180 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 181 nxt_work_handler_t handler); 182 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 183 nxt_router_engine_conf_t *recf); 184 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 185 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 186 187 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 188 nxt_router_temp_conf_t *tmcf); 189 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 190 nxt_event_engine_t *engine); 191 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 192 nxt_router_temp_conf_t *tmcf); 193 194 static void nxt_router_engines_post(nxt_router_t *router, 195 nxt_router_temp_conf_t *tmcf); 196 static void nxt_router_engine_post(nxt_event_engine_t *engine, 197 nxt_work_t *jobs); 198 199 static void nxt_router_thread_start(void *data); 200 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 201 void *data); 202 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 203 void *data); 204 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 205 void *data); 206 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 207 void *data); 208 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 209 void *data); 210 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 211 void *data); 212 static void nxt_router_listen_socket_release(nxt_task_t *task, 213 nxt_socket_conf_t *skcf); 214 215 static void nxt_router_access_log_writer(nxt_task_t *task, 216 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 217 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 218 struct tm *tm, size_t size, const char *format); 219 static void nxt_router_access_log_open(nxt_task_t *task, 220 nxt_router_temp_conf_t *tmcf); 221 static void nxt_router_access_log_ready(nxt_task_t *task, 222 nxt_port_recv_msg_t *msg, void *data); 223 static void nxt_router_access_log_error(nxt_task_t *task, 224 nxt_port_recv_msg_t *msg, void *data); 225 static void nxt_router_access_log_release(nxt_task_t *task, 226 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 227 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 228 void *data); 229 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 230 nxt_port_recv_msg_t *msg, void *data); 231 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 232 nxt_port_recv_msg_t *msg, void *data); 233 234 static void nxt_router_app_port_ready(nxt_task_t *task, 235 nxt_port_recv_msg_t *msg, void *data); 236 static void nxt_router_app_port_error(nxt_task_t *task, 237 nxt_port_recv_msg_t *msg, void *data); 238 239 static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app); 240 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 241 uint32_t request_failed, uint32_t got_response); 242 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 243 nxt_req_app_link_t *ra); 244 245 static void nxt_router_app_prepare_request(nxt_task_t *task, 246 nxt_req_app_link_t *ra); 247 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 248 nxt_port_t *port, const nxt_str_t *prefix); 249 250 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 251 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 252 void *data); 253 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 254 void *data); 255 static void nxt_router_app_release_handler(nxt_task_t *task, void *obj, 256 void *data); 257 258 static const nxt_http_request_state_t nxt_http_request_send_state; 259 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 260 261 static nxt_router_t *nxt_router; 262 263 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 264 static const nxt_str_t empty_prefix = nxt_string(""); 265 266 static const nxt_str_t *nxt_app_msg_prefix[] = { 267 &http_prefix, 268 &http_prefix, 269 &empty_prefix, 270 &http_prefix, 271 &http_prefix, 272 }; 273 274 275 nxt_port_handlers_t nxt_router_process_port_handlers = { 276 .quit = nxt_worker_process_quit_handler, 277 .new_port = nxt_router_new_port_handler, 278 .change_file = nxt_port_change_log_file_handler, 279 .mmap = nxt_port_mmap_handler, 280 .data = nxt_router_conf_data_handler, 281 .remove_pid = nxt_router_remove_pid_handler, 282 .access_log = nxt_router_access_log_reopen_handler, 283 .rpc_ready = nxt_port_rpc_handler, 284 .rpc_error = nxt_port_rpc_handler, 285 }; 286 287 288 nxt_int_t 289 nxt_router_start(nxt_task_t *task, void *data) 290 { 291 nxt_int_t ret; 292 nxt_port_t *controller_port; 293 nxt_router_t *router; 294 nxt_runtime_t *rt; 295 296 rt = task->thread->runtime; 297 298 ret = nxt_http_init(task, rt); 299 if (nxt_slow_path(ret != NXT_OK)) { 300 return ret; 301 } 302 303 router = nxt_zalloc(sizeof(nxt_router_t)); 304 if (nxt_slow_path(router == NULL)) { 305 return NXT_ERROR; 306 } 307 308 nxt_queue_init(&router->engines); 309 nxt_queue_init(&router->sockets); 310 nxt_queue_init(&router->apps); 311 312 nxt_router = router; 313 314 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 315 if (controller_port != NULL) { 316 nxt_router_greet_controller(task, controller_port); 317 } 318 319 return NXT_OK; 320 } 321 322 323 static void 324 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 325 { 326 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 327 -1, 0, 0, NULL); 328 } 329 330 331 static void 332 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 333 void *data) 334 { 335 size_t size; 336 uint32_t stream; 337 nxt_mp_t *mp; 338 nxt_int_t ret; 339 nxt_app_t *app; 340 nxt_buf_t *b; 341 nxt_port_t *main_port; 342 nxt_runtime_t *rt; 343 344 app = data; 345 346 rt = task->thread->runtime; 347 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 348 349 nxt_debug(task, "app '%V' %p start process", &app->name, app); 350 351 size = app->name.length + 1 + app->conf.length; 352 353 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 354 355 if (nxt_slow_path(b == NULL)) { 356 goto failed; 357 } 358 359 nxt_buf_cpystr(b, &app->name); 360 *b->mem.free++ = '\0'; 361 nxt_buf_cpystr(b, &app->conf); 362 363 stream = nxt_port_rpc_register_handler(task, port, 364 nxt_router_app_port_ready, 365 nxt_router_app_port_error, 366 -1, app); 367 368 if (nxt_slow_path(stream == 0)) { 369 goto failed; 370 } 371 372 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 373 stream, port->id, b); 374 375 if (nxt_slow_path(ret != NXT_OK)) { 376 nxt_port_rpc_cancel(task, port, stream); 377 goto failed; 378 } 379 380 return; 381 382 failed: 383 384 if (b != NULL) { 385 mp = b->data; 386 nxt_mp_free(mp, b); 387 nxt_mp_release(mp); 388 } 389 390 nxt_thread_mutex_lock(&app->mutex); 391 392 app->pending_processes--; 393 394 nxt_thread_mutex_unlock(&app->mutex); 395 396 nxt_router_app_use(task, app, -1); 397 } 398 399 400 static nxt_int_t 401 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 402 { 403 nxt_int_t res; 404 nxt_port_t *router_port; 405 nxt_runtime_t *rt; 406 407 rt = task->thread->runtime; 408 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 409 410 nxt_router_app_use(task, app, 1); 411 412 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 413 app); 414 415 if (res == NXT_OK) { 416 return res; 417 } 418 419 nxt_thread_mutex_lock(&app->mutex); 420 421 app->pending_processes--; 422 423 nxt_thread_mutex_unlock(&app->mutex); 424 425 nxt_router_app_use(task, app, -1); 426 427 return NXT_ERROR; 428 } 429 430 431 nxt_inline void 432 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, 433 nxt_req_conn_link_t *rc) 434 { 435 nxt_event_engine_t *engine; 436 437 engine = task->thread->engine; 438 439 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 440 441 ra->stream = rc->stream; 442 ra->use_count = 1; 443 ra->rc = rc; 444 rc->ra = ra; 445 ra->reply_port = engine->port; 446 ra->ap = rc->ap; 447 448 ra->work.handler = NULL; 449 ra->work.task = &engine->task; 450 ra->work.obj = ra; 451 ra->work.data = engine; 452 } 453 454 455 nxt_inline nxt_req_app_link_t * 456 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) 457 { 458 nxt_mp_t *mp; 459 nxt_req_app_link_t *ra; 460 461 if (ra_src->mem_pool != NULL) { 462 return ra_src; 463 } 464 465 mp = ra_src->ap->mem_pool; 466 467 ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t)); 468 469 if (nxt_slow_path(ra == NULL)) { 470 471 ra_src->rc->ra = NULL; 472 ra_src->rc = NULL; 473 474 return NULL; 475 } 476 477 nxt_mp_retain(mp); 478 479 nxt_router_ra_init(task, ra, ra_src->rc); 480 481 ra->mem_pool = mp; 482 483 return ra; 484 } 485 486 487 nxt_inline nxt_bool_t 488 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 489 uint32_t stream) 490 { 491 nxt_buf_t *b, *next; 492 nxt_bool_t cancelled; 493 494 if (msg_info->buf == NULL) { 495 return 0; 496 } 497 498 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 499 stream); 500 501 if (cancelled) { 502 nxt_debug(task, "stream #%uD: cancelled by router", stream); 503 } 504 505 for (b = msg_info->buf; b != NULL; b = next) { 506 next = b->next; 507 508 b->completion_handler = msg_info->completion_handler; 509 510 if (b->is_port_mmap_sent) { 511 b->is_port_mmap_sent = cancelled == 0; 512 b->completion_handler(task, b, b->parent); 513 } 514 } 515 516 msg_info->buf = NULL; 517 518 return cancelled; 519 } 520 521 522 static void 523 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); 524 525 526 static void 527 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) 528 { 529 nxt_req_app_link_t *ra; 530 531 ra = obj; 532 533 nxt_router_ra_update_peer(task, ra); 534 535 nxt_router_ra_use(task, ra, -1); 536 } 537 538 539 static void 540 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) 541 { 542 nxt_event_engine_t *engine; 543 nxt_req_conn_link_t *rc; 544 545 engine = ra->work.data; 546 547 if (task->thread->engine != engine) { 548 nxt_router_ra_inc_use(ra); 549 550 ra->work.handler = nxt_router_ra_update_peer_handler; 551 ra->work.task = &engine->task; 552 ra->work.next = NULL; 553 554 nxt_debug(task, "ra stream #%uD post update peer to %p", 555 ra->stream, engine); 556 557 nxt_event_engine_post(engine, &ra->work); 558 559 return; 560 } 561 562 nxt_debug(task, "ra stream #%uD update peer", ra->stream); 563 564 rc = ra->rc; 565 566 if (rc != NULL && ra->app_port != NULL) { 567 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); 568 } 569 570 nxt_router_ra_use(task, ra, -1); 571 } 572 573 574 static void 575 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) 576 { 577 nxt_mp_t *mp; 578 nxt_req_conn_link_t *rc; 579 580 nxt_assert(task->thread->engine == ra->work.data); 581 nxt_assert(ra->use_count == 0); 582 583 nxt_debug(task, "ra stream #%uD release", ra->stream); 584 585 rc = ra->rc; 586 587 if (rc != NULL) { 588 if (nxt_slow_path(ra->err_code != 0)) { 589 nxt_http_request_error(task, rc->ap->request, ra->err_code); 590 591 } else { 592 rc->app_port = ra->app_port; 593 rc->msg_info = ra->msg_info; 594 595 if (rc->app->timeout != 0) { 596 rc->ap->timer.handler = nxt_router_app_timeout; 597 rc->ap->timer_data = rc; 598 nxt_timer_add(task->thread->engine, &rc->ap->timer, 599 rc->app->timeout); 600 } 601 602 ra->app_port = NULL; 603 ra->msg_info.buf = NULL; 604 } 605 606 rc->ra = NULL; 607 ra->rc = NULL; 608 } 609 610 if (ra->app_port != NULL) { 611 nxt_router_app_port_release(task, ra->app_port, 0, 1); 612 613 ra->app_port = NULL; 614 } 615 616 nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 617 618 mp = ra->mem_pool; 619 620 if (mp != NULL) { 621 nxt_mp_free(mp, ra); 622 nxt_mp_release(mp); 623 } 624 } 625 626 627 static void 628 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) 629 { 630 nxt_req_app_link_t *ra; 631 632 ra = obj; 633 634 nxt_assert(ra->work.data == data); 635 636 nxt_atomic_fetch_add(&ra->use_count, -1); 637 638 nxt_router_ra_release(task, ra); 639 } 640 641 642 static void 643 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) 644 { 645 int c; 646 nxt_event_engine_t *engine; 647 648 c = nxt_atomic_fetch_add(&ra->use_count, i); 649 650 if (i < 0 && c == -i) { 651 engine = ra->work.data; 652 653 if (task->thread->engine == engine) { 654 nxt_router_ra_release(task, ra); 655 656 return; 657 } 658 659 nxt_router_ra_inc_use(ra); 660 661 ra->work.handler = nxt_router_ra_release_handler; 662 ra->work.task = &engine->task; 663 ra->work.next = NULL; 664 665 nxt_debug(task, "ra stream #%uD post release to %p", 666 ra->stream, engine); 667 668 nxt_event_engine_post(engine, &ra->work); 669 } 670 } 671 672 673 nxt_inline void 674 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str) 675 { 676 ra->app_port = NULL; 677 ra->err_code = code; 678 ra->err_str = str; 679 } 680 681 682 nxt_inline void 683 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 684 { 685 nxt_queue_insert_tail(&ra->app_port->pending_requests, 686 &ra->link_port_pending); 687 nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); 688 689 nxt_router_ra_inc_use(ra); 690 691 ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; 692 693 nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); 694 } 695 696 697 nxt_inline nxt_bool_t 698 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 699 { 700 if (lnk->next != NULL) { 701 nxt_queue_remove(lnk); 702 703 lnk->next = NULL; 704 705 return 1; 706 } 707 708 return 0; 709 } 710 711 712 nxt_inline void 713 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) 714 { 715 int ra_use_delta; 716 nxt_req_app_link_t *ra; 717 718 if (rc->app_port != NULL) { 719 nxt_router_app_port_release(task, rc->app_port, 0, 1); 720 721 rc->app_port = NULL; 722 } 723 724 nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); 725 726 ra = rc->ra; 727 728 if (ra != NULL) { 729 rc->ra = NULL; 730 ra->rc = NULL; 731 732 ra_use_delta = 0; 733 734 nxt_thread_mutex_lock(&rc->app->mutex); 735 736 if (ra->link_app_requests.next == NULL 737 && ra->link_port_pending.next == NULL 738 && ra->link_app_pending.next == NULL) 739 { 740 ra = NULL; 741 742 } else { 743 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); 744 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); 745 nxt_queue_chk_remove(&ra->link_app_pending); 746 } 747 748 nxt_thread_mutex_unlock(&rc->app->mutex); 749 750 if (ra != NULL) { 751 nxt_router_ra_use(task, ra, ra_use_delta); 752 } 753 } 754 755 if (rc->app != NULL) { 756 nxt_router_app_use(task, rc->app, -1); 757 758 rc->app = NULL; 759 } 760 761 if (rc->ap != NULL) { 762 rc->ap->timer_data = NULL; 763 764 nxt_app_http_req_done(task, rc->ap); 765 766 rc->ap = NULL; 767 } 768 } 769 770 771 void 772 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 773 { 774 nxt_port_new_port_handler(task, msg); 775 776 if (msg->u.new_port != NULL 777 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) 778 { 779 nxt_router_greet_controller(task, msg->u.new_port); 780 } 781 782 if (msg->port_msg.stream == 0) { 783 return; 784 } 785 786 if (msg->u.new_port == NULL 787 || msg->u.new_port->type != NXT_PROCESS_WORKER) 788 { 789 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 790 } 791 792 nxt_port_rpc_handler(task, msg); 793 } 794 795 796 void 797 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 798 { 799 nxt_int_t ret; 800 nxt_buf_t *b; 801 nxt_router_temp_conf_t *tmcf; 802 803 tmcf = nxt_router_temp_conf(task); 804 if (nxt_slow_path(tmcf == NULL)) { 805 return; 806 } 807 808 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 809 nxt_buf_used_size(msg->buf), 810 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 811 812 tmcf->router_conf->router = nxt_router; 813 tmcf->stream = msg->port_msg.stream; 814 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 815 msg->port_msg.pid, 816 msg->port_msg.reply_port); 817 818 b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, 819 msg->buf, msg->size); 820 if (nxt_slow_path(b == NULL)) { 821 nxt_router_conf_error(task, tmcf); 822 823 return; 824 } 825 826 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 827 828 if (nxt_fast_path(ret == NXT_OK)) { 829 nxt_router_conf_apply(task, tmcf, NULL); 830 831 } else { 832 nxt_router_conf_error(task, tmcf); 833 } 834 } 835 836 837 static void 838 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 839 void *data) 840 { 841 union { 842 nxt_pid_t removed_pid; 843 void *data; 844 } u; 845 846 u.data = data; 847 848 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 849 } 850 851 852 void 853 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 854 { 855 nxt_event_engine_t *engine; 856 857 nxt_port_remove_pid_handler(task, msg); 858 859 if (msg->port_msg.stream == 0) { 860 return; 861 } 862 863 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 864 { 865 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 866 msg->u.data); 867 } 868 nxt_queue_loop; 869 870 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 871 872 nxt_port_rpc_handler(task, msg); 873 } 874 875 876 static nxt_router_temp_conf_t * 877 nxt_router_temp_conf(nxt_task_t *task) 878 { 879 nxt_mp_t *mp, *tmp; 880 nxt_router_conf_t *rtcf; 881 nxt_router_temp_conf_t *tmcf; 882 883 mp = nxt_mp_create(1024, 128, 256, 32); 884 if (nxt_slow_path(mp == NULL)) { 885 return NULL; 886 } 887 888 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 889 if (nxt_slow_path(rtcf == NULL)) { 890 goto fail; 891 } 892 893 rtcf->mem_pool = mp; 894 895 tmp = nxt_mp_create(1024, 128, 256, 32); 896 if (nxt_slow_path(tmp == NULL)) { 897 goto fail; 898 } 899 900 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 901 if (nxt_slow_path(tmcf == NULL)) { 902 goto temp_fail; 903 } 904 905 tmcf->mem_pool = tmp; 906 tmcf->router_conf = rtcf; 907 tmcf->count = 1; 908 tmcf->engine = task->thread->engine; 909 910 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 911 sizeof(nxt_router_engine_conf_t)); 912 if (nxt_slow_path(tmcf->engines == NULL)) { 913 goto temp_fail; 914 } 915 916 nxt_queue_init(&tmcf->deleting); 917 nxt_queue_init(&tmcf->keeping); 918 nxt_queue_init(&tmcf->updating); 919 nxt_queue_init(&tmcf->pending); 920 nxt_queue_init(&tmcf->creating); 921 922 nxt_queue_init(&tmcf->apps); 923 nxt_queue_init(&tmcf->previous); 924 925 return tmcf; 926 927 temp_fail: 928 929 nxt_mp_destroy(tmp); 930 931 fail: 932 933 nxt_mp_destroy(mp); 934 935 return NULL; 936 } 937 938 939 nxt_inline nxt_bool_t 940 nxt_router_app_can_start(nxt_app_t *app) 941 { 942 return app->processes + app->pending_processes < app->max_processes 943 && app->pending_processes < app->max_pending_processes; 944 } 945 946 947 nxt_inline nxt_bool_t 948 nxt_router_app_need_start(nxt_app_t *app) 949 { 950 return app->idle_processes + app->pending_processes 951 < app->spare_processes; 952 } 953 954 955 static void 956 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 957 { 958 nxt_int_t ret; 959 nxt_app_t *app; 960 nxt_router_t *router; 961 nxt_runtime_t *rt; 962 nxt_queue_link_t *qlk; 963 nxt_socket_conf_t *skcf; 964 nxt_router_conf_t *rtcf; 965 nxt_router_temp_conf_t *tmcf; 966 const nxt_event_interface_t *interface; 967 968 tmcf = obj; 969 970 qlk = nxt_queue_first(&tmcf->pending); 971 972 if (qlk != nxt_queue_tail(&tmcf->pending)) { 973 nxt_queue_remove(qlk); 974 nxt_queue_insert_tail(&tmcf->creating, qlk); 975 976 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 977 978 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 979 980 return; 981 } 982 983 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 984 985 if (nxt_router_app_need_start(app)) { 986 nxt_router_app_rpc_create(task, tmcf, app); 987 return; 988 } 989 990 } nxt_queue_loop; 991 992 rtcf = tmcf->router_conf; 993 994 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 995 nxt_router_access_log_open(task, tmcf); 996 return; 997 } 998 999 rt = task->thread->runtime; 1000 1001 interface = nxt_service_get(rt->services, "engine", NULL); 1002 1003 router = rtcf->router; 1004 1005 ret = nxt_router_engines_create(task, router, tmcf, interface); 1006 if (nxt_slow_path(ret != NXT_OK)) { 1007 goto fail; 1008 } 1009 1010 ret = nxt_router_threads_create(task, rt, tmcf); 1011 if (nxt_slow_path(ret != NXT_OK)) { 1012 goto fail; 1013 } 1014 1015 nxt_router_apps_sort(task, router, tmcf); 1016 1017 nxt_router_engines_post(router, tmcf); 1018 1019 nxt_queue_add(&router->sockets, &tmcf->updating); 1020 nxt_queue_add(&router->sockets, &tmcf->creating); 1021 1022 router->access_log = rtcf->access_log; 1023 1024 nxt_router_conf_ready(task, tmcf); 1025 1026 return; 1027 1028 fail: 1029 1030 nxt_router_conf_error(task, tmcf); 1031 1032 return; 1033 } 1034 1035 1036 static void 1037 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1038 { 1039 nxt_joint_job_t *job; 1040 1041 job = obj; 1042 1043 nxt_router_conf_ready(task, job->tmcf); 1044 } 1045 1046 1047 static void 1048 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1049 { 1050 nxt_debug(task, "temp conf count:%D", tmcf->count); 1051 1052 if (--tmcf->count == 0) { 1053 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1054 } 1055 } 1056 1057 1058 static void 1059 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1060 { 1061 nxt_app_t *app; 1062 nxt_queue_t new_socket_confs; 1063 nxt_socket_t s; 1064 nxt_router_t *router; 1065 nxt_queue_link_t *qlk; 1066 nxt_socket_conf_t *skcf; 1067 nxt_router_conf_t *rtcf; 1068 1069 nxt_alert(task, "failed to apply new conf"); 1070 1071 for (qlk = nxt_queue_first(&tmcf->creating); 1072 qlk != nxt_queue_tail(&tmcf->creating); 1073 qlk = nxt_queue_next(qlk)) 1074 { 1075 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1076 s = skcf->listen->socket; 1077 1078 if (s != -1) { 1079 nxt_socket_close(task, s); 1080 } 1081 1082 nxt_free(skcf->listen); 1083 } 1084 1085 nxt_queue_init(&new_socket_confs); 1086 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1087 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1088 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1089 1090 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1091 1092 if (skcf->application != NULL) { 1093 nxt_router_app_use(task, skcf->application, -1); 1094 skcf->application = NULL; 1095 } 1096 1097 } nxt_queue_loop; 1098 1099 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1100 1101 nxt_router_app_quit(task, app); 1102 1103 } nxt_queue_loop; 1104 1105 rtcf = tmcf->router_conf; 1106 router = rtcf->router; 1107 1108 nxt_queue_add(&router->sockets, &tmcf->keeping); 1109 nxt_queue_add(&router->sockets, &tmcf->deleting); 1110 1111 nxt_queue_add(&router->apps, &tmcf->previous); 1112 1113 // TODO: new engines and threads 1114 1115 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1116 1117 nxt_mp_destroy(rtcf->mem_pool); 1118 1119 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1120 } 1121 1122 1123 static void 1124 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1125 nxt_port_msg_type_t type) 1126 { 1127 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1128 } 1129 1130 1131 static nxt_conf_map_t nxt_router_conf[] = { 1132 { 1133 nxt_string("listeners_threads"), 1134 NXT_CONF_MAP_INT32, 1135 offsetof(nxt_router_conf_t, threads), 1136 }, 1137 }; 1138 1139 1140 static nxt_conf_map_t nxt_router_app_conf[] = { 1141 { 1142 nxt_string("type"), 1143 NXT_CONF_MAP_STR, 1144 offsetof(nxt_router_app_conf_t, type), 1145 }, 1146 1147 { 1148 nxt_string("limits"), 1149 NXT_CONF_MAP_PTR, 1150 offsetof(nxt_router_app_conf_t, limits_value), 1151 }, 1152 1153 { 1154 nxt_string("processes"), 1155 NXT_CONF_MAP_INT32, 1156 offsetof(nxt_router_app_conf_t, processes), 1157 }, 1158 1159 { 1160 nxt_string("processes"), 1161 NXT_CONF_MAP_PTR, 1162 offsetof(nxt_router_app_conf_t, processes_value), 1163 }, 1164 }; 1165 1166 1167 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1168 { 1169 nxt_string("timeout"), 1170 NXT_CONF_MAP_MSEC, 1171 offsetof(nxt_router_app_conf_t, timeout), 1172 }, 1173 1174 { 1175 nxt_string("reschedule_timeout"), 1176 NXT_CONF_MAP_MSEC, 1177 offsetof(nxt_router_app_conf_t, res_timeout), 1178 }, 1179 1180 { 1181 nxt_string("requests"), 1182 NXT_CONF_MAP_INT32, 1183 offsetof(nxt_router_app_conf_t, requests), 1184 }, 1185 }; 1186 1187 1188 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1189 { 1190 nxt_string("spare"), 1191 NXT_CONF_MAP_INT32, 1192 offsetof(nxt_router_app_conf_t, spare_processes), 1193 }, 1194 1195 { 1196 nxt_string("max"), 1197 NXT_CONF_MAP_INT32, 1198 offsetof(nxt_router_app_conf_t, max_processes), 1199 }, 1200 1201 { 1202 nxt_string("idle_timeout"), 1203 NXT_CONF_MAP_MSEC, 1204 offsetof(nxt_router_app_conf_t, idle_timeout), 1205 }, 1206 }; 1207 1208 1209 static nxt_conf_map_t nxt_router_listener_conf[] = { 1210 { 1211 nxt_string("application"), 1212 NXT_CONF_MAP_STR, 1213 offsetof(nxt_router_listener_conf_t, application), 1214 }, 1215 }; 1216 1217 1218 static nxt_conf_map_t nxt_router_http_conf[] = { 1219 { 1220 nxt_string("header_buffer_size"), 1221 NXT_CONF_MAP_SIZE, 1222 offsetof(nxt_socket_conf_t, header_buffer_size), 1223 }, 1224 1225 { 1226 nxt_string("large_header_buffer_size"), 1227 NXT_CONF_MAP_SIZE, 1228 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1229 }, 1230 1231 { 1232 nxt_string("large_header_buffers"), 1233 NXT_CONF_MAP_SIZE, 1234 offsetof(nxt_socket_conf_t, large_header_buffers), 1235 }, 1236 1237 { 1238 nxt_string("body_buffer_size"), 1239 NXT_CONF_MAP_SIZE, 1240 offsetof(nxt_socket_conf_t, body_buffer_size), 1241 }, 1242 1243 { 1244 nxt_string("max_body_size"), 1245 NXT_CONF_MAP_SIZE, 1246 offsetof(nxt_socket_conf_t, max_body_size), 1247 }, 1248 1249 { 1250 nxt_string("idle_timeout"), 1251 NXT_CONF_MAP_MSEC, 1252 offsetof(nxt_socket_conf_t, idle_timeout), 1253 }, 1254 1255 { 1256 nxt_string("header_read_timeout"), 1257 NXT_CONF_MAP_MSEC, 1258 offsetof(nxt_socket_conf_t, header_read_timeout), 1259 }, 1260 1261 { 1262 nxt_string("body_read_timeout"), 1263 NXT_CONF_MAP_MSEC, 1264 offsetof(nxt_socket_conf_t, body_read_timeout), 1265 }, 1266 1267 { 1268 nxt_string("send_timeout"), 1269 NXT_CONF_MAP_MSEC, 1270 offsetof(nxt_socket_conf_t, send_timeout), 1271 }, 1272 }; 1273 1274 1275 static nxt_int_t 1276 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1277 u_char *start, u_char *end) 1278 { 1279 u_char *p; 1280 size_t size; 1281 nxt_mp_t *mp; 1282 uint32_t next; 1283 nxt_int_t ret; 1284 nxt_str_t name, path; 1285 nxt_app_t *app, *prev; 1286 nxt_router_t *router; 1287 nxt_conf_value_t *conf, *http, *value; 1288 nxt_conf_value_t *applications, *application; 1289 nxt_conf_value_t *listeners, *listener; 1290 nxt_socket_conf_t *skcf; 1291 nxt_event_engine_t *engine; 1292 nxt_app_lang_module_t *lang; 1293 nxt_router_app_conf_t apcf; 1294 nxt_router_access_log_t *access_log; 1295 nxt_router_listener_conf_t lscf; 1296 1297 static nxt_str_t http_path = nxt_string("/settings/http"); 1298 static nxt_str_t applications_path = nxt_string("/applications"); 1299 static nxt_str_t listeners_path = nxt_string("/listeners"); 1300 static nxt_str_t access_log_path = nxt_string("/access_log"); 1301 1302 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1303 if (conf == NULL) { 1304 nxt_alert(task, "configuration parsing error"); 1305 return NXT_ERROR; 1306 } 1307 1308 mp = tmcf->router_conf->mem_pool; 1309 1310 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1311 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1312 if (ret != NXT_OK) { 1313 nxt_alert(task, "root map error"); 1314 return NXT_ERROR; 1315 } 1316 1317 if (tmcf->router_conf->threads == 0) { 1318 tmcf->router_conf->threads = nxt_ncpu; 1319 } 1320 1321 applications = nxt_conf_get_path(conf, &applications_path); 1322 if (applications == NULL) { 1323 nxt_alert(task, "no \"applications\" block"); 1324 return NXT_ERROR; 1325 } 1326 1327 router = tmcf->router_conf->router; 1328 1329 next = 0; 1330 1331 for ( ;; ) { 1332 application = nxt_conf_next_object_member(applications, &name, &next); 1333 if (application == NULL) { 1334 break; 1335 } 1336 1337 nxt_debug(task, "application \"%V\"", &name); 1338 1339 size = nxt_conf_json_length(application, NULL); 1340 1341 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1342 if (app == NULL) { 1343 goto fail; 1344 } 1345 1346 nxt_memzero(app, sizeof(nxt_app_t)); 1347 1348 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1349 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 1350 1351 p = nxt_conf_json_print(app->conf.start, application, NULL); 1352 app->conf.length = p - app->conf.start; 1353 1354 nxt_assert(app->conf.length <= size); 1355 1356 nxt_debug(task, "application conf \"%V\"", &app->conf); 1357 1358 prev = nxt_router_app_find(&router->apps, &name); 1359 1360 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1361 nxt_free(app); 1362 1363 nxt_queue_remove(&prev->link); 1364 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1365 continue; 1366 } 1367 1368 apcf.processes = 1; 1369 apcf.max_processes = 1; 1370 apcf.spare_processes = 0; 1371 apcf.timeout = 0; 1372 apcf.res_timeout = 1000; 1373 apcf.idle_timeout = 15000; 1374 apcf.requests = 0; 1375 apcf.limits_value = NULL; 1376 apcf.processes_value = NULL; 1377 1378 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1379 nxt_nitems(nxt_router_app_conf), &apcf); 1380 if (ret != NXT_OK) { 1381 nxt_alert(task, "application map error"); 1382 goto app_fail; 1383 } 1384 1385 if (apcf.limits_value != NULL) { 1386 1387 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1388 nxt_alert(task, "application limits is not object"); 1389 goto app_fail; 1390 } 1391 1392 ret = nxt_conf_map_object(mp, apcf.limits_value, 1393 nxt_router_app_limits_conf, 1394 nxt_nitems(nxt_router_app_limits_conf), 1395 &apcf); 1396 if (ret != NXT_OK) { 1397 nxt_alert(task, "application limits map error"); 1398 goto app_fail; 1399 } 1400 } 1401 1402 if (apcf.processes_value != NULL 1403 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1404 { 1405 ret = nxt_conf_map_object(mp, apcf.processes_value, 1406 nxt_router_app_processes_conf, 1407 nxt_nitems(nxt_router_app_processes_conf), 1408 &apcf); 1409 if (ret != NXT_OK) { 1410 nxt_alert(task, "application processes map error"); 1411 goto app_fail; 1412 } 1413 1414 } else { 1415 apcf.max_processes = apcf.processes; 1416 apcf.spare_processes = apcf.processes; 1417 } 1418 1419 nxt_debug(task, "application type: %V", &apcf.type); 1420 nxt_debug(task, "application processes: %D", apcf.processes); 1421 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1422 nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); 1423 nxt_debug(task, "application requests: %D", apcf.requests); 1424 1425 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1426 1427 if (lang == NULL) { 1428 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1429 goto app_fail; 1430 } 1431 1432 nxt_debug(task, "application language module: \"%s\"", lang->file); 1433 1434 ret = nxt_thread_mutex_create(&app->mutex); 1435 if (ret != NXT_OK) { 1436 goto app_fail; 1437 } 1438 1439 nxt_queue_init(&app->ports); 1440 nxt_queue_init(&app->spare_ports); 1441 nxt_queue_init(&app->idle_ports); 1442 nxt_queue_init(&app->requests); 1443 nxt_queue_init(&app->pending); 1444 1445 app->name.length = name.length; 1446 nxt_memcpy(app->name.start, name.start, name.length); 1447 1448 app->type = lang->type; 1449 app->max_processes = apcf.max_processes; 1450 app->spare_processes = apcf.spare_processes; 1451 app->max_pending_processes = apcf.spare_processes 1452 ? apcf.spare_processes : 1; 1453 app->timeout = apcf.timeout; 1454 app->res_timeout = apcf.res_timeout * 1000000; 1455 app->idle_timeout = apcf.idle_timeout; 1456 app->live = 1; 1457 app->max_pending_responses = 2; 1458 app->max_requests = apcf.requests; 1459 1460 engine = task->thread->engine; 1461 1462 app->engine = engine; 1463 1464 app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; 1465 app->idle_timer.work_queue = &engine->fast_work_queue; 1466 app->idle_timer.handler = nxt_router_app_idle_timeout; 1467 app->idle_timer.task = &engine->task; 1468 app->idle_timer.log = app->idle_timer.task->log; 1469 1470 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1471 app->adjust_idle_work.task = &engine->task; 1472 app->adjust_idle_work.obj = app; 1473 1474 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1475 1476 nxt_router_app_use(task, app, 1); 1477 } 1478 1479 http = nxt_conf_get_path(conf, &http_path); 1480 #if 0 1481 if (http == NULL) { 1482 nxt_alert(task, "no \"http\" block"); 1483 return NXT_ERROR; 1484 } 1485 #endif 1486 1487 listeners = nxt_conf_get_path(conf, &listeners_path); 1488 if (listeners == NULL) { 1489 nxt_alert(task, "no \"listeners\" block"); 1490 return NXT_ERROR; 1491 } 1492 1493 next = 0; 1494 1495 for ( ;; ) { 1496 listener = nxt_conf_next_object_member(listeners, &name, &next); 1497 if (listener == NULL) { 1498 break; 1499 } 1500 1501 skcf = nxt_router_socket_conf(task, tmcf, &name); 1502 if (skcf == NULL) { 1503 goto fail; 1504 } 1505 1506 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1507 nxt_nitems(nxt_router_listener_conf), &lscf); 1508 if (ret != NXT_OK) { 1509 nxt_alert(task, "listener map error"); 1510 goto fail; 1511 } 1512 1513 nxt_debug(task, "application: %V", &lscf.application); 1514 1515 // STUB, default values if http block is not defined. 1516 skcf->header_buffer_size = 2048; 1517 skcf->large_header_buffer_size = 8192; 1518 skcf->large_header_buffers = 4; 1519 skcf->body_buffer_size = 16 * 1024; 1520 skcf->max_body_size = 8 * 1024 * 1024; 1521 skcf->idle_timeout = 180 * 1000; 1522 skcf->header_read_timeout = 30 * 1000; 1523 skcf->body_read_timeout = 30 * 1000; 1524 skcf->send_timeout = 30 * 1000; 1525 1526 if (http != NULL) { 1527 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1528 nxt_nitems(nxt_router_http_conf), skcf); 1529 if (ret != NXT_OK) { 1530 nxt_alert(task, "http map error"); 1531 goto fail; 1532 } 1533 } 1534 1535 skcf->listen->handler = nxt_http_conn_init; 1536 skcf->router_conf = tmcf->router_conf; 1537 skcf->router_conf->count++; 1538 skcf->application = nxt_router_listener_application(tmcf, 1539 &lscf.application); 1540 nxt_router_app_use(task, skcf->application, 1); 1541 } 1542 1543 value = nxt_conf_get_path(conf, &access_log_path); 1544 1545 if (value != NULL) { 1546 nxt_conf_get_string(value, &path); 1547 1548 access_log = router->access_log; 1549 1550 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1551 nxt_thread_spin_lock(&router->lock); 1552 access_log->count++; 1553 nxt_thread_spin_unlock(&router->lock); 1554 1555 } else { 1556 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1557 + path.length); 1558 if (access_log == NULL) { 1559 nxt_alert(task, "failed to allocate access log structure"); 1560 goto fail; 1561 } 1562 1563 access_log->fd = -1; 1564 access_log->handler = &nxt_router_access_log_writer; 1565 access_log->count = 1; 1566 1567 access_log->path.length = path.length; 1568 access_log->path.start = (u_char *) access_log 1569 + sizeof(nxt_router_access_log_t); 1570 1571 nxt_memcpy(access_log->path.start, path.start, path.length); 1572 } 1573 1574 tmcf->router_conf->access_log = access_log; 1575 } 1576 1577 nxt_queue_add(&tmcf->deleting, &router->sockets); 1578 nxt_queue_init(&router->sockets); 1579 1580 return NXT_OK; 1581 1582 app_fail: 1583 1584 nxt_free(app); 1585 1586 fail: 1587 1588 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1589 1590 nxt_queue_remove(&app->link); 1591 nxt_thread_mutex_destroy(&app->mutex); 1592 nxt_free(app); 1593 1594 } nxt_queue_loop; 1595 1596 return NXT_ERROR; 1597 } 1598 1599 1600 static nxt_app_t * 1601 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1602 { 1603 nxt_app_t *app; 1604 1605 nxt_queue_each(app, queue, nxt_app_t, link) { 1606 1607 if (nxt_strstr_eq(name, &app->name)) { 1608 return app; 1609 } 1610 1611 } nxt_queue_loop; 1612 1613 return NULL; 1614 } 1615 1616 1617 static nxt_app_t * 1618 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1619 { 1620 nxt_app_t *app; 1621 1622 app = nxt_router_app_find(&tmcf->apps, name); 1623 1624 if (app == NULL) { 1625 app = nxt_router_app_find(&tmcf->previous, name); 1626 } 1627 1628 return app; 1629 } 1630 1631 1632 static nxt_socket_conf_t * 1633 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1634 nxt_str_t *name) 1635 { 1636 size_t size; 1637 nxt_int_t ret; 1638 nxt_bool_t wildcard; 1639 nxt_sockaddr_t *sa; 1640 nxt_socket_conf_t *skcf; 1641 nxt_listen_socket_t *ls; 1642 1643 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1644 if (nxt_slow_path(sa == NULL)) { 1645 nxt_alert(task, "invalid listener \"%V\"", name); 1646 return NULL; 1647 } 1648 1649 sa->type = SOCK_STREAM; 1650 1651 nxt_debug(task, "router listener: \"%*s\"", 1652 (size_t) sa->length, nxt_sockaddr_start(sa)); 1653 1654 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 1655 if (nxt_slow_path(skcf == NULL)) { 1656 return NULL; 1657 } 1658 1659 size = nxt_sockaddr_size(sa); 1660 1661 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1662 1663 if (ret != NXT_OK) { 1664 1665 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1666 if (nxt_slow_path(ls == NULL)) { 1667 return NULL; 1668 } 1669 1670 skcf->listen = ls; 1671 1672 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1673 nxt_memcpy(ls->sockaddr, sa, size); 1674 1675 nxt_listen_socket_remote_size(ls); 1676 1677 ls->socket = -1; 1678 ls->backlog = NXT_LISTEN_BACKLOG; 1679 ls->flags = NXT_NONBLOCK; 1680 ls->read_after_accept = 1; 1681 } 1682 1683 switch (sa->u.sockaddr.sa_family) { 1684 #if (NXT_HAVE_UNIX_DOMAIN) 1685 case AF_UNIX: 1686 wildcard = 0; 1687 break; 1688 #endif 1689 #if (NXT_INET6) 1690 case AF_INET6: 1691 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1692 break; 1693 #endif 1694 case AF_INET: 1695 default: 1696 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1697 break; 1698 } 1699 1700 if (!wildcard) { 1701 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 1702 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1703 return NULL; 1704 } 1705 1706 nxt_memcpy(skcf->sockaddr, sa, size); 1707 } 1708 1709 return skcf; 1710 } 1711 1712 1713 static nxt_int_t 1714 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1715 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1716 { 1717 nxt_router_t *router; 1718 nxt_queue_link_t *qlk; 1719 nxt_socket_conf_t *skcf; 1720 1721 router = tmcf->router_conf->router; 1722 1723 for (qlk = nxt_queue_first(&router->sockets); 1724 qlk != nxt_queue_tail(&router->sockets); 1725 qlk = nxt_queue_next(qlk)) 1726 { 1727 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1728 1729 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1730 nskcf->listen = skcf->listen; 1731 1732 nxt_queue_remove(qlk); 1733 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1734 1735 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1736 1737 return NXT_OK; 1738 } 1739 } 1740 1741 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1742 1743 return NXT_DECLINED; 1744 } 1745 1746 1747 static void 1748 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1749 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1750 { 1751 size_t size; 1752 uint32_t stream; 1753 nxt_int_t ret; 1754 nxt_buf_t *b; 1755 nxt_port_t *main_port, *router_port; 1756 nxt_runtime_t *rt; 1757 nxt_socket_rpc_t *rpc; 1758 1759 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1760 if (rpc == NULL) { 1761 goto fail; 1762 } 1763 1764 rpc->socket_conf = skcf; 1765 rpc->temp_conf = tmcf; 1766 1767 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1768 1769 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1770 if (b == NULL) { 1771 goto fail; 1772 } 1773 1774 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1775 1776 rt = task->thread->runtime; 1777 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1778 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1779 1780 stream = nxt_port_rpc_register_handler(task, router_port, 1781 nxt_router_listen_socket_ready, 1782 nxt_router_listen_socket_error, 1783 main_port->pid, rpc); 1784 if (nxt_slow_path(stream == 0)) { 1785 goto fail; 1786 } 1787 1788 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1789 stream, router_port->id, b); 1790 1791 if (nxt_slow_path(ret != NXT_OK)) { 1792 nxt_port_rpc_cancel(task, router_port, stream); 1793 goto fail; 1794 } 1795 1796 return; 1797 1798 fail: 1799 1800 nxt_router_conf_error(task, tmcf); 1801 } 1802 1803 1804 static void 1805 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1806 void *data) 1807 { 1808 nxt_int_t ret; 1809 nxt_socket_t s; 1810 nxt_socket_rpc_t *rpc; 1811 1812 rpc = data; 1813 1814 s = msg->fd; 1815 1816 ret = nxt_socket_nonblocking(task, s); 1817 if (nxt_slow_path(ret != NXT_OK)) { 1818 goto fail; 1819 } 1820 1821 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 1822 1823 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1824 if (nxt_slow_path(ret != NXT_OK)) { 1825 goto fail; 1826 } 1827 1828 rpc->socket_conf->listen->socket = s; 1829 1830 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1831 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1832 1833 return; 1834 1835 fail: 1836 1837 nxt_socket_close(task, s); 1838 1839 nxt_router_conf_error(task, rpc->temp_conf); 1840 } 1841 1842 1843 static void 1844 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1845 void *data) 1846 { 1847 u_char *p; 1848 size_t size; 1849 uint8_t error; 1850 nxt_buf_t *in, *out; 1851 nxt_sockaddr_t *sa; 1852 nxt_socket_rpc_t *rpc; 1853 nxt_router_temp_conf_t *tmcf; 1854 1855 static nxt_str_t socket_errors[] = { 1856 nxt_string("ListenerSystem"), 1857 nxt_string("ListenerNoIPv6"), 1858 nxt_string("ListenerPort"), 1859 nxt_string("ListenerInUse"), 1860 nxt_string("ListenerNoAddress"), 1861 nxt_string("ListenerNoAccess"), 1862 nxt_string("ListenerPath"), 1863 }; 1864 1865 rpc = data; 1866 sa = rpc->socket_conf->listen->sockaddr; 1867 tmcf = rpc->temp_conf; 1868 1869 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 1870 1871 if (nxt_slow_path(in == NULL)) { 1872 return; 1873 } 1874 1875 p = in->mem.pos; 1876 1877 error = *p++; 1878 1879 size = nxt_length("listen socket error: ") 1880 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 1881 + sa->length + socket_errors[error].length + (in->mem.free - p); 1882 1883 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1884 if (nxt_slow_path(out == NULL)) { 1885 return; 1886 } 1887 1888 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 1889 "listen socket error: " 1890 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 1891 (size_t) sa->length, nxt_sockaddr_start(sa), 1892 &socket_errors[error], in->mem.free - p, p); 1893 1894 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 1895 1896 nxt_router_conf_error(task, tmcf); 1897 } 1898 1899 1900 static void 1901 nxt_router_app_rpc_create(nxt_task_t *task, 1902 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 1903 { 1904 size_t size; 1905 uint32_t stream; 1906 nxt_int_t ret; 1907 nxt_buf_t *b; 1908 nxt_port_t *main_port, *router_port; 1909 nxt_runtime_t *rt; 1910 nxt_app_rpc_t *rpc; 1911 1912 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 1913 if (rpc == NULL) { 1914 goto fail; 1915 } 1916 1917 rpc->app = app; 1918 rpc->temp_conf = tmcf; 1919 1920 nxt_debug(task, "app '%V' prefork", &app->name); 1921 1922 size = app->name.length + 1 + app->conf.length; 1923 1924 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1925 if (nxt_slow_path(b == NULL)) { 1926 goto fail; 1927 } 1928 1929 nxt_buf_cpystr(b, &app->name); 1930 *b->mem.free++ = '\0'; 1931 nxt_buf_cpystr(b, &app->conf); 1932 1933 rt = task->thread->runtime; 1934 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1935 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1936 1937 stream = nxt_port_rpc_register_handler(task, router_port, 1938 nxt_router_app_prefork_ready, 1939 nxt_router_app_prefork_error, 1940 -1, rpc); 1941 if (nxt_slow_path(stream == 0)) { 1942 goto fail; 1943 } 1944 1945 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 1946 stream, router_port->id, b); 1947 1948 if (nxt_slow_path(ret != NXT_OK)) { 1949 nxt_port_rpc_cancel(task, router_port, stream); 1950 goto fail; 1951 } 1952 1953 app->pending_processes++; 1954 1955 return; 1956 1957 fail: 1958 1959 nxt_router_conf_error(task, tmcf); 1960 } 1961 1962 1963 static void 1964 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1965 void *data) 1966 { 1967 nxt_app_t *app; 1968 nxt_port_t *port; 1969 nxt_app_rpc_t *rpc; 1970 nxt_event_engine_t *engine; 1971 1972 rpc = data; 1973 app = rpc->app; 1974 1975 port = msg->u.new_port; 1976 port->app = app; 1977 1978 nxt_router_app_use(task, app, 1); 1979 1980 app->pending_processes--; 1981 app->processes++; 1982 app->idle_processes++; 1983 1984 engine = task->thread->engine; 1985 1986 nxt_queue_insert_tail(&app->ports, &port->app_link); 1987 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 1988 1989 port->idle_start = 0; 1990 1991 nxt_port_inc_use(port); 1992 1993 nxt_work_queue_add(&engine->fast_work_queue, 1994 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1995 } 1996 1997 1998 static void 1999 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2000 void *data) 2001 { 2002 nxt_app_t *app; 2003 nxt_app_rpc_t *rpc; 2004 nxt_router_temp_conf_t *tmcf; 2005 2006 rpc = data; 2007 app = rpc->app; 2008 tmcf = rpc->temp_conf; 2009 2010 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2011 &app->name); 2012 2013 app->pending_processes--; 2014 2015 nxt_router_conf_error(task, tmcf); 2016 } 2017 2018 2019 static nxt_int_t 2020 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2021 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2022 { 2023 nxt_int_t ret; 2024 nxt_uint_t n, threads; 2025 nxt_queue_link_t *qlk; 2026 nxt_router_engine_conf_t *recf; 2027 2028 threads = tmcf->router_conf->threads; 2029 2030 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2031 sizeof(nxt_router_engine_conf_t)); 2032 if (nxt_slow_path(tmcf->engines == NULL)) { 2033 return NXT_ERROR; 2034 } 2035 2036 n = 0; 2037 2038 for (qlk = nxt_queue_first(&router->engines); 2039 qlk != nxt_queue_tail(&router->engines); 2040 qlk = nxt_queue_next(qlk)) 2041 { 2042 recf = nxt_array_zero_add(tmcf->engines); 2043 if (nxt_slow_path(recf == NULL)) { 2044 return NXT_ERROR; 2045 } 2046 2047 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2048 2049 if (n < threads) { 2050 recf->action = NXT_ROUTER_ENGINE_KEEP; 2051 ret = nxt_router_engine_conf_update(tmcf, recf); 2052 2053 } else { 2054 recf->action = NXT_ROUTER_ENGINE_DELETE; 2055 ret = nxt_router_engine_conf_delete(tmcf, recf); 2056 } 2057 2058 if (nxt_slow_path(ret != NXT_OK)) { 2059 return ret; 2060 } 2061 2062 n++; 2063 } 2064 2065 tmcf->new_threads = n; 2066 2067 while (n < threads) { 2068 recf = nxt_array_zero_add(tmcf->engines); 2069 if (nxt_slow_path(recf == NULL)) { 2070 return NXT_ERROR; 2071 } 2072 2073 recf->action = NXT_ROUTER_ENGINE_ADD; 2074 2075 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2076 if (nxt_slow_path(recf->engine == NULL)) { 2077 return NXT_ERROR; 2078 } 2079 2080 ret = nxt_router_engine_conf_create(tmcf, recf); 2081 if (nxt_slow_path(ret != NXT_OK)) { 2082 return ret; 2083 } 2084 2085 n++; 2086 } 2087 2088 return NXT_OK; 2089 } 2090 2091 2092 static nxt_int_t 2093 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2094 nxt_router_engine_conf_t *recf) 2095 { 2096 nxt_int_t ret; 2097 2098 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2099 nxt_router_listen_socket_create); 2100 if (nxt_slow_path(ret != NXT_OK)) { 2101 return ret; 2102 } 2103 2104 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2105 nxt_router_listen_socket_create); 2106 if (nxt_slow_path(ret != NXT_OK)) { 2107 return ret; 2108 } 2109 2110 return ret; 2111 } 2112 2113 2114 static nxt_int_t 2115 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2116 nxt_router_engine_conf_t *recf) 2117 { 2118 nxt_int_t ret; 2119 2120 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2121 nxt_router_listen_socket_create); 2122 if (nxt_slow_path(ret != NXT_OK)) { 2123 return ret; 2124 } 2125 2126 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2127 nxt_router_listen_socket_update); 2128 if (nxt_slow_path(ret != NXT_OK)) { 2129 return ret; 2130 } 2131 2132 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2133 if (nxt_slow_path(ret != NXT_OK)) { 2134 return ret; 2135 } 2136 2137 return ret; 2138 } 2139 2140 2141 static nxt_int_t 2142 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2143 nxt_router_engine_conf_t *recf) 2144 { 2145 nxt_int_t ret; 2146 2147 ret = nxt_router_engine_quit(tmcf, recf); 2148 if (nxt_slow_path(ret != NXT_OK)) { 2149 return ret; 2150 } 2151 2152 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2153 if (nxt_slow_path(ret != NXT_OK)) { 2154 return ret; 2155 } 2156 2157 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2158 } 2159 2160 2161 static nxt_int_t 2162 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2163 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2164 nxt_work_handler_t handler) 2165 { 2166 nxt_joint_job_t *job; 2167 nxt_queue_link_t *qlk; 2168 nxt_socket_conf_t *skcf; 2169 nxt_socket_conf_joint_t *joint; 2170 2171 for (qlk = nxt_queue_first(sockets); 2172 qlk != nxt_queue_tail(sockets); 2173 qlk = nxt_queue_next(qlk)) 2174 { 2175 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2176 if (nxt_slow_path(job == NULL)) { 2177 return NXT_ERROR; 2178 } 2179 2180 job->work.next = recf->jobs; 2181 recf->jobs = &job->work; 2182 2183 job->task = tmcf->engine->task; 2184 job->work.handler = handler; 2185 job->work.task = &job->task; 2186 job->work.obj = job; 2187 job->tmcf = tmcf; 2188 2189 tmcf->count++; 2190 2191 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2192 sizeof(nxt_socket_conf_joint_t)); 2193 if (nxt_slow_path(joint == NULL)) { 2194 return NXT_ERROR; 2195 } 2196 2197 job->work.data = joint; 2198 2199 joint->count = 1; 2200 2201 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2202 skcf->count++; 2203 joint->socket_conf = skcf; 2204 2205 joint->engine = recf->engine; 2206 } 2207 2208 return NXT_OK; 2209 } 2210 2211 2212 static nxt_int_t 2213 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2214 nxt_router_engine_conf_t *recf) 2215 { 2216 nxt_joint_job_t *job; 2217 2218 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2219 if (nxt_slow_path(job == NULL)) { 2220 return NXT_ERROR; 2221 } 2222 2223 job->work.next = recf->jobs; 2224 recf->jobs = &job->work; 2225 2226 job->task = tmcf->engine->task; 2227 job->work.handler = nxt_router_worker_thread_quit; 2228 job->work.task = &job->task; 2229 job->work.obj = NULL; 2230 job->work.data = NULL; 2231 job->tmcf = NULL; 2232 2233 return NXT_OK; 2234 } 2235 2236 2237 static nxt_int_t 2238 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2239 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2240 { 2241 nxt_joint_job_t *job; 2242 nxt_queue_link_t *qlk; 2243 2244 for (qlk = nxt_queue_first(sockets); 2245 qlk != nxt_queue_tail(sockets); 2246 qlk = nxt_queue_next(qlk)) 2247 { 2248 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2249 if (nxt_slow_path(job == NULL)) { 2250 return NXT_ERROR; 2251 } 2252 2253 job->work.next = recf->jobs; 2254 recf->jobs = &job->work; 2255 2256 job->task = tmcf->engine->task; 2257 job->work.handler = nxt_router_listen_socket_delete; 2258 job->work.task = &job->task; 2259 job->work.obj = job; 2260 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2261 job->tmcf = tmcf; 2262 2263 tmcf->count++; 2264 } 2265 2266 return NXT_OK; 2267 } 2268 2269 2270 static nxt_int_t 2271 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2272 nxt_router_temp_conf_t *tmcf) 2273 { 2274 nxt_int_t ret; 2275 nxt_uint_t i, threads; 2276 nxt_router_engine_conf_t *recf; 2277 2278 recf = tmcf->engines->elts; 2279 threads = tmcf->router_conf->threads; 2280 2281 for (i = tmcf->new_threads; i < threads; i++) { 2282 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2283 if (nxt_slow_path(ret != NXT_OK)) { 2284 return ret; 2285 } 2286 } 2287 2288 return NXT_OK; 2289 } 2290 2291 2292 static nxt_int_t 2293 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2294 nxt_event_engine_t *engine) 2295 { 2296 nxt_int_t ret; 2297 nxt_thread_link_t *link; 2298 nxt_thread_handle_t handle; 2299 2300 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2301 2302 if (nxt_slow_path(link == NULL)) { 2303 return NXT_ERROR; 2304 } 2305 2306 link->start = nxt_router_thread_start; 2307 link->engine = engine; 2308 link->work.handler = nxt_router_thread_exit_handler; 2309 link->work.task = task; 2310 link->work.data = link; 2311 2312 nxt_queue_insert_tail(&rt->engines, &engine->link); 2313 2314 ret = nxt_thread_create(&handle, link); 2315 2316 if (nxt_slow_path(ret != NXT_OK)) { 2317 nxt_queue_remove(&engine->link); 2318 } 2319 2320 return ret; 2321 } 2322 2323 2324 static void 2325 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2326 nxt_router_temp_conf_t *tmcf) 2327 { 2328 nxt_app_t *app; 2329 2330 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2331 2332 nxt_router_app_quit(task, app); 2333 2334 } nxt_queue_loop; 2335 2336 nxt_queue_add(&router->apps, &tmcf->previous); 2337 nxt_queue_add(&router->apps, &tmcf->apps); 2338 } 2339 2340 2341 static void 2342 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2343 { 2344 nxt_uint_t n; 2345 nxt_event_engine_t *engine; 2346 nxt_router_engine_conf_t *recf; 2347 2348 recf = tmcf->engines->elts; 2349 2350 for (n = tmcf->engines->nelts; n != 0; n--) { 2351 engine = recf->engine; 2352 2353 switch (recf->action) { 2354 2355 case NXT_ROUTER_ENGINE_KEEP: 2356 break; 2357 2358 case NXT_ROUTER_ENGINE_ADD: 2359 nxt_queue_insert_tail(&router->engines, &engine->link0); 2360 break; 2361 2362 case NXT_ROUTER_ENGINE_DELETE: 2363 nxt_queue_remove(&engine->link0); 2364 break; 2365 } 2366 2367 nxt_router_engine_post(engine, recf->jobs); 2368 2369 recf++; 2370 } 2371 } 2372 2373 2374 static void 2375 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2376 { 2377 nxt_work_t *work, *next; 2378 2379 for (work = jobs; work != NULL; work = next) { 2380 next = work->next; 2381 work->next = NULL; 2382 2383 nxt_event_engine_post(engine, work); 2384 } 2385 } 2386 2387 2388 static nxt_port_handlers_t nxt_router_app_port_handlers = { 2389 .rpc_error = nxt_port_rpc_handler, 2390 .mmap = nxt_port_mmap_handler, 2391 .data = nxt_port_rpc_handler, 2392 }; 2393 2394 2395 static void 2396 nxt_router_thread_start(void *data) 2397 { 2398 nxt_int_t ret; 2399 nxt_port_t *port; 2400 nxt_task_t *task; 2401 nxt_thread_t *thread; 2402 nxt_thread_link_t *link; 2403 nxt_event_engine_t *engine; 2404 2405 link = data; 2406 engine = link->engine; 2407 task = &engine->task; 2408 2409 thread = nxt_thread(); 2410 2411 nxt_event_engine_thread_adopt(engine); 2412 2413 /* STUB */ 2414 thread->runtime = engine->task.thread->runtime; 2415 2416 engine->task.thread = thread; 2417 engine->task.log = thread->log; 2418 thread->engine = engine; 2419 thread->task = &engine->task; 2420 #if 0 2421 thread->fiber = &engine->fibers->fiber; 2422 #endif 2423 2424 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2425 if (nxt_slow_path(engine->mem_pool == NULL)) { 2426 return; 2427 } 2428 2429 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2430 NXT_PROCESS_ROUTER); 2431 if (nxt_slow_path(port == NULL)) { 2432 return; 2433 } 2434 2435 ret = nxt_port_socket_init(task, port, 0); 2436 if (nxt_slow_path(ret != NXT_OK)) { 2437 nxt_port_use(task, port, -1); 2438 return; 2439 } 2440 2441 engine->port = port; 2442 2443 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2444 2445 nxt_event_engine_start(engine); 2446 } 2447 2448 2449 static void 2450 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2451 { 2452 nxt_joint_job_t *job; 2453 nxt_socket_conf_t *skcf; 2454 nxt_listen_event_t *lev; 2455 nxt_listen_socket_t *ls; 2456 nxt_thread_spinlock_t *lock; 2457 nxt_socket_conf_joint_t *joint; 2458 2459 job = obj; 2460 joint = data; 2461 2462 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2463 2464 skcf = joint->socket_conf; 2465 ls = skcf->listen; 2466 2467 lev = nxt_listen_event(task, ls); 2468 if (nxt_slow_path(lev == NULL)) { 2469 nxt_router_listen_socket_release(task, skcf); 2470 return; 2471 } 2472 2473 lev->socket.data = joint; 2474 2475 lock = &skcf->router_conf->router->lock; 2476 2477 nxt_thread_spin_lock(lock); 2478 ls->count++; 2479 nxt_thread_spin_unlock(lock); 2480 2481 job->work.next = NULL; 2482 job->work.handler = nxt_router_conf_wait; 2483 2484 nxt_event_engine_post(job->tmcf->engine, &job->work); 2485 } 2486 2487 2488 nxt_inline nxt_listen_event_t * 2489 nxt_router_listen_event(nxt_queue_t *listen_connections, 2490 nxt_socket_conf_t *skcf) 2491 { 2492 nxt_socket_t fd; 2493 nxt_queue_link_t *qlk; 2494 nxt_listen_event_t *lev; 2495 2496 fd = skcf->listen->socket; 2497 2498 for (qlk = nxt_queue_first(listen_connections); 2499 qlk != nxt_queue_tail(listen_connections); 2500 qlk = nxt_queue_next(qlk)) 2501 { 2502 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2503 2504 if (fd == lev->socket.fd) { 2505 return lev; 2506 } 2507 } 2508 2509 return NULL; 2510 } 2511 2512 2513 static void 2514 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2515 { 2516 nxt_joint_job_t *job; 2517 nxt_event_engine_t *engine; 2518 nxt_listen_event_t *lev; 2519 nxt_socket_conf_joint_t *joint, *old; 2520 2521 job = obj; 2522 joint = data; 2523 2524 engine = task->thread->engine; 2525 2526 nxt_queue_insert_tail(&engine->joints, &joint->link); 2527 2528 lev = nxt_router_listen_event(&engine->listen_connections, 2529 joint->socket_conf); 2530 2531 old = lev->socket.data; 2532 lev->socket.data = joint; 2533 lev->listen = joint->socket_conf->listen; 2534 2535 job->work.next = NULL; 2536 job->work.handler = nxt_router_conf_wait; 2537 2538 nxt_event_engine_post(job->tmcf->engine, &job->work); 2539 2540 /* 2541 * The task is allocated from configuration temporary 2542 * memory pool so it can be freed after engine post operation. 2543 */ 2544 2545 nxt_router_conf_release(&engine->task, old); 2546 } 2547 2548 2549 static void 2550 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2551 { 2552 nxt_joint_job_t *job; 2553 nxt_socket_conf_t *skcf; 2554 nxt_listen_event_t *lev; 2555 nxt_event_engine_t *engine; 2556 2557 job = obj; 2558 skcf = data; 2559 2560 engine = task->thread->engine; 2561 2562 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2563 2564 nxt_fd_event_delete(engine, &lev->socket); 2565 2566 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2567 lev->socket.fd); 2568 2569 lev->timer.handler = nxt_router_listen_socket_close; 2570 lev->timer.work_queue = &engine->fast_work_queue; 2571 2572 nxt_timer_add(engine, &lev->timer, 0); 2573 2574 job->work.next = NULL; 2575 job->work.handler = nxt_router_conf_wait; 2576 2577 nxt_event_engine_post(job->tmcf->engine, &job->work); 2578 } 2579 2580 2581 static void 2582 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2583 { 2584 nxt_event_engine_t *engine; 2585 2586 nxt_debug(task, "router worker thread quit"); 2587 2588 engine = task->thread->engine; 2589 2590 engine->shutdown = 1; 2591 2592 if (nxt_queue_is_empty(&engine->joints)) { 2593 nxt_thread_exit(task->thread); 2594 } 2595 } 2596 2597 2598 static void 2599 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2600 { 2601 nxt_timer_t *timer; 2602 nxt_listen_event_t *lev; 2603 nxt_socket_conf_joint_t *joint; 2604 2605 timer = obj; 2606 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2607 2608 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2609 lev->socket.fd); 2610 2611 nxt_queue_remove(&lev->link); 2612 2613 joint = lev->socket.data; 2614 lev->socket.data = NULL; 2615 2616 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2617 task = &task->thread->engine->task; 2618 2619 nxt_router_listen_socket_release(task, joint->socket_conf); 2620 2621 nxt_router_listen_event_release(task, lev, joint); 2622 } 2623 2624 2625 static void 2626 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2627 { 2628 nxt_listen_socket_t *ls; 2629 nxt_thread_spinlock_t *lock; 2630 2631 ls = skcf->listen; 2632 lock = &skcf->router_conf->router->lock; 2633 2634 nxt_thread_spin_lock(lock); 2635 2636 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2637 task->thread->engine, ls->count); 2638 2639 if (--ls->count != 0) { 2640 ls = NULL; 2641 } 2642 2643 nxt_thread_spin_unlock(lock); 2644 2645 if (ls != NULL) { 2646 nxt_socket_close(task, ls->socket); 2647 nxt_free(ls); 2648 } 2649 } 2650 2651 2652 void 2653 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 2654 nxt_socket_conf_joint_t *joint) 2655 { 2656 nxt_event_engine_t *engine; 2657 2658 nxt_debug(task, "listen event count: %D", lev->count); 2659 2660 if (--lev->count == 0) { 2661 nxt_free(lev); 2662 } 2663 2664 if (joint != NULL) { 2665 nxt_router_conf_release(task, joint); 2666 } 2667 2668 engine = task->thread->engine; 2669 2670 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 2671 nxt_thread_exit(task->thread); 2672 } 2673 } 2674 2675 2676 void 2677 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2678 { 2679 nxt_app_t *app; 2680 nxt_socket_conf_t *skcf; 2681 nxt_router_conf_t *rtcf; 2682 nxt_thread_spinlock_t *lock; 2683 2684 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2685 2686 if (--joint->count != 0) { 2687 return; 2688 } 2689 2690 nxt_queue_remove(&joint->link); 2691 2692 /* 2693 * The joint content can not be safely used after the critical 2694 * section protected by the spinlock because its memory pool may 2695 * be already destroyed by another thread. 2696 */ 2697 skcf = joint->socket_conf; 2698 app = skcf->application; 2699 rtcf = skcf->router_conf; 2700 lock = &rtcf->router->lock; 2701 2702 nxt_thread_spin_lock(lock); 2703 2704 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2705 rtcf, rtcf->count); 2706 2707 if (--skcf->count != 0) { 2708 rtcf = NULL; 2709 app = NULL; 2710 2711 } else { 2712 nxt_queue_remove(&skcf->link); 2713 2714 if (--rtcf->count != 0) { 2715 rtcf = NULL; 2716 } 2717 } 2718 2719 nxt_thread_spin_unlock(lock); 2720 2721 if (app != NULL) { 2722 nxt_router_app_use(task, app, -1); 2723 } 2724 2725 /* TODO remove engine->port */ 2726 /* TODO excude from connected ports */ 2727 2728 if (rtcf != NULL) { 2729 nxt_debug(task, "old router conf is destroyed"); 2730 2731 nxt_router_access_log_release(task, lock, rtcf->access_log); 2732 2733 nxt_mp_thread_adopt(rtcf->mem_pool); 2734 2735 nxt_mp_destroy(rtcf->mem_pool); 2736 } 2737 } 2738 2739 2740 static void 2741 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 2742 nxt_router_access_log_t *access_log) 2743 { 2744 size_t size; 2745 u_char *buf, *p; 2746 nxt_off_t bytes; 2747 2748 static nxt_time_string_t date_cache = { 2749 (nxt_atomic_uint_t) -1, 2750 nxt_router_access_log_date, 2751 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 2752 nxt_length("31/Dec/1986:19:40:00 +0300"), 2753 NXT_THREAD_TIME_LOCAL, 2754 NXT_THREAD_TIME_SEC, 2755 }; 2756 2757 size = r->remote->address_length 2758 + 6 /* ' - - [' */ 2759 + date_cache.size 2760 + 3 /* '] "' */ 2761 + r->method->length 2762 + 1 /* space */ 2763 + r->target.length 2764 + 1 /* space */ 2765 + r->version.length 2766 + 2 /* '" ' */ 2767 + 3 /* status */ 2768 + 1 /* space */ 2769 + NXT_OFF_T_LEN 2770 + 2 /* ' "' */ 2771 + (r->referer != NULL ? r->referer->value_length : 1) 2772 + 3 /* '" "' */ 2773 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 2774 + 2 /* '"\n' */ 2775 ; 2776 2777 buf = nxt_mp_nget(r->mem_pool, size); 2778 if (nxt_slow_path(buf == NULL)) { 2779 return; 2780 } 2781 2782 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 2783 r->remote->address_length); 2784 2785 p = nxt_cpymem(p, " - - [", 6); 2786 2787 p = nxt_thread_time_string(task->thread, &date_cache, p); 2788 2789 p = nxt_cpymem(p, "] \"", 3); 2790 2791 if (r->method->length != 0) { 2792 p = nxt_cpymem(p, r->method->start, r->method->length); 2793 2794 if (r->target.length != 0) { 2795 *p++ = ' '; 2796 p = nxt_cpymem(p, r->target.start, r->target.length); 2797 2798 if (r->version.length != 0) { 2799 *p++ = ' '; 2800 p = nxt_cpymem(p, r->version.start, r->version.length); 2801 } 2802 } 2803 2804 } else { 2805 *p++ = '-'; 2806 } 2807 2808 p = nxt_cpymem(p, "\" ", 2); 2809 2810 p = nxt_sprintf(p, p + 3, "%03d", r->status); 2811 2812 *p++ = ' '; 2813 2814 bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto); 2815 2816 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 2817 2818 p = nxt_cpymem(p, " \"", 2); 2819 2820 if (r->referer != NULL) { 2821 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 2822 2823 } else { 2824 *p++ = '-'; 2825 } 2826 2827 p = nxt_cpymem(p, "\" \"", 3); 2828 2829 if (r->user_agent != NULL) { 2830 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 2831 2832 } else { 2833 *p++ = '-'; 2834 } 2835 2836 p = nxt_cpymem(p, "\"\n", 2); 2837 2838 nxt_fd_write(access_log->fd, buf, p - buf); 2839 } 2840 2841 2842 static u_char * 2843 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 2844 size_t size, const char *format) 2845 { 2846 u_char sign; 2847 time_t gmtoff; 2848 2849 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 2850 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 2851 2852 gmtoff = nxt_timezone(tm) / 60; 2853 2854 if (gmtoff < 0) { 2855 gmtoff = -gmtoff; 2856 sign = '-'; 2857 2858 } else { 2859 sign = '+'; 2860 } 2861 2862 return nxt_sprintf(buf, buf + size, format, 2863 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 2864 tm->tm_hour, tm->tm_min, tm->tm_sec, 2865 sign, gmtoff / 60, gmtoff % 60); 2866 } 2867 2868 2869 static void 2870 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 2871 { 2872 uint32_t stream; 2873 nxt_int_t ret; 2874 nxt_buf_t *b; 2875 nxt_port_t *main_port, *router_port; 2876 nxt_runtime_t *rt; 2877 nxt_router_access_log_t *access_log; 2878 2879 access_log = tmcf->router_conf->access_log; 2880 2881 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 2882 if (nxt_slow_path(b == NULL)) { 2883 goto fail; 2884 } 2885 2886 nxt_buf_cpystr(b, &access_log->path); 2887 *b->mem.free++ = '\0'; 2888 2889 rt = task->thread->runtime; 2890 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2891 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2892 2893 stream = nxt_port_rpc_register_handler(task, router_port, 2894 nxt_router_access_log_ready, 2895 nxt_router_access_log_error, 2896 -1, tmcf); 2897 if (nxt_slow_path(stream == 0)) { 2898 goto fail; 2899 } 2900 2901 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 2902 stream, router_port->id, b); 2903 2904 if (nxt_slow_path(ret != NXT_OK)) { 2905 nxt_port_rpc_cancel(task, router_port, stream); 2906 goto fail; 2907 } 2908 2909 return; 2910 2911 fail: 2912 2913 nxt_router_conf_error(task, tmcf); 2914 } 2915 2916 2917 static void 2918 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2919 void *data) 2920 { 2921 nxt_router_temp_conf_t *tmcf; 2922 nxt_router_access_log_t *access_log; 2923 2924 tmcf = data; 2925 2926 access_log = tmcf->router_conf->access_log; 2927 2928 access_log->fd = msg->fd; 2929 2930 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2931 nxt_router_conf_apply, task, tmcf, NULL); 2932 } 2933 2934 2935 static void 2936 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2937 void *data) 2938 { 2939 nxt_router_temp_conf_t *tmcf; 2940 2941 tmcf = data; 2942 2943 nxt_router_conf_error(task, tmcf); 2944 } 2945 2946 2947 static void 2948 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 2949 nxt_router_access_log_t *access_log) 2950 { 2951 if (access_log == NULL) { 2952 return; 2953 } 2954 2955 nxt_thread_spin_lock(lock); 2956 2957 if (--access_log->count != 0) { 2958 access_log = NULL; 2959 } 2960 2961 nxt_thread_spin_unlock(lock); 2962 2963 if (access_log != NULL) { 2964 2965 if (access_log->fd != -1) { 2966 nxt_fd_close(access_log->fd); 2967 } 2968 2969 nxt_free(access_log); 2970 } 2971 } 2972 2973 2974 typedef struct { 2975 nxt_mp_t *mem_pool; 2976 nxt_router_access_log_t *access_log; 2977 } nxt_router_access_log_reopen_t; 2978 2979 2980 void 2981 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 2982 { 2983 nxt_mp_t *mp; 2984 uint32_t stream; 2985 nxt_int_t ret; 2986 nxt_buf_t *b; 2987 nxt_port_t *main_port, *router_port; 2988 nxt_runtime_t *rt; 2989 nxt_router_access_log_t *access_log; 2990 nxt_router_access_log_reopen_t *reopen; 2991 2992 access_log = nxt_router->access_log; 2993 2994 if (access_log == NULL) { 2995 return; 2996 } 2997 2998 mp = nxt_mp_create(1024, 128, 256, 32); 2999 if (nxt_slow_path(mp == NULL)) { 3000 return; 3001 } 3002 3003 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3004 if (nxt_slow_path(reopen == NULL)) { 3005 goto fail; 3006 } 3007 3008 reopen->mem_pool = mp; 3009 reopen->access_log = access_log; 3010 3011 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3012 if (nxt_slow_path(b == NULL)) { 3013 goto fail; 3014 } 3015 3016 b->completion_handler = nxt_router_access_log_reopen_completion; 3017 3018 nxt_buf_cpystr(b, &access_log->path); 3019 *b->mem.free++ = '\0'; 3020 3021 rt = task->thread->runtime; 3022 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3023 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3024 3025 stream = nxt_port_rpc_register_handler(task, router_port, 3026 nxt_router_access_log_reopen_ready, 3027 nxt_router_access_log_reopen_error, 3028 -1, reopen); 3029 if (nxt_slow_path(stream == 0)) { 3030 goto fail; 3031 } 3032 3033 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3034 stream, router_port->id, b); 3035 3036 if (nxt_slow_path(ret != NXT_OK)) { 3037 nxt_port_rpc_cancel(task, router_port, stream); 3038 goto fail; 3039 } 3040 3041 nxt_mp_retain(mp); 3042 3043 return; 3044 3045 fail: 3046 3047 nxt_mp_destroy(mp); 3048 } 3049 3050 3051 static void 3052 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3053 { 3054 nxt_mp_t *mp; 3055 nxt_buf_t *b; 3056 3057 b = obj; 3058 mp = b->data; 3059 3060 nxt_mp_release(mp); 3061 } 3062 3063 3064 static void 3065 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3066 void *data) 3067 { 3068 nxt_router_access_log_t *access_log; 3069 nxt_router_access_log_reopen_t *reopen; 3070 3071 reopen = data; 3072 3073 access_log = reopen->access_log; 3074 3075 if (access_log == nxt_router->access_log) { 3076 3077 if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) { 3078 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3079 msg->fd, access_log->fd, nxt_errno); 3080 } 3081 } 3082 3083 nxt_fd_close(msg->fd); 3084 nxt_mp_release(reopen->mem_pool); 3085 } 3086 3087 3088 static void 3089 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3090 void *data) 3091 { 3092 nxt_router_access_log_reopen_t *reopen; 3093 3094 reopen = data; 3095 3096 nxt_mp_release(reopen->mem_pool); 3097 } 3098 3099 3100 static void 3101 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3102 { 3103 nxt_port_t *port; 3104 nxt_thread_link_t *link; 3105 nxt_event_engine_t *engine; 3106 nxt_thread_handle_t handle; 3107 3108 handle = (nxt_thread_handle_t) obj; 3109 link = data; 3110 3111 nxt_thread_wait(handle); 3112 3113 engine = link->engine; 3114 3115 nxt_queue_remove(&engine->link); 3116 3117 port = engine->port; 3118 3119 // TODO notify all apps 3120 3121 port->engine = task->thread->engine; 3122 nxt_mp_thread_adopt(port->mem_pool); 3123 nxt_port_use(task, port, -1); 3124 3125 nxt_mp_thread_adopt(engine->mem_pool); 3126 nxt_mp_destroy(engine->mem_pool); 3127 3128 nxt_event_engine_free(engine); 3129 3130 nxt_free(link); 3131 } 3132 3133 3134 static void 3135 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3136 void *data) 3137 { 3138 size_t dump_size; 3139 nxt_int_t ret; 3140 nxt_buf_t *b; 3141 nxt_http_request_t *r; 3142 nxt_req_conn_link_t *rc; 3143 nxt_app_parse_ctx_t *ar; 3144 nxt_unit_response_t *resp; 3145 3146 b = msg->buf; 3147 rc = data; 3148 3149 dump_size = nxt_buf_used_size(b); 3150 3151 if (dump_size > 300) { 3152 dump_size = 300; 3153 } 3154 3155 nxt_debug(task, "%srouter app data (%uz): %*s", 3156 msg->port_msg.last ? "last " : "", msg->size, dump_size, 3157 b->mem.pos); 3158 3159 if (msg->size == 0) { 3160 b = NULL; 3161 } 3162 3163 ar = rc->ap; 3164 if (nxt_slow_path(ar == NULL)) { 3165 return; 3166 } 3167 3168 if (ar->request->error) { 3169 nxt_router_rc_unlink(task, rc); 3170 return; 3171 } 3172 3173 if (msg->port_msg.last != 0) { 3174 nxt_debug(task, "router data create last buf"); 3175 3176 nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request)); 3177 3178 nxt_router_rc_unlink(task, rc); 3179 3180 } else { 3181 if (rc->app != NULL && rc->app->timeout != 0) { 3182 ar->timer.handler = nxt_router_app_timeout; 3183 ar->timer_data = rc; 3184 nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); 3185 } 3186 } 3187 3188 if (b == NULL) {