1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 18 19 typedef struct { 20 nxt_str_t type; 21 uint32_t processes; 22 uint32_t max_processes; 23 uint32_t spare_processes; 24 nxt_msec_t timeout; 25 nxt_msec_t res_timeout; 26 nxt_msec_t idle_timeout; 27 uint32_t requests; 28 nxt_conf_value_t *limits_value; 29 nxt_conf_value_t *processes_value; 30 } nxt_router_app_conf_t; 31 32 33 typedef struct { 34 nxt_str_t pass; 35 nxt_str_t application; 36 } nxt_router_listener_conf_t; 37 38 39 #if (NXT_TLS) 40 41 typedef struct { 42 nxt_str_t name; 43 nxt_socket_conf_t *conf; 44 45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 46 } nxt_router_tlssock_t; 47 48 #endif 49 50 51 typedef struct nxt_msg_info_s { 52 nxt_buf_t *buf; 53 nxt_port_mmap_tracking_t tracking; 54 nxt_work_handler_t completion_handler; 55 } nxt_msg_info_t; 56 57 58 typedef struct nxt_req_app_link_s nxt_req_app_link_t; 59 60 61 typedef struct { 62 uint32_t stream; 63 nxt_app_t *app; 64 nxt_port_t *app_port; 65 nxt_app_parse_ctx_t *ap; 66 nxt_msg_info_t msg_info; 67 nxt_req_app_link_t *ra; 68 69 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 70 } nxt_req_conn_link_t; 71 72 73 struct nxt_req_app_link_s { 74 uint32_t stream; 75 nxt_atomic_t use_count; 76 nxt_port_t *app_port; 77 nxt_port_t *reply_port; 78 nxt_app_parse_ctx_t *ap; 79 nxt_msg_info_t msg_info; 80 nxt_req_conn_link_t *rc; 81 82 nxt_nsec_t res_time; 83 84 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ 85 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ 86 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ 87 88 nxt_mp_t *mem_pool; 89 nxt_work_t work; 90 91 int err_code; 92 const char *err_str; 93 }; 94 95 96 typedef struct { 97 nxt_socket_conf_t *socket_conf; 98 nxt_router_temp_conf_t *temp_conf; 99 } nxt_socket_rpc_t; 100 101 102 typedef struct { 103 nxt_app_t *app; 104 nxt_router_temp_conf_t *temp_conf; 105 } nxt_app_rpc_t; 106 107 108 struct nxt_port_select_state_s { 109 nxt_app_t *app; 110 nxt_req_app_link_t *ra; 111 112 nxt_port_t *failed_port; 113 int failed_port_use_delta; 114 115 uint8_t start_process; /* 1 bit */ 116 nxt_req_app_link_t *shared_ra; 117 nxt_port_t *port; 118 }; 119 120 typedef struct nxt_port_select_state_s nxt_port_select_state_t; 121 122 static void nxt_router_greet_controller(nxt_task_t *task, 123 nxt_port_t *controller_port); 124 125 static void nxt_router_port_select(nxt_task_t *task, 126 nxt_port_select_state_t *state); 127 128 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 129 nxt_port_select_state_t *state); 130 131 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 132 133 nxt_inline void 134 nxt_router_ra_inc_use(nxt_req_app_link_t *ra) 135 { 136 nxt_atomic_fetch_add(&ra->use_count, 1); 137 } 138 139 nxt_inline void 140 nxt_router_ra_dec_use(nxt_req_app_link_t *ra) 141 { 142 #if (NXT_DEBUG) 143 int c; 144 145 c = nxt_atomic_fetch_add(&ra->use_count, -1); 146 147 nxt_assert(c > 1); 148 #else 149 (void) nxt_atomic_fetch_add(&ra->use_count, -1); 150 #endif 151 } 152 153 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); 154 155 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 156 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 157 static void nxt_router_conf_ready(nxt_task_t *task, 158 nxt_router_temp_conf_t *tmcf); 159 static void nxt_router_conf_error(nxt_task_t *task, 160 nxt_router_temp_conf_t *tmcf); 161 static void nxt_router_conf_send(nxt_task_t *task, 162 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 163 164 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 165 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 166 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 167 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 168 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 169 static void nxt_router_listen_socket_ready(nxt_task_t *task, 170 nxt_port_recv_msg_t *msg, void *data); 171 static void nxt_router_listen_socket_error(nxt_task_t *task, 172 nxt_port_recv_msg_t *msg, void *data); 173 #if (NXT_TLS) 174 static void nxt_router_tls_rpc_create(nxt_task_t *task, 175 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls); 176 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 177 nxt_port_recv_msg_t *msg, void *data); 178 #endif 179 static void nxt_router_app_rpc_create(nxt_task_t *task, 180 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 181 static void nxt_router_app_prefork_ready(nxt_task_t *task, 182 nxt_port_recv_msg_t *msg, void *data); 183 static void nxt_router_app_prefork_error(nxt_task_t *task, 184 nxt_port_recv_msg_t *msg, void *data); 185 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 186 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 187 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 188 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 189 190 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 191 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 192 const nxt_event_interface_t *interface); 193 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 194 nxt_router_engine_conf_t *recf); 195 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 196 nxt_router_engine_conf_t *recf); 197 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 198 nxt_router_engine_conf_t *recf); 199 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 200 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 201 nxt_work_handler_t handler); 202 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 203 nxt_router_engine_conf_t *recf); 204 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 205 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 206 207 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 208 nxt_router_temp_conf_t *tmcf); 209 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 210 nxt_event_engine_t *engine); 211 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 212 nxt_router_temp_conf_t *tmcf); 213 214 static void nxt_router_engines_post(nxt_router_t *router, 215 nxt_router_temp_conf_t *tmcf); 216 static void nxt_router_engine_post(nxt_event_engine_t *engine, 217 nxt_work_t *jobs); 218 219 static void nxt_router_thread_start(void *data); 220 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 221 void *data); 222 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 223 void *data); 224 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 225 void *data); 226 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 227 void *data); 228 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 229 void *data); 230 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 231 void *data); 232 static void nxt_router_listen_socket_release(nxt_task_t *task, 233 nxt_socket_conf_t *skcf); 234 235 static void nxt_router_access_log_writer(nxt_task_t *task, 236 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 237 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 238 struct tm *tm, size_t size, const char *format); 239 static void nxt_router_access_log_open(nxt_task_t *task, 240 nxt_router_temp_conf_t *tmcf); 241 static void nxt_router_access_log_ready(nxt_task_t *task, 242 nxt_port_recv_msg_t *msg, void *data); 243 static void nxt_router_access_log_error(nxt_task_t *task, 244 nxt_port_recv_msg_t *msg, void *data); 245 static void nxt_router_access_log_release(nxt_task_t *task, 246 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 247 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 248 void *data); 249 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 250 nxt_port_recv_msg_t *msg, void *data); 251 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 252 nxt_port_recv_msg_t *msg, void *data); 253 254 static void nxt_router_app_port_ready(nxt_task_t *task, 255 nxt_port_recv_msg_t *msg, void *data); 256 static void nxt_router_app_port_error(nxt_task_t *task, 257 nxt_port_recv_msg_t *msg, void *data); 258 259 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 260 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 261 uint32_t request_failed, uint32_t got_response); 262 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 263 nxt_req_app_link_t *ra); 264 265 static void nxt_router_app_prepare_request(nxt_task_t *task, 266 nxt_req_app_link_t *ra); 267 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 268 nxt_port_t *port, const nxt_str_t *prefix); 269 270 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 271 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 272 void *data); 273 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 274 void *data); 275 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 276 void *data); 277 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 278 279 static const nxt_http_request_state_t nxt_http_request_send_state; 280 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 281 282 static void nxt_router_app_joint_use(nxt_task_t *task, 283 nxt_app_joint_t *app_joint, int i); 284 285 static nxt_router_t *nxt_router; 286 287 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 288 static const nxt_str_t empty_prefix = nxt_string(""); 289 290 static const nxt_str_t *nxt_app_msg_prefix[] = { 291 &empty_prefix, 292 &http_prefix, 293 &http_prefix, 294 &http_prefix, 295 &http_prefix, 296 &empty_prefix, 297 }; 298 299 300 nxt_port_handlers_t nxt_router_process_port_handlers = { 301 .quit = nxt_worker_process_quit_handler, 302 .new_port = nxt_router_new_port_handler, 303 .change_file = nxt_port_change_log_file_handler, 304 .mmap = nxt_port_mmap_handler, 305 .data = nxt_router_conf_data_handler, 306 .remove_pid = nxt_router_remove_pid_handler, 307 .access_log = nxt_router_access_log_reopen_handler, 308 .rpc_ready = nxt_port_rpc_handler, 309 .rpc_error = nxt_port_rpc_handler, 310 }; 311 312 313 nxt_int_t 314 nxt_router_start(nxt_task_t *task, void *data) 315 { 316 nxt_int_t ret; 317 nxt_port_t *controller_port; 318 nxt_router_t *router; 319 nxt_runtime_t *rt; 320 321 rt = task->thread->runtime; 322 323 #if (NXT_TLS) 324 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 325 if (nxt_slow_path(rt->tls == NULL)) { 326 return NXT_ERROR; 327 } 328 329 ret = rt->tls->library_init(task); 330 if (nxt_slow_path(ret != NXT_OK)) { 331 return ret; 332 } 333 #endif 334 335 ret = nxt_http_init(task, rt); 336 if (nxt_slow_path(ret != NXT_OK)) { 337 return ret; 338 } 339 340 router = nxt_zalloc(sizeof(nxt_router_t)); 341 if (nxt_slow_path(router == NULL)) { 342 return NXT_ERROR; 343 } 344 345 nxt_queue_init(&router->engines); 346 nxt_queue_init(&router->sockets); 347 nxt_queue_init(&router->apps); 348 349 nxt_router = router; 350 351 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 352 if (controller_port != NULL) { 353 nxt_router_greet_controller(task, controller_port); 354 } 355 356 return NXT_OK; 357 } 358 359 360 static void 361 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 362 { 363 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 364 -1, 0, 0, NULL); 365 } 366 367 368 static void 369 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 370 void *data) 371 { 372 size_t size; 373 uint32_t stream; 374 nxt_mp_t *mp; 375 nxt_int_t ret; 376 nxt_app_t *app; 377 nxt_buf_t *b; 378 nxt_port_t *main_port; 379 nxt_runtime_t *rt; 380 381 app = data; 382 383 rt = task->thread->runtime; 384 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 385 386 nxt_debug(task, "app '%V' %p start process", &app->name, app); 387 388 size = app->name.length + 1 + app->conf.length; 389 390 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 391 392 if (nxt_slow_path(b == NULL)) { 393 goto failed; 394 } 395 396 nxt_buf_cpystr(b, &app->name); 397 *b->mem.free++ = '\0'; 398 nxt_buf_cpystr(b, &app->conf); 399 400 nxt_router_app_joint_use(task, app->joint, 1); 401 402 stream = nxt_port_rpc_register_handler(task, port, 403 nxt_router_app_port_ready, 404 nxt_router_app_port_error, 405 -1, app->joint); 406 407 if (nxt_slow_path(stream == 0)) { 408 nxt_router_app_joint_use(task, app->joint, -1); 409 410 goto failed; 411 } 412 413 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 414 stream, port->id, b); 415 416 if (nxt_slow_path(ret != NXT_OK)) { 417 nxt_port_rpc_cancel(task, port, stream); 418 419 nxt_router_app_joint_use(task, app->joint, -1); 420 421 goto failed; 422 } 423 424 nxt_router_app_use(task, app, -1); 425 426 return; 427 428 failed: 429 430 if (b != NULL) { 431 mp = b->data; 432 nxt_mp_free(mp, b); 433 nxt_mp_release(mp); 434 } 435 436 nxt_thread_mutex_lock(&app->mutex); 437 438 app->pending_processes--; 439 440 nxt_thread_mutex_unlock(&app->mutex); 441 442 nxt_router_app_use(task, app, -1); 443 } 444 445 446 static void 447 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 448 { 449 app_joint->use_count += i; 450 451 if (app_joint->use_count == 0) { 452 nxt_assert(app_joint->app == NULL); 453 454 nxt_free(app_joint); 455 } 456 } 457 458 459 static nxt_int_t 460 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 461 { 462 nxt_int_t res; 463 nxt_port_t *router_port; 464 nxt_runtime_t *rt; 465 466 rt = task->thread->runtime; 467 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 468 469 nxt_router_app_use(task, app, 1); 470 471 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 472 app); 473 474 if (res == NXT_OK) { 475 return res; 476 } 477 478 nxt_thread_mutex_lock(&app->mutex); 479 480 app->pending_processes--; 481 482 nxt_thread_mutex_unlock(&app->mutex); 483 484 nxt_router_app_use(task, app, -1); 485 486 return NXT_ERROR; 487 } 488 489 490 nxt_inline void 491 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, 492 nxt_req_conn_link_t *rc) 493 { 494 nxt_event_engine_t *engine; 495 496 engine = task->thread->engine; 497 498 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 499 500 ra->stream = rc->stream; 501 ra->use_count = 1; 502 ra->rc = rc; 503 rc->ra = ra; 504 ra->reply_port = engine->port; 505 ra->ap = rc->ap; 506 507 ra->work.handler = NULL; 508 ra->work.task = &engine->task; 509 ra->work.obj = ra; 510 ra->work.data = engine; 511 } 512 513 514 nxt_inline nxt_req_app_link_t * 515 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) 516 { 517 nxt_mp_t *mp; 518 nxt_req_app_link_t *ra; 519 520 if (ra_src->mem_pool != NULL) { 521 return ra_src; 522 } 523 524 mp = ra_src->ap->mem_pool; 525 526 ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t)); 527 528 if (nxt_slow_path(ra == NULL)) { 529 530 ra_src->rc->ra = NULL; 531 ra_src->rc = NULL; 532 533 return NULL; 534 } 535 536 nxt_mp_retain(mp); 537 538 nxt_router_ra_init(task, ra, ra_src->rc); 539 540 ra->mem_pool = mp; 541 542 return ra; 543 } 544 545 546 nxt_inline nxt_bool_t 547 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 548 uint32_t stream) 549 { 550 nxt_buf_t *b, *next; 551 nxt_bool_t cancelled; 552 553 if (msg_info->buf == NULL) { 554 return 0; 555 } 556 557 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 558 stream); 559 560 if (cancelled) { 561 nxt_debug(task, "stream #%uD: cancelled by router", stream); 562 } 563 564 for (b = msg_info->buf; b != NULL; b = next) { 565 next = b->next; 566 567 b->completion_handler = msg_info->completion_handler; 568 569 if (b->is_port_mmap_sent) { 570 b->is_port_mmap_sent = cancelled == 0; 571 b->completion_handler(task, b, b->parent); 572 } 573 } 574 575 msg_info->buf = NULL; 576 577 return cancelled; 578 } 579 580 581 static void 582 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); 583 584 585 static void 586 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) 587 { 588 nxt_req_app_link_t *ra; 589 590 ra = obj; 591 592 nxt_router_ra_update_peer(task, ra); 593 594 nxt_router_ra_use(task, ra, -1); 595 } 596 597 598 static void 599 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) 600 { 601 nxt_event_engine_t *engine; 602 nxt_req_conn_link_t *rc; 603 604 engine = ra->work.data; 605 606 if (task->thread->engine != engine) { 607 nxt_router_ra_inc_use(ra); 608 609 ra->work.handler = nxt_router_ra_update_peer_handler; 610 ra->work.task = &engine->task; 611 ra->work.next = NULL; 612 613 nxt_debug(task, "ra stream #%uD post update peer to %p", 614 ra->stream, engine); 615 616 nxt_event_engine_post(engine, &ra->work); 617 618 return; 619 } 620 621 nxt_debug(task, "ra stream #%uD update peer", ra->stream); 622 623 rc = ra->rc; 624 625 if (rc != NULL && ra->app_port != NULL) { 626 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); 627 } 628 629 nxt_router_ra_use(task, ra, -1); 630 } 631 632 633 static void 634 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) 635 { 636 nxt_mp_t *mp; 637 nxt_req_conn_link_t *rc; 638 639 nxt_assert(task->thread->engine == ra->work.data); 640 nxt_assert(ra->use_count == 0); 641 642 nxt_debug(task, "ra stream #%uD release", ra->stream); 643 644 rc = ra->rc; 645 646 if (rc != NULL) { 647 if (nxt_slow_path(ra->err_code != 0)) { 648 nxt_http_request_error(task, rc->ap->request, ra->err_code); 649 650 } else { 651 rc->app_port = ra->app_port; 652 rc->msg_info = ra->msg_info; 653 654 if (rc->app->timeout != 0) { 655 rc->ap->timer.handler = nxt_router_app_timeout; 656 rc->ap->timer_data = rc; 657 nxt_timer_add(task->thread->engine, &rc->ap->timer, 658 rc->app->timeout); 659 } 660 661 ra->app_port = NULL; 662 ra->msg_info.buf = NULL; 663 } 664 665 rc->ra = NULL; 666 ra->rc = NULL; 667 } 668 669 if (ra->app_port != NULL) { 670 nxt_router_app_port_release(task, ra->app_port, 0, 1); 671 672 ra->app_port = NULL; 673 } 674 675 nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 676 677 mp = ra->mem_pool; 678 679 if (mp != NULL) { 680 nxt_mp_free(mp, ra); 681 nxt_mp_release(mp); 682 } 683 } 684 685 686 static void 687 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) 688 { 689 nxt_req_app_link_t *ra; 690 691 ra = obj; 692 693 nxt_assert(ra->work.data == data); 694 695 nxt_atomic_fetch_add(&ra->use_count, -1); 696 697 nxt_router_ra_release(task, ra); 698 } 699 700 701 static void 702 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) 703 { 704 int c; 705 nxt_event_engine_t *engine; 706 707 c = nxt_atomic_fetch_add(&ra->use_count, i); 708 709 if (i < 0 && c == -i) { 710 engine = ra->work.data; 711 712 if (task->thread->engine == engine) { 713 nxt_router_ra_release(task, ra); 714 715 return; 716 } 717 718 nxt_router_ra_inc_use(ra); 719 720 ra->work.handler = nxt_router_ra_release_handler; 721 ra->work.task = &engine->task; 722 ra->work.next = NULL; 723 724 nxt_debug(task, "ra stream #%uD post release to %p", 725 ra->stream, engine); 726 727 nxt_event_engine_post(engine, &ra->work); 728 } 729 } 730 731 732 nxt_inline void 733 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str) 734 { 735 ra->app_port = NULL; 736 ra->err_code = code; 737 ra->err_str = str; 738 } 739 740 741 nxt_inline void 742 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 743 { 744 nxt_queue_insert_tail(&ra->app_port->pending_requests, 745 &ra->link_port_pending); 746 nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); 747 748 nxt_router_ra_inc_use(ra); 749 750 ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; 751 752 nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); 753 } 754 755 756 nxt_inline nxt_bool_t 757 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 758 { 759 if (lnk->next != NULL) { 760 nxt_queue_remove(lnk); 761 762 lnk->next = NULL; 763 764 return 1; 765 } 766 767 return 0; 768 } 769 770 771 nxt_inline void 772 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) 773 { 774 int ra_use_delta; 775 nxt_req_app_link_t *ra; 776 777 if (rc->app_port != NULL) { 778 nxt_router_app_port_release(task, rc->app_port, 0, 1); 779 780 rc->app_port = NULL; 781 } 782 783 nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); 784 785 ra = rc->ra; 786 787 if (ra != NULL) { 788 rc->ra = NULL; 789 ra->rc = NULL; 790 791 ra_use_delta = 0; 792 793 nxt_thread_mutex_lock(&rc->app->mutex); 794 795 if (ra->link_app_requests.next == NULL 796 && ra->link_port_pending.next == NULL 797 && ra->link_app_pending.next == NULL) 798 { 799 ra = NULL; 800 801 } else { 802 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); 803 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); 804 nxt_queue_chk_remove(&ra->link_app_pending); 805 } 806 807 nxt_thread_mutex_unlock(&rc->app->mutex); 808 809 if (ra != NULL) { 810 nxt_router_ra_use(task, ra, ra_use_delta); 811 } 812 } 813 814 if (rc->app != NULL) { 815 nxt_router_app_use(task, rc->app, -1); 816 817 rc->app = NULL; 818 } 819 820 if (rc->ap != NULL) { 821 rc->ap->timer_data = NULL; 822 823 nxt_app_http_req_done(task, rc->ap); 824 825 rc->ap = NULL; 826 } 827 } 828 829 830 void 831 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 832 { 833 nxt_port_new_port_handler(task, msg); 834 835 if (msg->u.new_port != NULL 836 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) 837 { 838 nxt_router_greet_controller(task, msg->u.new_port); 839 } 840 841 if (msg->port_msg.stream == 0) { 842 return; 843 } 844 845 if (msg->u.new_port == NULL 846 || msg->u.new_port->type != NXT_PROCESS_WORKER) 847 { 848 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 849 } 850 851 nxt_port_rpc_handler(task, msg); 852 } 853 854 855 void 856 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 857 { 858 nxt_int_t ret; 859 nxt_buf_t *b; 860 nxt_router_temp_conf_t *tmcf; 861 862 tmcf = nxt_router_temp_conf(task); 863 if (nxt_slow_path(tmcf == NULL)) { 864 return; 865 } 866 867 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 868 nxt_buf_used_size(msg->buf), 869 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 870 871 tmcf->router_conf->router = nxt_router; 872 tmcf->stream = msg->port_msg.stream; 873 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 874 msg->port_msg.pid, 875 msg->port_msg.reply_port); 876 877 if (nxt_slow_path(tmcf->port == NULL)) { 878 nxt_alert(task, "reply port not found"); 879 880 return; 881 } 882 883 nxt_port_use(task, tmcf->port, 1); 884 885 b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, 886 msg->buf, msg->size); 887 if (nxt_slow_path(b == NULL)) { 888 nxt_router_conf_error(task, tmcf); 889 890 return; 891 } 892 893 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 894 895 if (nxt_fast_path(ret == NXT_OK)) { 896 nxt_router_conf_apply(task, tmcf, NULL); 897 898 } else { 899 nxt_router_conf_error(task, tmcf); 900 } 901 } 902 903 904 static void 905 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 906 void *data) 907 { 908 union { 909 nxt_pid_t removed_pid; 910 void *data; 911 } u; 912 913 u.data = data; 914 915 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 916 } 917 918 919 void 920 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 921 { 922 nxt_event_engine_t *engine; 923 924 nxt_port_remove_pid_handler(task, msg); 925 926 if (msg->port_msg.stream == 0) { 927 return; 928 } 929 930 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 931 { 932 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 933 msg->u.data); 934 } 935 nxt_queue_loop; 936 937 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 938 939 nxt_port_rpc_handler(task, msg); 940 } 941 942 943 static nxt_router_temp_conf_t * 944 nxt_router_temp_conf(nxt_task_t *task) 945 { 946 nxt_mp_t *mp, *tmp; 947 nxt_router_conf_t *rtcf; 948 nxt_router_temp_conf_t *tmcf; 949 950 mp = nxt_mp_create(1024, 128, 256, 32); 951 if (nxt_slow_path(mp == NULL)) { 952 return NULL; 953 } 954 955 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 956 if (nxt_slow_path(rtcf == NULL)) { 957 goto fail; 958 } 959 960 rtcf->mem_pool = mp; 961 962 tmp = nxt_mp_create(1024, 128, 256, 32); 963 if (nxt_slow_path(tmp == NULL)) { 964 goto fail; 965 } 966 967 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 968 if (nxt_slow_path(tmcf == NULL)) { 969 goto temp_fail; 970 } 971 972 tmcf->mem_pool = tmp; 973 tmcf->router_conf = rtcf; 974 tmcf->count = 1; 975 tmcf->engine = task->thread->engine; 976 977 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 978 sizeof(nxt_router_engine_conf_t)); 979 if (nxt_slow_path(tmcf->engines == NULL)) { 980 goto temp_fail; 981 } 982 983 nxt_queue_init(&tmcf->deleting); 984 nxt_queue_init(&tmcf->keeping); 985 nxt_queue_init(&tmcf->updating); 986 nxt_queue_init(&tmcf->pending); 987 nxt_queue_init(&tmcf->creating); 988 989 #if (NXT_TLS) 990 nxt_queue_init(&tmcf->tls); 991 #endif 992 993 nxt_queue_init(&tmcf->apps); 994 nxt_queue_init(&tmcf->previous); 995 996 return tmcf; 997 998 temp_fail: 999 1000 nxt_mp_destroy(tmp); 1001 1002 fail: 1003 1004 nxt_mp_destroy(mp); 1005 1006 return NULL; 1007 } 1008 1009 1010 nxt_inline nxt_bool_t 1011 nxt_router_app_can_start(nxt_app_t *app) 1012 { 1013 return app->processes + app->pending_processes < app->max_processes 1014 && app->pending_processes < app->max_pending_processes; 1015 } 1016 1017 1018 nxt_inline nxt_bool_t 1019 nxt_router_app_need_start(nxt_app_t *app) 1020 { 1021 return app->idle_processes + app->pending_processes 1022 < app->spare_processes; 1023 } 1024 1025 1026 static void 1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1028 { 1029 nxt_int_t ret; 1030 nxt_app_t *app; 1031 nxt_router_t *router; 1032 nxt_runtime_t *rt; 1033 nxt_queue_link_t *qlk; 1034 nxt_socket_conf_t *skcf; 1035 nxt_router_conf_t *rtcf; 1036 nxt_router_temp_conf_t *tmcf; 1037 const nxt_event_interface_t *interface; 1038 #if (NXT_TLS) 1039 nxt_router_tlssock_t *tls; 1040 #endif 1041 1042 tmcf = obj; 1043 1044 qlk = nxt_queue_first(&tmcf->pending); 1045 1046 if (qlk != nxt_queue_tail(&tmcf->pending)) { 1047 nxt_queue_remove(qlk); 1048 nxt_queue_insert_tail(&tmcf->creating, qlk); 1049 1050 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1051 1052 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1053 1054 return; 1055 } 1056 1057 #if (NXT_TLS) 1058 qlk = nxt_queue_first(&tmcf->tls); 1059 1060 if (qlk != nxt_queue_tail(&tmcf->tls)) { 1061 nxt_queue_remove(qlk); 1062 1063 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1064 1065 nxt_router_tls_rpc_create(task, tmcf, tls); 1066 return; 1067 } 1068 #endif 1069 1070 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1071 1072 if (nxt_router_app_need_start(app)) { 1073 nxt_router_app_rpc_create(task, tmcf, app); 1074 return; 1075 } 1076 1077 } nxt_queue_loop; 1078 1079 rtcf = tmcf->router_conf; 1080 1081 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1082 nxt_router_access_log_open(task, tmcf); 1083 return; 1084 } 1085 1086 rt = task->thread->runtime; 1087 1088 interface = nxt_service_get(rt->services, "engine", NULL); 1089 1090 router = rtcf->router; 1091 1092 ret = nxt_router_engines_create(task, router, tmcf, interface); 1093 if (nxt_slow_path(ret != NXT_OK)) { 1094 goto fail; 1095 } 1096 1097 ret = nxt_router_threads_create(task, rt, tmcf); 1098 if (nxt_slow_path(ret != NXT_OK)) { 1099 goto fail; 1100 } 1101 1102 nxt_router_apps_sort(task, router, tmcf); 1103 1104 nxt_router_engines_post(router, tmcf); 1105 1106 nxt_queue_add(&router->sockets, &tmcf->updating); 1107 nxt_queue_add(&router->sockets, &tmcf->creating); 1108 1109 router->access_log = rtcf->access_log; 1110 1111 nxt_router_conf_ready(task, tmcf); 1112 1113 return; 1114 1115 fail: 1116 1117 nxt_router_conf_error(task, tmcf); 1118 1119 return; 1120 } 1121 1122 1123 static void 1124 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1125 { 1126 nxt_joint_job_t *job; 1127 1128 job = obj; 1129 1130 nxt_router_conf_ready(task, job->tmcf); 1131 } 1132 1133 1134 static void 1135 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1136 { 1137 nxt_debug(task, "temp conf count:%D", tmcf->count); 1138 1139 if (--tmcf->count == 0) { 1140 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1141 } 1142 } 1143 1144 1145 static void 1146 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1147 { 1148 nxt_app_t *app; 1149 nxt_queue_t new_socket_confs; 1150 nxt_socket_t s; 1151 nxt_router_t *router; 1152 nxt_queue_link_t *qlk; 1153 nxt_socket_conf_t *skcf; 1154 nxt_router_conf_t *rtcf; 1155 1156 nxt_alert(task, "failed to apply new conf"); 1157 1158 for (qlk = nxt_queue_first(&tmcf->creating); 1159 qlk != nxt_queue_tail(&tmcf->creating); 1160 qlk = nxt_queue_next(qlk)) 1161 { 1162 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1163 s = skcf->listen->socket; 1164 1165 if (s != -1) { 1166 nxt_socket_close(task, s); 1167 } 1168 1169 nxt_free(skcf->listen); 1170 } 1171 1172 nxt_queue_init(&new_socket_confs); 1173 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1174 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1175 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1176 1177 rtcf = tmcf->router_conf; 1178 1179 nxt_http_routes_cleanup(task, rtcf->routes); 1180 1181 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1182 1183 if (skcf->pass != NULL) { 1184 nxt_http_pass_cleanup(task, skcf->pass); 1185 } 1186 1187 } nxt_queue_loop; 1188 1189 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1190 1191 nxt_router_app_unlink(task, app); 1192 1193 } nxt_queue_loop; 1194 1195 router = rtcf->router; 1196 1197 nxt_queue_add(&router->sockets, &tmcf->keeping); 1198 nxt_queue_add(&router->sockets, &tmcf->deleting); 1199 1200 nxt_queue_add(&router->apps, &tmcf->previous); 1201 1202 // TODO: new engines and threads 1203 1204 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1205 1206 nxt_mp_destroy(rtcf->mem_pool); 1207 1208 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1209 } 1210 1211 1212 static void 1213 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1214 nxt_port_msg_type_t type) 1215 { 1216 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1217 1218 nxt_port_use(task, tmcf->port, -1); 1219 1220 tmcf->port = NULL; 1221 } 1222 1223 1224 static nxt_conf_map_t nxt_router_conf[] = { 1225 { 1226 nxt_string("listeners_threads"), 1227 NXT_CONF_MAP_INT32, 1228 offsetof(nxt_router_conf_t, threads), 1229 }, 1230 }; 1231 1232 1233 static nxt_conf_map_t nxt_router_app_conf[] = { 1234 { 1235 nxt_string("type"), 1236 NXT_CONF_MAP_STR, 1237 offsetof(nxt_router_app_conf_t, type), 1238 }, 1239 1240 { 1241 nxt_string("limits"), 1242 NXT_CONF_MAP_PTR, 1243 offsetof(nxt_router_app_conf_t, limits_value), 1244 }, 1245 1246 { 1247 nxt_string("processes"), 1248 NXT_CONF_MAP_INT32, 1249 offsetof(nxt_router_app_conf_t, processes), 1250 }, 1251 1252 { 1253 nxt_string("processes"), 1254 NXT_CONF_MAP_PTR, 1255 offsetof(nxt_router_app_conf_t, processes_value), 1256 }, 1257 }; 1258 1259 1260 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1261 { 1262 nxt_string("timeout"), 1263 NXT_CONF_MAP_MSEC, 1264 offsetof(nxt_router_app_conf_t, timeout), 1265 }, 1266 1267 { 1268 nxt_string("reschedule_timeout"), 1269 NXT_CONF_MAP_MSEC, 1270 offsetof(nxt_router_app_conf_t, res_timeout), 1271 }, 1272 1273 { 1274 nxt_string("requests"), 1275 NXT_CONF_MAP_INT32, 1276 offsetof(nxt_router_app_conf_t, requests), 1277 }, 1278 }; 1279 1280 1281 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1282 { 1283 nxt_string("spare"), 1284 NXT_CONF_MAP_INT32, 1285 offsetof(nxt_router_app_conf_t, spare_processes), 1286 }, 1287 1288 { 1289 nxt_string("max"), 1290 NXT_CONF_MAP_INT32, 1291 offsetof(nxt_router_app_conf_t, max_processes), 1292 }, 1293 1294 { 1295 nxt_string("idle_timeout"), 1296 NXT_CONF_MAP_MSEC, 1297 offsetof(nxt_router_app_conf_t, idle_timeout), 1298 }, 1299 }; 1300 1301 1302 static nxt_conf_map_t nxt_router_listener_conf[] = { 1303 { 1304 nxt_string("pass"), 1305 NXT_CONF_MAP_STR_COPY, 1306 offsetof(nxt_router_listener_conf_t, pass), 1307 }, 1308 1309 { 1310 nxt_string("application"), 1311 NXT_CONF_MAP_STR_COPY, 1312 offsetof(nxt_router_listener_conf_t, application), 1313 }, 1314 }; 1315 1316 1317 static nxt_conf_map_t nxt_router_http_conf[] = { 1318 { 1319 nxt_string("header_buffer_size"), 1320 NXT_CONF_MAP_SIZE, 1321 offsetof(nxt_socket_conf_t, header_buffer_size), 1322 }, 1323 1324 { 1325 nxt_string("large_header_buffer_size"), 1326 NXT_CONF_MAP_SIZE, 1327 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1328 }, 1329 1330 { 1331 nxt_string("large_header_buffers"), 1332 NXT_CONF_MAP_SIZE, 1333 offsetof(nxt_socket_conf_t, large_header_buffers), 1334 }, 1335 1336 { 1337 nxt_string("body_buffer_size"), 1338 NXT_CONF_MAP_SIZE, 1339 offsetof(nxt_socket_conf_t, body_buffer_size), 1340 }, 1341 1342 { 1343 nxt_string("max_body_size"), 1344 NXT_CONF_MAP_SIZE, 1345 offsetof(nxt_socket_conf_t, max_body_size), 1346 }, 1347 1348 { 1349 nxt_string("idle_timeout"), 1350 NXT_CONF_MAP_MSEC, 1351 offsetof(nxt_socket_conf_t, idle_timeout), 1352 }, 1353 1354 { 1355 nxt_string("header_read_timeout"), 1356 NXT_CONF_MAP_MSEC, 1357 offsetof(nxt_socket_conf_t, header_read_timeout), 1358 }, 1359 1360 { 1361 nxt_string("body_read_timeout"), 1362 NXT_CONF_MAP_MSEC, 1363 offsetof(nxt_socket_conf_t, body_read_timeout), 1364 }, 1365 1366 { 1367 nxt_string("send_timeout"), 1368 NXT_CONF_MAP_MSEC, 1369 offsetof(nxt_socket_conf_t, send_timeout), 1370 }, 1371 }; 1372 1373 1374 static nxt_int_t 1375 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1376 u_char *start, u_char *end) 1377 { 1378 u_char *p; 1379 size_t size; 1380 nxt_mp_t *mp; 1381 uint32_t next; 1382 nxt_int_t ret; 1383 nxt_str_t name, path; 1384 nxt_app_t *app, *prev; 1385 nxt_router_t *router; 1386 nxt_app_joint_t *app_joint; 1387 nxt_conf_value_t *conf, *http, *value; 1388 nxt_conf_value_t *applications, *application; 1389 nxt_conf_value_t *listeners, *listener; 1390 nxt_conf_value_t *routes_conf; 1391 nxt_socket_conf_t *skcf; 1392 nxt_http_routes_t *routes; 1393 nxt_event_engine_t *engine; 1394 nxt_app_lang_module_t *lang; 1395 nxt_router_app_conf_t apcf; 1396 nxt_router_access_log_t *access_log; 1397 nxt_router_listener_conf_t lscf; 1398 #if (NXT_TLS) 1399 nxt_router_tlssock_t *tls; 1400 #endif 1401 1402 static nxt_str_t http_path = nxt_string("/settings/http"); 1403 static nxt_str_t applications_path = nxt_string("/applications"); 1404 static nxt_str_t listeners_path = nxt_string("/listeners"); 1405 static nxt_str_t routes_path = nxt_string("/routes"); 1406 static nxt_str_t access_log_path = nxt_string("/access_log"); 1407 #if (NXT_TLS) 1408 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1409 #endif 1410 1411 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1412 if (conf == NULL) { 1413 nxt_alert(task, "configuration parsing error"); 1414 return NXT_ERROR; 1415 } 1416 1417 mp = tmcf->router_conf->mem_pool; 1418 1419 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1420 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1421 if (ret != NXT_OK) { 1422 nxt_alert(task, "root map error"); 1423 return NXT_ERROR; 1424 } 1425 1426 if (tmcf->router_conf->threads == 0) { 1427 tmcf->router_conf->threads = nxt_ncpu; 1428 } 1429 1430 applications = nxt_conf_get_path(conf, &applications_path); 1431 if (applications == NULL) { 1432 nxt_alert(task, "no \"applications\" block"); 1433 return NXT_ERROR; 1434 } 1435 1436 router = tmcf->router_conf->router; 1437 1438 next = 0; 1439 1440 for ( ;; ) { 1441 application = nxt_conf_next_object_member(applications, &name, &next); 1442 if (application == NULL) { 1443 break; 1444 } 1445 1446 nxt_debug(task, "application \"%V\"", &name); 1447 1448 size = nxt_conf_json_length(application, NULL); 1449 1450 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1451 if (app == NULL) { 1452 goto fail; 1453 } 1454 1455 nxt_memzero(app, sizeof(nxt_app_t)); 1456 1457 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1458 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); 1459 1460 p = nxt_conf_json_print(app->conf.start, application, NULL); 1461 app->conf.length = p - app->conf.start; 1462 1463 nxt_assert(app->conf.length <= size); 1464 1465 nxt_debug(task, "application conf \"%V\"", &app->conf); 1466 1467 prev = nxt_router_app_find(&router->apps, &name); 1468 1469 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1470 nxt_free(app); 1471 1472 nxt_queue_remove(&prev->link); 1473 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1474 continue; 1475 } 1476 1477 apcf.processes = 1; 1478 apcf.max_processes = 1; 1479 apcf.spare_processes = 0; 1480 apcf.timeout = 0; 1481 apcf.res_timeout = 1000; 1482 apcf.idle_timeout = 15000; 1483 apcf.requests = 0; 1484 apcf.limits_value = NULL; 1485 apcf.processes_value = NULL; 1486 1487 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1488 if (nxt_slow_path(app_joint == NULL)) { 1489 goto app_fail; 1490 } 1491 1492 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1493 1494 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1495 nxt_nitems(nxt_router_app_conf), &apcf); 1496 if (ret != NXT_OK) { 1497 nxt_alert(task, "application map error"); 1498 goto app_fail; 1499 } 1500 1501 if (apcf.limits_value != NULL) { 1502 1503 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1504 nxt_alert(task, "application limits is not object"); 1505 goto app_fail; 1506 } 1507 1508 ret = nxt_conf_map_object(mp, apcf.limits_value, 1509 nxt_router_app_limits_conf, 1510 nxt_nitems(nxt_router_app_limits_conf), 1511 &apcf); 1512 if (ret != NXT_OK) { 1513 nxt_alert(task, "application limits map error"); 1514 goto app_fail; 1515 } 1516 } 1517 1518 if (apcf.processes_value != NULL 1519 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1520 { 1521 ret = nxt_conf_map_object(mp, apcf.processes_value, 1522 nxt_router_app_processes_conf, 1523 nxt_nitems(nxt_router_app_processes_conf), 1524 &apcf); 1525 if (ret != NXT_OK) { 1526 nxt_alert(task, "application processes map error"); 1527 goto app_fail; 1528 } 1529 1530 } else { 1531 apcf.max_processes = apcf.processes; 1532 apcf.spare_processes = apcf.processes; 1533 } 1534 1535 nxt_debug(task, "application type: %V", &apcf.type); 1536 nxt_debug(task, "application processes: %D", apcf.processes); 1537 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1538 nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); 1539 nxt_debug(task, "application requests: %D", apcf.requests); 1540 1541 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1542 1543 if (lang == NULL) { 1544 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1545 goto app_fail; 1546 } 1547 1548 nxt_debug(task, "application language module: \"%s\"", lang->file); 1549 1550 ret = nxt_thread_mutex_create(&app->mutex); 1551 if (ret != NXT_OK) { 1552 goto app_fail; 1553 } 1554 1555 nxt_queue_init(&app->ports); 1556 nxt_queue_init(&app->spare_ports); 1557 nxt_queue_init(&app->idle_ports); 1558 nxt_queue_init(&app->requests); 1559 nxt_queue_init(&app->pending); 1560 1561 app->name.length = name.length; 1562 nxt_memcpy(app->name.start, name.start, name.length); 1563 1564 app->type = lang->type; 1565 app->max_processes = apcf.max_processes; 1566 app->spare_processes = apcf.spare_processes; 1567 app->max_pending_processes = apcf.spare_processes 1568 ? apcf.spare_processes : 1; 1569 app->timeout = apcf.timeout; 1570 app->res_timeout = apcf.res_timeout * 1000000; 1571 app->idle_timeout = apcf.idle_timeout; 1572 app->max_pending_responses = 2; 1573 app->max_requests = apcf.requests; 1574 1575 engine = task->thread->engine; 1576 1577 app->engine = engine; 1578 1579 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1580 app->adjust_idle_work.task = &engine->task; 1581 app->adjust_idle_work.obj = app; 1582 1583 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1584 1585 nxt_router_app_use(task, app, 1); 1586 1587 app->joint = app_joint; 1588 1589 app_joint->use_count = 1; 1590 app_joint->app = app; 1591 1592 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1593 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1594 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1595 app_joint->idle_timer.task = &engine->task; 1596 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1597 1598 app_joint->free_app_work.handler = nxt_router_free_app; 1599 app_joint->free_app_work.task = &engine->task; 1600 app_joint->free_app_work.obj = app_joint; 1601 } 1602 1603 routes_conf = nxt_conf_get_path(conf, &routes_path); 1604 if (nxt_fast_path(routes_conf != NULL)) { 1605 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1606 if (nxt_slow_path(routes == NULL)) { 1607 return NXT_ERROR; 1608 } 1609 tmcf->router_conf->routes = routes; 1610 } 1611 1612 http = nxt_conf_get_path(conf, &http_path); 1613 #if 0 1614 if (http == NULL) { 1615 nxt_alert(task, "no \"http\" block"); 1616 return NXT_ERROR; 1617 } 1618 #endif 1619 1620 listeners = nxt_conf_get_path(conf, &listeners_path); 1621 if (listeners == NULL) { 1622 nxt_alert(task, "no \"listeners\" block"); 1623 return NXT_ERROR; 1624 } 1625 1626 next = 0; 1627 1628 for ( ;; ) { 1629 listener = nxt_conf_next_object_member(listeners, &name, &next); 1630 if (listener == NULL) { 1631 break; 1632 } 1633 1634 skcf = nxt_router_socket_conf(task, tmcf, &name); 1635 if (skcf == NULL) { 1636 goto fail; 1637 } 1638 1639 nxt_memzero(&lscf, sizeof(lscf)); 1640 1641 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1642 nxt_nitems(nxt_router_listener_conf), &lscf); 1643 if (ret != NXT_OK) { 1644 nxt_alert(task, "listener map error"); 1645 goto fail; 1646 } 1647 1648 nxt_debug(task, "application: %V", &lscf.application); 1649 1650 // STUB, default values if http block is not defined. 1651 skcf->header_buffer_size = 2048; 1652 skcf->large_header_buffer_size = 8192; 1653 skcf->large_header_buffers = 4; 1654 skcf->body_buffer_size = 16 * 1024; 1655 skcf->max_body_size = 8 * 1024 * 1024; 1656 skcf->idle_timeout = 180 * 1000; 1657 skcf->header_read_timeout = 30 * 1000; 1658 skcf->body_read_timeout = 30 * 1000; 1659 skcf->send_timeout = 30 * 1000; 1660 1661 if (http != NULL) { 1662 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1663 nxt_nitems(nxt_router_http_conf), skcf); 1664 if (ret != NXT_OK) { 1665 nxt_alert(task, "http map error"); 1666 goto fail; 1667 } 1668 } 1669 1670 #if (NXT_TLS) 1671 1672 value = nxt_conf_get_path(listener, &certificate_path); 1673 1674 if (value != NULL) { 1675 nxt_conf_get_string(value, &name); 1676 1677 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1678 if (nxt_slow_path(tls == NULL)) { 1679 goto fail; 1680 } 1681 1682 tls->name = name; 1683 tls->conf = skcf; 1684 1685 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1686 } 1687 1688 #endif 1689 1690 skcf->listen->handler = nxt_http_conn_init; 1691 skcf->router_conf = tmcf->router_conf; 1692 skcf->router_conf->count++; 1693 1694 if (lscf.pass.length != 0) { 1695 skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); 1696 1697 /* COMPATIBILITY: listener application. */ 1698 } else if (lscf.application.length > 0) { 1699 skcf->pass = nxt_http_pass_application(task, tmcf, 1700 &lscf.application); 1701 } 1702 } 1703 1704 value = nxt_conf_get_path(conf, &access_log_path); 1705 1706 if (value != NULL) { 1707 nxt_conf_get_string(value, &path); 1708 1709 access_log = router->access_log; 1710 1711 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1712 nxt_thread_spin_lock(&router->lock); 1713 access_log->count++; 1714 nxt_thread_spin_unlock(&router->lock); 1715 1716 } else { 1717 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1718 + path.length); 1719 if (access_log == NULL) { 1720 nxt_alert(task, "failed to allocate access log structure"); 1721 goto fail; 1722 } 1723 1724 access_log->fd = -1; 1725 access_log->handler = &nxt_router_access_log_writer; 1726 access_log->count = 1; 1727 1728 access_log->path.length = path.length; 1729 access_log->path.start = (u_char *) access_log 1730 + sizeof(nxt_router_access_log_t); 1731 1732 nxt_memcpy(access_log->path.start, path.start, path.length); 1733 } 1734 1735 tmcf->router_conf->access_log = access_log; 1736 } 1737 1738 nxt_http_routes_resolve(task, tmcf); 1739 1740 nxt_queue_add(&tmcf->deleting, &router->sockets); 1741 nxt_queue_init(&router->sockets); 1742 1743 return NXT_OK; 1744 1745 app_fail: 1746 1747 nxt_free(app); 1748 1749 fail: 1750 1751 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1752 1753 nxt_queue_remove(&app->link); 1754 nxt_thread_mutex_destroy(&app->mutex); 1755 nxt_free(app); 1756 1757 } nxt_queue_loop; 1758 1759 return NXT_ERROR; 1760 } 1761 1762 1763 static nxt_app_t * 1764 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1765 { 1766 nxt_app_t *app; 1767 1768 nxt_queue_each(app, queue, nxt_app_t, link) { 1769 1770 if (nxt_strstr_eq(name, &app->name)) { 1771 return app; 1772 } 1773 1774 } nxt_queue_loop; 1775 1776 return NULL; 1777 } 1778 1779 1780 nxt_app_t * 1781 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1782 { 1783 nxt_app_t *app; 1784 1785 app = nxt_router_app_find(&tmcf->apps, name); 1786 1787 if (app == NULL) { 1788 app = nxt_router_app_find(&tmcf->previous, name); 1789 } 1790 1791 return app; 1792 } 1793 1794 1795 static nxt_socket_conf_t * 1796 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1797 nxt_str_t *name) 1798 { 1799 size_t size; 1800 nxt_int_t ret; 1801 nxt_bool_t wildcard; 1802 nxt_sockaddr_t *sa; 1803 nxt_socket_conf_t *skcf; 1804 nxt_listen_socket_t *ls; 1805 1806 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1807 if (nxt_slow_path(sa == NULL)) { 1808 nxt_alert(task, "invalid listener \"%V\"", name); 1809 return NULL; 1810 } 1811 1812 sa->type = SOCK_STREAM; 1813 1814 nxt_debug(task, "router listener: \"%*s\"", 1815 (size_t) sa->length, nxt_sockaddr_start(sa)); 1816 1817 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 1818 if (nxt_slow_path(skcf == NULL)) { 1819 return NULL; 1820 } 1821 1822 size = nxt_sockaddr_size(sa); 1823 1824 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1825 1826 if (ret != NXT_OK) { 1827 1828 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1829 if (nxt_slow_path(ls == NULL)) { 1830 return NULL; 1831 } 1832 1833 skcf->listen = ls; 1834 1835 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1836 nxt_memcpy(ls->sockaddr, sa, size); 1837 1838 nxt_listen_socket_remote_size(ls); 1839 1840 ls->socket = -1; 1841 ls->backlog = NXT_LISTEN_BACKLOG; 1842 ls->flags = NXT_NONBLOCK; 1843 ls->read_after_accept = 1; 1844 } 1845 1846 switch (sa->u.sockaddr.sa_family) { 1847 #if (NXT_HAVE_UNIX_DOMAIN) 1848 case AF_UNIX: 1849 wildcard = 0; 1850 break; 1851 #endif 1852 #if (NXT_INET6) 1853 case AF_INET6: 1854 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1855 break; 1856 #endif 1857 case AF_INET: 1858 default: 1859 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1860 break; 1861 } 1862 1863 if (!wildcard) { 1864 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 1865 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1866 return NULL; 1867 } 1868 1869 nxt_memcpy(skcf->sockaddr, sa, size); 1870 } 1871 1872 return skcf; 1873 } 1874 1875 1876 static nxt_int_t 1877 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1878 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1879 { 1880 nxt_router_t *router; 1881 nxt_queue_link_t *qlk; 1882 nxt_socket_conf_t *skcf; 1883 1884 router = tmcf->router_conf->router; 1885 1886 for (qlk = nxt_queue_first(&router->sockets); 1887 qlk != nxt_queue_tail(&router->sockets); 1888 qlk = nxt_queue_next(qlk)) 1889 { 1890 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1891 1892 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1893 nskcf->listen = skcf->listen; 1894 1895 nxt_queue_remove(qlk); 1896 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1897 1898 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1899 1900 return NXT_OK; 1901 } 1902 } 1903 1904 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1905 1906 return NXT_DECLINED; 1907 } 1908 1909 1910 static void 1911 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1912 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1913 { 1914 size_t size; 1915 uint32_t stream; 1916 nxt_int_t ret; 1917 nxt_buf_t *b; 1918 nxt_port_t *main_port, *router_port; 1919 nxt_runtime_t *rt; 1920 nxt_socket_rpc_t *rpc; 1921 1922 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1923 if (rpc == NULL) { 1924 goto fail; 1925 } 1926 1927 rpc->socket_conf = skcf; 1928 rpc->temp_conf = tmcf; 1929 1930 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1931 1932 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1933 if (b == NULL) { 1934 goto fail; 1935 } 1936 1937 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1938 1939 rt = task->thread->runtime; 1940 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1941 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1942 1943 stream = nxt_port_rpc_register_handler(task, router_port, 1944 nxt_router_listen_socket_ready, 1945 nxt_router_listen_socket_error, 1946 main_port->pid, rpc); 1947 if (nxt_slow_path(stream == 0)) { 1948 goto fail; 1949 } 1950 1951 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1952 stream, router_port->id, b); 1953 1954 if (nxt_slow_path(ret != NXT_OK)) { 1955 nxt_port_rpc_cancel(task, router_port, stream); 1956 goto fail; 1957 } 1958 1959 return; 1960 1961 fail: 1962 1963 nxt_router_conf_error(task, tmcf); 1964 } 1965 1966 1967 static void 1968 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1969 void *data) 1970 { 1971 nxt_int_t ret; 1972 nxt_socket_t s; 1973 nxt_socket_rpc_t *rpc; 1974 1975 rpc = data; 1976 1977 s = msg->fd; 1978 1979 ret = nxt_socket_nonblocking(task, s); 1980 if (nxt_slow_path(ret != NXT_OK)) { 1981 goto fail; 1982 } 1983 1984 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 1985 1986 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 1987 if (nxt_slow_path(ret != NXT_OK)) { 1988 goto fail; 1989 } 1990 1991 rpc->socket_conf->listen->socket = s; 1992 1993 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1994 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 1995 1996 return; 1997 1998 fail: 1999 2000 nxt_socket_close(task, s); 2001 2002 nxt_router_conf_error(task, rpc->temp_conf); 2003 } 2004 2005 2006 static void 2007 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2008 void *data) 2009 { 2010 nxt_socket_rpc_t *rpc; 2011 nxt_router_temp_conf_t *tmcf; 2012 2013 rpc = data; 2014 tmcf = rpc->temp_conf; 2015 2016 #if 0 2017 u_char *p; 2018 size_t size; 2019 uint8_t error; 2020 nxt_buf_t *in, *out; 2021 nxt_sockaddr_t *sa; 2022 2023 static nxt_str_t socket_errors[] = { 2024 nxt_string("ListenerSystem"), 2025 nxt_string("ListenerNoIPv6"), 2026 nxt_string("ListenerPort"), 2027 nxt_string("ListenerInUse"), 2028 nxt_string("ListenerNoAddress"), 2029 nxt_string("ListenerNoAccess"), 2030 nxt_string("ListenerPath"), 2031 }; 2032 2033 sa = rpc->socket_conf->listen->sockaddr; 2034 2035 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2036 2037 if (nxt_slow_path(in == NULL)) { 2038 return; 2039 } 2040 2041 p = in->mem.pos; 2042 2043 error = *p++; 2044 2045 size = nxt_length("listen socket error: ") 2046 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2047 + sa->length + socket_errors[error].length + (in->mem.free - p); 2048 2049 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2050 if (nxt_slow_path(out == NULL)) { 2051 return; 2052 } 2053 2054 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2055 "listen socket error: " 2056 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2057 (size_t) sa->length, nxt_sockaddr_start(sa), 2058 &socket_errors[error], in->mem.free - p, p); 2059 2060 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2061 #endif 2062 2063 nxt_router_conf_error(task, tmcf); 2064 } 2065 2066 2067 #if (NXT_TLS) 2068 2069 static void 2070 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2071 nxt_router_tlssock_t *tls) 2072 { 2073 nxt_socket_rpc_t *rpc; 2074 2075 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2076 if (rpc == NULL) { 2077 nxt_router_conf_error(task, tmcf); 2078 return; 2079 } 2080 2081 rpc->socket_conf = tls->conf; 2082 rpc->temp_conf = tmcf; 2083 2084 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2085 nxt_router_tls_rpc_handler, rpc); 2086 } 2087 2088 2089 static void 2090 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2091 void *data) 2092 { 2093 nxt_mp_t *mp; 2094 nxt_int_t ret; 2095 nxt_tls_conf_t *tlscf; 2096 nxt_socket_rpc_t *rpc; 2097 nxt_router_temp_conf_t *tmcf; 2098 2099 nxt_debug(task, "tls rpc handler"); 2100 2101 rpc = data; 2102 tmcf = rpc->temp_conf; 2103 2104 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2105 goto fail; 2106 } 2107 2108 mp = tmcf->router_conf->mem_pool; 2109 2110 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2111 if (nxt_slow_path(tlscf == NULL)) { 2112 goto fail; 2113 } 2114 2115 tlscf->chain_file = msg->fd; 2116 2117 ret = task->thread->runtime->tls->server_init(task, tlscf); 2118 if (nxt_slow_path(ret != NXT_OK)) { 2119 goto fail; 2120 } 2121 2122 rpc->socket_conf->tls = tlscf; 2123 2124 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2125 nxt_router_conf_apply, task, tmcf, NULL); 2126 return; 2127 2128 fail: 2129 2130 nxt_router_conf_error(task, tmcf); 2131 } 2132 2133 #endif 2134 2135 2136 static void 2137 nxt_router_app_rpc_create(nxt_task_t *task, 2138 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2139 { 2140 size_t size; 2141 uint32_t stream; 2142 nxt_int_t ret; 2143 nxt_buf_t *b; 2144 nxt_port_t *main_port, *router_port; 2145 nxt_runtime_t *rt; 2146 nxt_app_rpc_t *rpc; 2147 2148 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2149 if (rpc == NULL) { 2150 goto fail; 2151 } 2152 2153 rpc->app = app; 2154 rpc->temp_conf = tmcf; 2155 2156 nxt_debug(task, "app '%V' prefork", &app->name); 2157 2158 size = app->name.length + 1 + app->conf.length; 2159 2160 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2161 if (nxt_slow_path(b == NULL)) { 2162 goto fail; 2163 } 2164 2165 nxt_buf_cpystr(b, &app->name); 2166 *b->mem.free++ = '\0'; 2167 nxt_buf_cpystr(b, &app->conf); 2168 2169 rt = task->thread->runtime; 2170 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2171 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2172 2173 stream = nxt_port_rpc_register_handler(task, router_port, 2174 nxt_router_app_prefork_ready, 2175 nxt_router_app_prefork_error, 2176 -1, rpc); 2177 if (nxt_slow_path(stream == 0)) { 2178 goto fail; 2179 } 2180 2181 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 2182 stream, router_port->id, b); 2183 2184 if (nxt_slow_path(ret != NXT_OK)) { 2185 nxt_port_rpc_cancel(task, router_port, stream); 2186 goto fail; 2187 } 2188 2189 app->pending_processes++; 2190 2191 return; 2192 2193 fail: 2194 2195 nxt_router_conf_error(task, tmcf); 2196 } 2197 2198 2199 static void 2200 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2201 void *data) 2202 { 2203 nxt_app_t *app; 2204 nxt_port_t *port; 2205 nxt_app_rpc_t *rpc; 2206 nxt_event_engine_t *engine; 2207 2208 rpc = data; 2209 app = rpc->app; 2210 2211 port = msg->u.new_port; 2212 port->app = app; 2213 2214 app->pending_processes--; 2215 app->processes++; 2216 app->idle_processes++; 2217 2218 engine = task->thread->engine; 2219 2220 nxt_queue_insert_tail(&app->ports, &port->app_link); 2221 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2222 2223 port->idle_start = 0; 2224 2225 nxt_port_inc_use(port); 2226 2227 nxt_work_queue_add(&engine->fast_work_queue, 2228 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2229 } 2230 2231 2232 static void 2233 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2234 void *data) 2235 { 2236 nxt_app_t *app; 2237 nxt_app_rpc_t *rpc; 2238 nxt_router_temp_conf_t *tmcf; 2239 2240 rpc = data; 2241 app = rpc->app; 2242 tmcf = rpc->temp_conf; 2243 2244 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2245 &app->name); 2246 2247 app->pending_processes--; 2248 2249 nxt_router_conf_error(task, tmcf); 2250 } 2251 2252 2253 static nxt_int_t 2254 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2255 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2256 { 2257 nxt_int_t ret; 2258 nxt_uint_t n, threads; 2259 nxt_queue_link_t *qlk; 2260 nxt_router_engine_conf_t *recf; 2261 2262 threads = tmcf->router_conf->threads; 2263 2264 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2265 sizeof(nxt_router_engine_conf_t)); 2266 if (nxt_slow_path(tmcf->engines == NULL)) { 2267 return NXT_ERROR; 2268 } 2269 2270 n = 0; 2271 2272 for (qlk = nxt_queue_first(&router->engines); 2273 qlk != nxt_queue_tail(&router->engines); 2274 qlk = nxt_queue_next(qlk)) 2275 { 2276 recf = nxt_array_zero_add(tmcf->engines); 2277 if (nxt_slow_path(recf == NULL)) { 2278 return NXT_ERROR; 2279 } 2280 2281 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2282 2283 if (n < threads) { 2284 recf->action = NXT_ROUTER_ENGINE_KEEP; 2285 ret = nxt_router_engine_conf_update(tmcf, recf); 2286 2287 } else { 2288 recf->action = NXT_ROUTER_ENGINE_DELETE; 2289 ret = nxt_router_engine_conf_delete(tmcf, recf); 2290 } 2291 2292 if (nxt_slow_path(ret != NXT_OK)) { 2293 return ret; 2294 } 2295 2296 n++; 2297 } 2298 2299 tmcf->new_threads = n; 2300 2301 while (n < threads) { 2302 recf = nxt_array_zero_add(tmcf->engines); 2303 if (nxt_slow_path(recf == NULL)) { 2304 return NXT_ERROR; 2305 } 2306 2307 recf->action = NXT_ROUTER_ENGINE_ADD; 2308 2309 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2310 if (nxt_slow_path(recf->engine == NULL)) { 2311 return NXT_ERROR; 2312 } 2313 2314 ret = nxt_router_engine_conf_create(tmcf, recf); 2315 if (nxt_slow_path(ret != NXT_OK)) { 2316 return ret; 2317 } 2318 2319 n++; 2320 } 2321 2322 return NXT_OK; 2323 } 2324 2325 2326 static nxt_int_t 2327 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2328 nxt_router_engine_conf_t *recf) 2329 { 2330 nxt_int_t ret; 2331 2332 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2333 nxt_router_listen_socket_create); 2334 if (nxt_slow_path(ret != NXT_OK)) { 2335 return ret; 2336 } 2337 2338 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2339 nxt_router_listen_socket_create); 2340 if (nxt_slow_path(ret != NXT_OK)) { 2341 return ret; 2342 } 2343 2344 return ret; 2345 } 2346 2347 2348 static nxt_int_t 2349 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2350 nxt_router_engine_conf_t *recf) 2351 { 2352 nxt_int_t ret; 2353 2354 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2355 nxt_router_listen_socket_create); 2356 if (nxt_slow_path(ret != NXT_OK)) { 2357 return ret; 2358 } 2359 2360 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2361 nxt_router_listen_socket_update); 2362 if (nxt_slow_path(ret != NXT_OK)) { 2363 return ret; 2364 } 2365 2366 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2367 if (nxt_slow_path(ret != NXT_OK)) { 2368 return ret; 2369 } 2370 2371 return ret; 2372 } 2373 2374 2375 static nxt_int_t 2376 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2377 nxt_router_engine_conf_t *recf) 2378 { 2379 nxt_int_t ret; 2380 2381 ret = nxt_router_engine_quit(tmcf, recf); 2382 if (nxt_slow_path(ret != NXT_OK)) { 2383 return ret; 2384 } 2385 2386 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2387 if (nxt_slow_path(ret != NXT_OK)) { 2388 return ret; 2389 } 2390 2391 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2392 } 2393 2394 2395 static nxt_int_t 2396 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2397 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2398 nxt_work_handler_t handler) 2399 { 2400 nxt_joint_job_t *job; 2401 nxt_queue_link_t *qlk; 2402 nxt_socket_conf_t *skcf; 2403 nxt_socket_conf_joint_t *joint; 2404 2405 for (qlk = nxt_queue_first(sockets); 2406 qlk != nxt_queue_tail(sockets); 2407 qlk = nxt_queue_next(qlk)) 2408 { 2409 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2410 if (nxt_slow_path(job == NULL)) { 2411 return NXT_ERROR; 2412 } 2413 2414 job->work.next = recf->jobs; 2415 recf->jobs = &job->work; 2416 2417 job->task = tmcf->engine->task; 2418 job->work.handler = handler; 2419 job->work.task = &job->task; 2420 job->work.obj = job; 2421 job->tmcf = tmcf; 2422 2423 tmcf->count++; 2424 2425 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2426 sizeof(nxt_socket_conf_joint_t)); 2427 if (nxt_slow_path(joint == NULL)) { 2428 return NXT_ERROR; 2429 } 2430 2431 job->work.data = joint; 2432 2433 joint->count = 1; 2434 2435 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2436 skcf->count++; 2437 joint->socket_conf = skcf; 2438 2439 joint->engine = recf->engine; 2440 } 2441 2442 return NXT_OK; 2443 } 2444 2445 2446 static nxt_int_t 2447 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2448 nxt_router_engine_conf_t *recf) 2449 { 2450 nxt_joint_job_t *job; 2451 2452 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2453 if (nxt_slow_path(job == NULL)) { 2454 return NXT_ERROR; 2455 } 2456 2457 job->work.next = recf->jobs; 2458 recf->jobs = &job->work; 2459 2460 job->task = tmcf->engine->task; 2461 job->work.handler = nxt_router_worker_thread_quit; 2462 job->work.task = &job->task; 2463 job->work.obj = NULL; 2464 job->work.data = NULL; 2465 job->tmcf = NULL; 2466 2467 return NXT_OK; 2468 } 2469 2470 2471 static nxt_int_t 2472 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2473 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2474 { 2475 nxt_joint_job_t *job; 2476 nxt_queue_link_t *qlk; 2477 2478 for (qlk = nxt_queue_first(sockets); 2479 qlk != nxt_queue_tail(sockets); 2480 qlk = nxt_queue_next(qlk)) 2481 { 2482 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2483 if (nxt_slow_path(job == NULL)) { 2484 return NXT_ERROR; 2485 } 2486 2487 job->work.next = recf->jobs; 2488 recf->jobs = &job->work; 2489 2490 job->task = tmcf->engine->task; 2491 job->work.handler = nxt_router_listen_socket_delete; 2492 job->work.task = &job->task; 2493 job->work.obj = job; 2494 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2495 job->tmcf = tmcf; 2496 2497 tmcf->count++; 2498 } 2499 2500 return NXT_OK; 2501 } 2502 2503 2504 static nxt_int_t 2505 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2506 nxt_router_temp_conf_t *tmcf) 2507 { 2508 nxt_int_t ret; 2509 nxt_uint_t i, threads; 2510 nxt_router_engine_conf_t *recf; 2511 2512 recf = tmcf->engines->elts; 2513 threads = tmcf->router_conf->threads; 2514 2515 for (i = tmcf->new_threads; i < threads; i++) { 2516 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2517 if (nxt_slow_path(ret != NXT_OK)) { 2518 return ret; 2519 } 2520 } 2521 2522 return NXT_OK; 2523 } 2524 2525 2526 static nxt_int_t 2527 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2528 nxt_event_engine_t *engine) 2529 { 2530 nxt_int_t ret; 2531 nxt_thread_link_t *link; 2532 nxt_thread_handle_t handle; 2533 2534 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2535 2536 if (nxt_slow_path(link == NULL)) { 2537 return NXT_ERROR; 2538 } 2539 2540 link->start = nxt_router_thread_start; 2541 link->engine = engine; 2542 link->work.handler = nxt_router_thread_exit_handler; 2543 link->work.task = task; 2544 link->work.data = link; 2545 2546 nxt_queue_insert_tail(&rt->engines, &engine->link); 2547 2548 ret = nxt_thread_create(&handle, link); 2549 2550 if (nxt_slow_path(ret != NXT_OK)) { 2551 nxt_queue_remove(&engine->link); 2552 } 2553 2554 return ret; 2555 } 2556 2557 2558 static void 2559 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2560 nxt_router_temp_conf_t *tmcf) 2561 { 2562 nxt_app_t *app; 2563 2564 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2565 2566 nxt_router_app_unlink(task, app); 2567 2568 } nxt_queue_loop; 2569 2570 nxt_queue_add(&router->apps, &tmcf->previous); 2571 nxt_queue_add(&router->apps, &tmcf->apps); 2572 } 2573 2574 2575 static void 2576 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2577 { 2578 nxt_uint_t n; 2579 nxt_event_engine_t *engine; 2580 nxt_router_engine_conf_t *recf; 2581 2582 recf = tmcf->engines->elts; 2583 2584 for (n = tmcf->engines->nelts; n != 0; n--) { 2585 engine = recf->engine; 2586 2587 switch (recf->action) { 2588 2589 case NXT_ROUTER_ENGINE_KEEP: 2590 break; 2591 2592 case NXT_ROUTER_ENGINE_ADD: 2593 nxt_queue_insert_tail(&router->engines, &engine->link0); 2594 break; 2595 2596 case NXT_ROUTER_ENGINE_DELETE: 2597 nxt_queue_remove(&engine->link0); 2598 break; 2599 } 2600 2601 nxt_router_engine_post(engine, recf->jobs); 2602 2603 recf++; 2604 } 2605 } 2606 2607 2608 static void 2609 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2610 { 2611 nxt_work_t *work, *next; 2612 2613 for (work = jobs; work != NULL; work = next) { 2614 next = work->next; 2615 work->next = NULL; 2616 2617 nxt_event_engine_post(engine, work); 2618 } 2619 } 2620 2621 2622 static nxt_port_handlers_t nxt_router_app_port_handlers = { 2623 .rpc_error = nxt_port_rpc_handler, 2624 .mmap = nxt_port_mmap_handler, 2625 .data = nxt_port_rpc_handler, 2626 }; 2627 2628 2629 static void 2630 nxt_router_thread_start(void *data) 2631 { 2632 nxt_int_t ret; 2633 nxt_port_t *port; 2634 nxt_task_t *task; 2635 nxt_thread_t *thread; 2636 nxt_thread_link_t *link; 2637 nxt_event_engine_t *engine; 2638 2639 link = data; 2640 engine = link->engine; 2641 task = &engine->task; 2642 2643 thread = nxt_thread(); 2644 2645 nxt_event_engine_thread_adopt(engine); 2646 2647 /* STUB */ 2648 thread->runtime = engine->task.thread->runtime; 2649 2650 engine->task.thread = thread; 2651 engine->task.log = thread->log; 2652 thread->engine = engine; 2653 thread->task = &engine->task; 2654 #if 0 2655 thread->fiber = &engine->fibers->fiber; 2656 #endif 2657 2658 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2659 if (nxt_slow_path(engine->mem_pool == NULL)) { 2660 return; 2661 } 2662 2663 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2664 NXT_PROCESS_ROUTER); 2665 if (nxt_slow_path(port == NULL)) { 2666 return; 2667 } 2668 2669 ret = nxt_port_socket_init(task, port, 0); 2670 if (nxt_slow_path(ret != NXT_OK)) { 2671 nxt_port_use(task, port, -1); 2672 return; 2673 } 2674 2675 engine->port = port; 2676 2677 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2678 2679 nxt_event_engine_start(engine); 2680 } 2681 2682 2683 static void 2684 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2685 { 2686 nxt_joint_job_t *job; 2687 nxt_socket_conf_t *skcf; 2688 nxt_listen_event_t *lev; 2689 nxt_listen_socket_t *ls; 2690 nxt_thread_spinlock_t *lock; 2691 nxt_socket_conf_joint_t *joint; 2692 2693 job = obj; 2694 joint = data; 2695 2696 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2697 2698 skcf = joint->socket_conf; 2699 ls = skcf->listen; 2700 2701 lev = nxt_listen_event(task, ls); 2702 if (nxt_slow_path(lev == NULL)) { 2703 nxt_router_listen_socket_release(task, skcf); 2704 return; 2705 } 2706 2707 lev->socket.data = joint; 2708 2709 lock = &skcf->router_conf->router->lock; 2710 2711 nxt_thread_spin_lock(lock); 2712 ls->count++; 2713 nxt_thread_spin_unlock(lock); 2714 2715 job->work.next = NULL; 2716 job->work.handler = nxt_router_conf_wait; 2717 2718 nxt_event_engine_post(job->tmcf->engine, &job->work); 2719 } 2720 2721 2722 nxt_inline nxt_listen_event_t * 2723 nxt_router_listen_event(nxt_queue_t *listen_connections, 2724 nxt_socket_conf_t *skcf) 2725 { 2726 nxt_socket_t fd; 2727 nxt_queue_link_t *qlk; 2728 nxt_listen_event_t *lev; 2729 2730 fd = skcf->listen->socket; 2731 2732 for (qlk = nxt_queue_first(listen_connections); 2733 qlk != nxt_queue_tail(listen_connections); 2734 qlk = nxt_queue_next(qlk)) 2735 { 2736 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2737 2738 if (fd == lev->socket.fd) { 2739 return lev; 2740 } 2741 } 2742 2743 return NULL; 2744 } 2745 2746 2747 static void 2748 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2749 { 2750 nxt_joint_job_t *job; 2751 nxt_event_engine_t *engine; 2752 nxt_listen_event_t *lev; 2753 nxt_socket_conf_joint_t *joint, *old; 2754 2755 job = obj; 2756 joint = data; 2757 2758 engine = task->thread->engine; 2759 2760 nxt_queue_insert_tail(&engine->joints, &joint->link); 2761 2762 lev = nxt_router_listen_event(&engine->listen_connections, 2763 joint->socket_conf); 2764 2765 old = lev->socket.data; 2766 lev->socket.data = joint; 2767 lev->listen = joint->socket_conf->listen; 2768 2769 job->work.next = NULL; 2770 job->work.handler = nxt_router_conf_wait; 2771 2772 nxt_event_engine_post(job->tmcf->engine, &job->work); 2773 2774 /* 2775 * The task is allocated from configuration temporary 2776 * memory pool so it can be freed after engine post operation. 2777 */ 2778 2779 nxt_router_conf_release(&engine->task, old); 2780 } 2781 2782 2783 static void 2784 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2785 { 2786 nxt_joint_job_t *job; 2787 nxt_socket_conf_t *skcf; 2788 nxt_listen_event_t *lev; 2789 nxt_event_engine_t *engine; 2790 2791 job = obj; 2792 skcf = data; 2793 2794 engine = task->thread->engine; 2795 2796 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2797 2798 nxt_fd_event_delete(engine, &lev->socket); 2799 2800 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2801 lev->socket.fd); 2802 2803 lev->timer.handler = nxt_router_listen_socket_close; 2804 lev->timer.work_queue = &engine->fast_work_queue; 2805 2806 nxt_timer_add(engine, &lev->timer, 0); 2807 2808 job->work.next = NULL; 2809 job->work.handler = nxt_router_conf_wait; 2810 2811 nxt_event_engine_post(job->tmcf->engine, &job->work); 2812 } 2813 2814 2815 static void 2816 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2817 { 2818 nxt_event_engine_t *engine; 2819 2820 nxt_debug(task, "router worker thread quit"); 2821 2822 engine = task->thread->engine; 2823 2824 engine->shutdown = 1; 2825 2826 if (nxt_queue_is_empty(&engine->joints)) { 2827 nxt_thread_exit(task->thread); 2828 } 2829 } 2830 2831 2832 static void 2833 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2834 { 2835 nxt_timer_t *timer; 2836 nxt_listen_event_t *lev; 2837 nxt_socket_conf_joint_t *joint; 2838 2839 timer = obj; 2840 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2841 2842 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2843 lev->socket.fd); 2844 2845 nxt_queue_remove(&lev->link); 2846 2847 joint = lev->socket.data; 2848 lev->socket.data = NULL; 2849 2850 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2851 task = &task->thread->engine->task; 2852 2853 nxt_router_listen_socket_release(task, joint->socket_conf); 2854 2855 nxt_router_listen_event_release(task, lev, joint); 2856 } 2857 2858 2859 static void 2860 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2861 { 2862 nxt_listen_socket_t *ls; 2863 nxt_thread_spinlock_t *lock; 2864 2865 ls = skcf->listen; 2866 lock = &skcf->router_conf->router->lock; 2867 2868 nxt_thread_spin_lock(lock); 2869 2870 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2871 task->thread->engine, ls->count); 2872 2873 if (--ls->count != 0) { 2874 ls = NULL; 2875 } 2876 2877 nxt_thread_spin_unlock(lock); 2878 2879 if (ls != NULL) { 2880 nxt_socket_close(task, ls->socket); 2881 nxt_free(ls); 2882 } 2883 } 2884 2885 2886 void 2887 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 2888 nxt_socket_conf_joint_t *joint) 2889 { 2890 nxt_event_engine_t *engine; 2891 2892 nxt_debug(task, "listen event count: %D", lev->count); 2893 2894 if (--lev->count == 0) { 2895 nxt_free(lev); 2896 } 2897 2898 if (joint != NULL) { 2899 nxt_router_conf_release(task, joint); 2900 } 2901 2902 engine = task->thread->engine; 2903 2904 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 2905 nxt_thread_exit(task->thread); 2906 } 2907 } 2908 2909 2910 void 2911 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2912 { 2913 nxt_socket_conf_t *skcf; 2914 nxt_router_conf_t *rtcf; 2915 nxt_thread_spinlock_t *lock; 2916 2917 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2918 2919 if (--joint->count != 0) { 2920 return; 2921 } 2922 2923 nxt_queue_remove(&joint->link); 2924 2925 /* 2926 * The joint content can not be safely used after the critical 2927 * section protected by the spinlock because its memory pool may 2928 * be already destroyed by another thread. 2929 */ 2930 skcf = joint->socket_conf; 2931 rtcf = skcf->router_conf; 2932 lock = &rtcf->router->lock; 2933 2934 nxt_thread_spin_lock(lock); 2935 2936 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2937 rtcf, rtcf->count); 2938 2939 if (--skcf->count != 0) { 2940 skcf = NULL; 2941 rtcf = NULL; 2942 2943 } else { 2944 nxt_queue_remove(&skcf->link); 2945 2946 if (--rtcf->count != 0) { 2947 rtcf = NULL; 2948 } 2949 } 2950 2951 nxt_thread_spin_unlock(lock); 2952 2953 if (skcf != NULL) { 2954 if (skcf->pass != NULL) { 2955 nxt_http_pass_cleanup(task, skcf->pass); 2956 } 2957 2958 #if (NXT_TLS) 2959 if (skcf->tls != NULL) { 2960 task->thread->runtime->tls->server_free(task, skcf->tls); 2961 } 2962 #endif 2963 } 2964 2965 /* TODO remove engine->port */ 2966 /* TODO excude from connected ports */ 2967 2968 if (rtcf != NULL) { 2969 nxt_debug(task, "old router conf is destroyed"); 2970 2971 nxt_http_routes_cleanup(task, rtcf->routes); 2972 2973 nxt_router_access_log_release(task, lock, rtcf->access_log); 2974 2975 nxt_mp_thread_adopt(rtcf->mem_pool); 2976 2977 nxt_mp_destroy(rtcf->mem_pool); 2978 } 2979 } 2980 2981 2982 static void 2983 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 2984 nxt_router_access_log_t *access_log) 2985 { 2986 size_t size; 2987 u_char *buf, *p; 2988 nxt_off_t bytes; 2989 2990 static nxt_time_string_t date_cache = { 2991 (nxt_atomic_uint_t) -1, 2992 nxt_router_access_log_date, 2993 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 2994 nxt_length("31/Dec/1986:19:40:00 +0300"), 2995 NXT_THREAD_TIME_LOCAL, 2996 NXT_THREAD_TIME_SEC, 2997 }; 2998 2999 size = r->remote->address_length 3000 + 6 /* ' - - [' */ 3001 + date_cache.size 3002 + 3 /* '] "' */ 3003 + r->method->length 3004 + 1 /* space */ 3005 + r->target.length 3006 + 1 /* space */ 3007 + r->version.length 3008 + 2 /* '" ' */ 3009 + 3 /* status */ 3010 + 1 /* space */ 3011 + NXT_OFF_T_LEN 3012 + 2 /* ' "' */ 3013 + (r->referer != NULL ? r->referer->value_length : 1) 3014 + 3 /* '" "' */ 3015 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3016 + 2 /* '"\n' */ 3017 ; 3018 3019 buf = nxt_mp_nget(r->mem_pool, size); 3020 if (nxt_slow_path(buf == NULL)) { 3021 return; 3022 } 3023 3024 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3025 r->remote->address_length); 3026 3027 p = nxt_cpymem(p, " - - [", 6); 3028 3029 p = nxt_thread_time_string(task->thread, &date_cache, p); 3030 3031 p = nxt_cpymem(p, "] \"", 3); 3032 3033 if (r->method->length != 0) { 3034 p = nxt_cpymem(p, r->method->start, r->method->length); 3035 3036 if (r->target.length != 0) { 3037 *p++ = ' '; 3038 p = nxt_cpymem(p, r->target.start, r->target.length); 3039 3040 if (r->version.length != 0) { 3041 *p++ = ' '; 3042 p = nxt_cpymem(p, r->version.start, r->version.length); 3043 } 3044 } 3045 3046 } else { 3047 *p++ = '-'; 3048 } 3049 3050 p = nxt_cpymem(p, "\" ", 2); 3051 3052 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3053 3054 *p++ = ' '; 3055 3056 bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto); 3057 3058 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3059 3060 p = nxt_cpymem(p, " \"", 2); 3061 3062 if (r->referer != NULL) { 3063 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3064 3065 } else { 3066 *p++ = '-'; 3067 } 3068 3069 p = nxt_cpymem(p, "\" \"", 3); 3070 3071 if (r->user_agent != NULL) { 3072 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3073 3074 } else { 3075 *p++ = '-'; 3076 } 3077 3078 p = nxt_cpymem(p, "\"\n", 2); 3079 3080 nxt_fd_write(access_log->fd, buf, p - buf); 3081 } 3082 3083 3084 static u_char * 3085 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3086 size_t size, const char *format) 3087 { 3088 u_char sign; 3089 time_t gmtoff; 3090 3091 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3092 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3093 3094 gmtoff = nxt_timezone(tm) / 60; 3095 3096 if (gmtoff < 0) { 3097 gmtoff = -gmtoff; 3098 sign = '-'; 3099 3100 } else { 3101 sign = '+'; 3102 } 3103 3104 return nxt_sprintf(buf, buf + size, format, 3105 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3106 tm->tm_hour, tm->tm_min, tm->tm_sec, 3107 sign, gmtoff / 60, gmtoff % 60); 3108 } 3109 3110 3111 static void 3112 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3113 { 3114 uint32_t stream; 3115 nxt_int_t ret; 3116 nxt_buf_t *b; 3117 nxt_port_t *main_port, *router_port; 3118 nxt_runtime_t *rt; 3119 nxt_router_access_log_t *access_log; 3120 3121 access_log = tmcf->router_conf->access_log; 3122 3123 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3124 if (nxt_slow_path(b == NULL)) { 3125 goto fail; 3126 } 3127 3128 nxt_buf_cpystr(b, &access_log->path); 3129 *b->mem.free++ = '\0'; 3130 3131 rt = task->thread->runtime; 3132 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3133 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3134 3135 stream = nxt_port_rpc_register_handler(task, router_port, 3136 nxt_router_access_log_ready, 3137 nxt_router_access_log_error, 3138 -1, tmcf); 3139 if (nxt_slow_path(stream == 0)) { 3140 goto fail; 3141 } 3142 3143 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3144 stream, router_port->id, b); 3145 3146 if (nxt_slow_path(ret != NXT_OK)) { 3147 nxt_port_rpc_cancel(task, router_port, stream); 3148 goto fail; 3149 } 3150 3151 return; 3152 3153 fail: 3154 3155 nxt_router_conf_error(task, tmcf); 3156 } 3157 3158 3159 static void 3160 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3161 void *data) 3162 { 3163 nxt_router_temp_conf_t *tmcf; 3164 nxt_router_access_log_t *access_log; 3165 3166 tmcf = data; 3167 3168 access_log = tmcf->router_conf->access_log; 3169 3170 access_log->fd = msg->fd; 3171 3172 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3173 nxt_router_conf_apply, task, tmcf, NULL); 3174 } 3175 3176 3177 static void 3178 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *