1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 19 typedef struct { 20 nxt_str_t type; 21 uint32_t processes; 22 uint32_t max_processes; 23 uint32_t spare_processes; 24 nxt_msec_t timeout; 25 nxt_msec_t res_timeout; 26 nxt_msec_t idle_timeout; 27 uint32_t requests; 28 nxt_conf_value_t *limits_value; 29 nxt_conf_value_t *processes_value; 30 } nxt_router_app_conf_t; 31 32 33 typedef struct { 34 nxt_str_t pass; 35 nxt_str_t application; 36 } nxt_router_listener_conf_t; 37 38 39 #if (NXT_TLS) 40 41 typedef struct { 42 nxt_str_t name; 43 nxt_socket_conf_t *conf; 44 45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 46 } nxt_router_tlssock_t; 47 48 #endif 49 50 51 typedef struct { 52 nxt_socket_conf_t *socket_conf; 53 nxt_router_temp_conf_t *temp_conf; 54 } nxt_socket_rpc_t; 55 56 57 typedef struct { 58 nxt_app_t *app; 59 nxt_router_temp_conf_t *temp_conf; 60 } nxt_app_rpc_t; 61 62 63 struct nxt_port_select_state_s { 64 nxt_app_t *app; 65 nxt_request_app_link_t *req_app_link; 66 67 nxt_port_t *failed_port; 68 int failed_port_use_delta; 69 70 uint8_t start_process; /* 1 bit */ 71 nxt_request_app_link_t *shared_ra; 72 nxt_port_t *port; 73 }; 74 75 typedef struct nxt_port_select_state_s nxt_port_select_state_t; 76 77 static void nxt_router_greet_controller(nxt_task_t *task, 78 nxt_port_t *controller_port); 79 80 static void nxt_router_port_select(nxt_task_t *task, 81 nxt_port_select_state_t *state); 82 83 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 84 nxt_port_select_state_t *state); 85 86 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 87 static void nxt_request_app_link_update_peer(nxt_task_t *task, 88 nxt_request_app_link_t *req_app_link); 89 90 91 nxt_inline void 92 nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) 93 { 94 nxt_atomic_fetch_add(&req_app_link->use_count, 1); 95 } 96 97 nxt_inline void 98 nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) 99 { 100 #if (NXT_DEBUG) 101 int c; 102 103 c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); 104 105 nxt_assert(c > 1); 106 #else 107 (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); 108 #endif 109 } 110 111 static void nxt_request_app_link_use(nxt_task_t *task, 112 nxt_request_app_link_t *req_app_link, int i); 113 114 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 115 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 116 static void nxt_router_conf_ready(nxt_task_t *task, 117 nxt_router_temp_conf_t *tmcf); 118 static void nxt_router_conf_error(nxt_task_t *task, 119 nxt_router_temp_conf_t *tmcf); 120 static void nxt_router_conf_send(nxt_task_t *task, 121 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 122 123 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 124 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 125 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 126 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 127 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 128 static void nxt_router_listen_socket_ready(nxt_task_t *task, 129 nxt_port_recv_msg_t *msg, void *data); 130 static void nxt_router_listen_socket_error(nxt_task_t *task, 131 nxt_port_recv_msg_t *msg, void *data); 132 #if (NXT_TLS) 133 static void nxt_router_tls_rpc_create(nxt_task_t *task, 134 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls); 135 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 136 nxt_port_recv_msg_t *msg, void *data); 137 #endif 138 static void nxt_router_app_rpc_create(nxt_task_t *task, 139 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 140 static void nxt_router_app_prefork_ready(nxt_task_t *task, 141 nxt_port_recv_msg_t *msg, void *data); 142 static void nxt_router_app_prefork_error(nxt_task_t *task, 143 nxt_port_recv_msg_t *msg, void *data); 144 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 145 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 146 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 147 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 148 149 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 150 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 151 const nxt_event_interface_t *interface); 152 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 153 nxt_router_engine_conf_t *recf); 154 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 155 nxt_router_engine_conf_t *recf); 156 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 157 nxt_router_engine_conf_t *recf); 158 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 160 nxt_work_handler_t handler); 161 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 162 nxt_router_engine_conf_t *recf); 163 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 164 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 165 166 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 167 nxt_router_temp_conf_t *tmcf); 168 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 169 nxt_event_engine_t *engine); 170 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 171 nxt_router_temp_conf_t *tmcf); 172 173 static void nxt_router_engines_post(nxt_router_t *router, 174 nxt_router_temp_conf_t *tmcf); 175 static void nxt_router_engine_post(nxt_event_engine_t *engine, 176 nxt_work_t *jobs); 177 178 static void nxt_router_thread_start(void *data); 179 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 180 void *data); 181 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 182 void *data); 183 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 184 void *data); 185 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 186 void *data); 187 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 188 void *data); 189 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 190 void *data); 191 static void nxt_router_listen_socket_release(nxt_task_t *task, 192 nxt_socket_conf_t *skcf); 193 194 static void nxt_router_access_log_writer(nxt_task_t *task, 195 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 196 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 197 struct tm *tm, size_t size, const char *format); 198 static void nxt_router_access_log_open(nxt_task_t *task, 199 nxt_router_temp_conf_t *tmcf); 200 static void nxt_router_access_log_ready(nxt_task_t *task, 201 nxt_port_recv_msg_t *msg, void *data); 202 static void nxt_router_access_log_error(nxt_task_t *task, 203 nxt_port_recv_msg_t *msg, void *data); 204 static void nxt_router_access_log_release(nxt_task_t *task, 205 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 206 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 207 void *data); 208 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 209 nxt_port_recv_msg_t *msg, void *data); 210 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212 213 static void nxt_router_app_port_ready(nxt_task_t *task, 214 nxt_port_recv_msg_t *msg, void *data); 215 static void nxt_router_app_port_error(nxt_task_t *task, 216 nxt_port_recv_msg_t *msg, void *data); 217 218 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 219 220 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 221 nxt_apr_action_t action); 222 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 223 nxt_request_app_link_t *req_app_link); 224 225 static void nxt_router_app_prepare_request(nxt_task_t *task, 226 nxt_request_app_link_t *req_app_link); 227 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 228 nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); 229 230 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 231 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 232 void *data); 233 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 234 void *data); 235 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 236 void *data); 237 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 238 239 static const nxt_http_request_state_t nxt_http_request_send_state; 240 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 241 242 static void nxt_router_app_joint_use(nxt_task_t *task, 243 nxt_app_joint_t *app_joint, int i); 244 245 static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, 246 nxt_http_request_t *r); 247 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 248 void *data); 249 250 const nxt_http_request_state_t nxt_http_websocket; 251 252 static nxt_router_t *nxt_router; 253 254 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 255 static const nxt_str_t empty_prefix = nxt_string(""); 256 257 static const nxt_str_t *nxt_app_msg_prefix[] = { 258 &empty_prefix, 259 &http_prefix, 260 &http_prefix, 261 &http_prefix, 262 &http_prefix, 263 &empty_prefix, 264 }; 265 266 267 nxt_port_handlers_t nxt_router_process_port_handlers = { 268 .quit = nxt_worker_process_quit_handler, 269 .new_port = nxt_router_new_port_handler, 270 .change_file = nxt_port_change_log_file_handler, 271 .mmap = nxt_port_mmap_handler, 272 .data = nxt_router_conf_data_handler, 273 .remove_pid = nxt_router_remove_pid_handler, 274 .access_log = nxt_router_access_log_reopen_handler, 275 .rpc_ready = nxt_port_rpc_handler, 276 .rpc_error = nxt_port_rpc_handler, 277 }; 278 279 280 nxt_int_t 281 nxt_router_start(nxt_task_t *task, void *data) 282 { 283 nxt_int_t ret; 284 nxt_port_t *controller_port; 285 nxt_router_t *router; 286 nxt_runtime_t *rt; 287 288 rt = task->thread->runtime; 289 290 #if (NXT_TLS) 291 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 292 if (nxt_slow_path(rt->tls == NULL)) { 293 return NXT_ERROR; 294 } 295 296 ret = rt->tls->library_init(task); 297 if (nxt_slow_path(ret != NXT_OK)) { 298 return ret; 299 } 300 #endif 301 302 ret = nxt_http_init(task, rt); 303 if (nxt_slow_path(ret != NXT_OK)) { 304 return ret; 305 } 306 307 router = nxt_zalloc(sizeof(nxt_router_t)); 308 if (nxt_slow_path(router == NULL)) { 309 return NXT_ERROR; 310 } 311 312 nxt_queue_init(&router->engines); 313 nxt_queue_init(&router->sockets); 314 nxt_queue_init(&router->apps); 315 316 nxt_router = router; 317 318 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 319 if (controller_port != NULL) { 320 nxt_router_greet_controller(task, controller_port); 321 } 322 323 return NXT_OK; 324 } 325 326 327 static void 328 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 329 { 330 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 331 -1, 0, 0, NULL); 332 } 333 334 335 static void 336 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 337 void *data) 338 { 339 size_t size; 340 uint32_t stream; 341 nxt_mp_t *mp; 342 nxt_int_t ret; 343 nxt_app_t *app; 344 nxt_buf_t *b; 345 nxt_port_t *main_port; 346 nxt_runtime_t *rt; 347 348 app = data; 349 350 rt = task->thread->runtime; 351 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 352 353 nxt_debug(task, "app '%V' %p start process", &app->name, app); 354 355 size = app->name.length + 1 + app->conf.length; 356 357 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 358 359 if (nxt_slow_path(b == NULL)) { 360 goto failed; 361 } 362 363 nxt_buf_cpystr(b, &app->name); 364 *b->mem.free++ = '\0'; 365 nxt_buf_cpystr(b, &app->conf); 366 367 nxt_router_app_joint_use(task, app->joint, 1); 368 369 stream = nxt_port_rpc_register_handler(task, port, 370 nxt_router_app_port_ready, 371 nxt_router_app_port_error, 372 -1, app->joint); 373 374 if (nxt_slow_path(stream == 0)) { 375 nxt_router_app_joint_use(task, app->joint, -1); 376 377 goto failed; 378 } 379 380 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 381 stream, port->id, b); 382 383 if (nxt_slow_path(ret != NXT_OK)) { 384 nxt_port_rpc_cancel(task, port, stream); 385 386 nxt_router_app_joint_use(task, app->joint, -1); 387 388 goto failed; 389 } 390 391 nxt_router_app_use(task, app, -1); 392 393 return; 394 395 failed: 396 397 if (b != NULL) { 398 mp = b->data; 399 nxt_mp_free(mp, b); 400 nxt_mp_release(mp); 401 } 402 403 nxt_thread_mutex_lock(&app->mutex); 404 405 app->pending_processes--; 406 407 nxt_thread_mutex_unlock(&app->mutex); 408 409 nxt_router_app_use(task, app, -1); 410 } 411 412 413 static void 414 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 415 { 416 app_joint->use_count += i; 417 418 if (app_joint->use_count == 0) { 419 nxt_assert(app_joint->app == NULL); 420 421 nxt_free(app_joint); 422 } 423 } 424 425 426 static nxt_int_t 427 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 428 { 429 nxt_int_t res; 430 nxt_port_t *router_port; 431 nxt_runtime_t *rt; 432 433 rt = task->thread->runtime; 434 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 435 436 nxt_router_app_use(task, app, 1); 437 438 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 439 app); 440 441 if (res == NXT_OK) { 442 return res; 443 } 444 445 nxt_thread_mutex_lock(&app->mutex); 446 447 app->pending_processes--; 448 449 nxt_thread_mutex_unlock(&app->mutex); 450 451 nxt_router_app_use(task, app, -1); 452 453 return NXT_ERROR; 454 } 455 456 457 nxt_inline void 458 nxt_request_app_link_init(nxt_task_t *task, 459 nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) 460 { 461 nxt_event_engine_t *engine; 462 463 engine = task->thread->engine; 464 465 nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); 466 467 req_app_link->stream = req_rpc_data->stream; 468 req_app_link->use_count = 1; 469 req_app_link->req_rpc_data = req_rpc_data; 470 req_rpc_data->req_app_link = req_app_link; 471 req_app_link->reply_port = engine->port; 472 req_app_link->request = req_rpc_data->request; 473 req_app_link->apr_action = NXT_APR_GOT_RESPONSE; 474 475 req_app_link->work.handler = NULL; 476 req_app_link->work.task = &engine->task; 477 req_app_link->work.obj = req_app_link; 478 req_app_link->work.data = engine; 479 } 480 481 482 nxt_inline nxt_request_app_link_t * 483 nxt_request_app_link_alloc(nxt_task_t *task, 484 nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) 485 { 486 nxt_mp_t *mp; 487 nxt_request_app_link_t *req_app_link; 488 489 if (ra_src != NULL && ra_src->mem_pool != NULL) { 490 return ra_src; 491 } 492 493 mp = req_rpc_data->request->mem_pool; 494 495 req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); 496 497 if (nxt_slow_path(req_app_link == NULL)) { 498 499 req_rpc_data->req_app_link = NULL; 500 501 if (ra_src != NULL) { 502 ra_src->req_rpc_data = NULL; 503 } 504 505 return NULL; 506 } 507 508 nxt_mp_retain(mp); 509 510 nxt_request_app_link_init(task, req_app_link, req_rpc_data); 511 512 req_app_link->mem_pool = mp; 513 514 return req_app_link; 515 } 516 517 518 nxt_inline nxt_bool_t 519 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, 520 uint32_t stream) 521 { 522 nxt_buf_t *b, *next; 523 nxt_bool_t cancelled; 524 525 if (msg_info->buf == NULL) { 526 return 0; 527 } 528 529 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, 530 stream); 531 532 if (cancelled) { 533 nxt_debug(task, "stream #%uD: cancelled by router", stream); 534 } 535 536 for (b = msg_info->buf; b != NULL; b = next) { 537 next = b->next; 538 539 b->completion_handler = msg_info->completion_handler; 540 541 if (b->is_port_mmap_sent) { 542 b->is_port_mmap_sent = cancelled == 0; 543 b->completion_handler(task, b, b->parent); 544 } 545 } 546 547 msg_info->buf = NULL; 548 549 return cancelled; 550 } 551 552 553 static void 554 nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, 555 void *data) 556 { 557 nxt_request_app_link_t *req_app_link; 558 559 req_app_link = obj; 560 561 nxt_request_app_link_update_peer(task, req_app_link); 562 563 nxt_request_app_link_use(task, req_app_link, -1); 564 } 565 566 567 static void 568 nxt_request_app_link_update_peer(nxt_task_t *task, 569 nxt_request_app_link_t *req_app_link) 570 { 571 nxt_event_engine_t *engine; 572 nxt_request_rpc_data_t *req_rpc_data; 573 574 engine = req_app_link->work.data; 575 576 if (task->thread->engine != engine) { 577 nxt_request_app_link_inc_use(req_app_link); 578 579 req_app_link->work.handler = nxt_request_app_link_update_peer_handler; 580 req_app_link->work.task = &engine->task; 581 req_app_link->work.next = NULL; 582 583 nxt_debug(task, "req_app_link stream #%uD post update peer to %p", 584 req_app_link->stream, engine); 585 586 nxt_event_engine_post(engine, &req_app_link->work); 587 588 return; 589 } 590 591 nxt_debug(task, "req_app_link stream #%uD update peer", 592 req_app_link->stream); 593 594 req_rpc_data = req_app_link->req_rpc_data; 595 596 if (req_rpc_data != NULL && req_app_link->app_port != NULL) { 597 nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, 598 req_app_link->app_port->pid); 599 } 600 601 nxt_request_app_link_use(task, req_app_link, -1); 602 } 603 604 605 static void 606 nxt_request_app_link_release(nxt_task_t *task, 607 nxt_request_app_link_t *req_app_link) 608 { 609 nxt_mp_t *mp; 610 nxt_http_request_t *r; 611 nxt_request_rpc_data_t *req_rpc_data; 612 613 nxt_assert(task->thread->engine == req_app_link->work.data); 614 nxt_assert(req_app_link->use_count == 0); 615 616 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); 617 618 req_rpc_data = req_app_link->req_rpc_data; 619 620 if (req_rpc_data != NULL) { 621 if (nxt_slow_path(req_app_link->err_code != 0)) { 622 nxt_http_request_error(task, req_rpc_data->request, 623 req_app_link->err_code); 624 625 } else { 626 req_rpc_data->app_port = req_app_link->app_port; 627 req_rpc_data->apr_action = req_app_link->apr_action; 628 req_rpc_data->msg_info = req_app_link->msg_info; 629 630 if (req_rpc_data->app->timeout != 0) { 631 r = req_rpc_data->request; 632 633 r->timer.handler = nxt_router_app_timeout; 634 r->timer_data = req_rpc_data; 635 nxt_timer_add(task->thread->engine, &r->timer, 636 req_rpc_data->app->timeout); 637 } 638 639 req_app_link->app_port = NULL; 640 req_app_link->msg_info.buf = NULL; 641 } 642 643 req_rpc_data->req_app_link = NULL; 644 req_app_link->req_rpc_data = NULL; 645 } 646 647 if (req_app_link->app_port != NULL) { 648 nxt_router_app_port_release(task, req_app_link->app_port, 649 req_app_link->apr_action); 650 651 req_app_link->app_port = NULL; 652 } 653 654 nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); 655 656 mp = req_app_link->mem_pool; 657 658 if (mp != NULL) { 659 nxt_mp_free(mp, req_app_link); 660 nxt_mp_release(mp); 661 } 662 } 663 664 665 static void 666 nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) 667 { 668 nxt_request_app_link_t *req_app_link; 669 670 req_app_link = obj; 671 672 nxt_assert(req_app_link->work.data == data); 673 674 nxt_atomic_fetch_add(&req_app_link->use_count, -1); 675 676 nxt_request_app_link_release(task, req_app_link); 677 } 678 679 680 static void 681 nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, 682 int i) 683 { 684 int c; 685 nxt_event_engine_t *engine; 686 687 c = nxt_atomic_fetch_add(&req_app_link->use_count, i); 688 689 if (i < 0 && c == -i) { 690 engine = req_app_link->work.data; 691 692 if (task->thread->engine == engine) { 693 nxt_request_app_link_release(task, req_app_link); 694 695 return; 696 } 697 698 nxt_request_app_link_inc_use(req_app_link); 699 700 req_app_link->work.handler = nxt_request_app_link_release_handler; 701 req_app_link->work.task = &engine->task; 702 req_app_link->work.next = NULL; 703 704 nxt_debug(task, "req_app_link stream #%uD post release to %p", 705 req_app_link->stream, engine); 706 707 nxt_event_engine_post(engine, &req_app_link->work); 708 } 709 } 710 711 712 nxt_inline void 713 nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code, 714 const char *str) 715 { 716 req_app_link->app_port = NULL; 717 req_app_link->err_code = code; 718 req_app_link->err_str = str; 719 } 720 721 722 nxt_inline void 723 nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, 724 nxt_request_app_link_t *req_app_link) 725 { 726 nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, 727 &req_app_link->link_port_pending); 728 nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); 729 730 nxt_request_app_link_inc_use(req_app_link); 731 732 req_app_link->res_time = nxt_thread_monotonic_time(task->thread) 733 + app->res_timeout; 734 735 nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", 736 req_app_link->stream); 737 } 738 739 740 nxt_inline nxt_bool_t 741 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 742 { 743 if (lnk->next != NULL) { 744 nxt_queue_remove(lnk); 745 746 lnk->next = NULL; 747 748 return 1; 749 } 750 751 return 0; 752 } 753 754 755 nxt_inline void 756 nxt_request_rpc_data_unlink(nxt_task_t *task, 757 nxt_request_rpc_data_t *req_rpc_data) 758 { 759 int ra_use_delta; 760 nxt_request_app_link_t *req_app_link; 761 762 if (req_rpc_data->app_port != NULL) { 763 nxt_router_app_port_release(task, req_rpc_data->app_port, 764 req_rpc_data->apr_action); 765 766 req_rpc_data->app_port = NULL; 767 } 768 769 nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 770 771 req_app_link = req_rpc_data->req_app_link; 772 if (req_app_link != NULL) { 773 req_rpc_data->req_app_link = NULL; 774 req_app_link->req_rpc_data = NULL; 775 776 ra_use_delta = 0; 777 778 nxt_thread_mutex_lock(&req_rpc_data->app->mutex); 779 780 if (req_app_link->link_app_requests.next == NULL 781 && req_app_link->link_port_pending.next == NULL 782 && req_app_link->link_app_pending.next == NULL 783 && req_app_link->link_port_websockets.next == NULL) 784 { 785 req_app_link = NULL; 786 787 } else { 788 ra_use_delta -= 789 nxt_queue_chk_remove(&req_app_link->link_app_requests) 790 + nxt_queue_chk_remove(&req_app_link->link_port_pending) 791 + nxt_queue_chk_remove(&req_app_link->link_port_websockets); 792 793 nxt_queue_chk_remove(&req_app_link->link_app_pending); 794 } 795 796 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); 797 798 if (req_app_link != NULL) { 799 nxt_request_app_link_use(task, req_app_link, ra_use_delta); 800 } 801 } 802 803 if (req_rpc_data->app != NULL) { 804 nxt_router_app_use(task, req_rpc_data->app, -1); 805 806 req_rpc_data->app = NULL; 807 } 808 809 if (req_rpc_data->request != NULL) { 810 req_rpc_data->request->timer_data = NULL; 811 812 nxt_router_http_request_done(task, req_rpc_data->request); 813 814 req_rpc_data->request->req_rpc_data = NULL; 815 req_rpc_data->request = NULL; 816 } 817 } 818 819 820 void 821 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 822 { 823 nxt_port_new_port_handler(task, msg); 824 825 if (msg->u.new_port != NULL 826 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) 827 { 828 nxt_router_greet_controller(task, msg->u.new_port); 829 } 830 831 if (msg->port_msg.stream == 0) { 832 return; 833 } 834 835 if (msg->u.new_port == NULL 836 || msg->u.new_port->type != NXT_PROCESS_WORKER) 837 { 838 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 839 } 840 841 nxt_port_rpc_handler(task, msg); 842 } 843 844 845 void 846 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 847 { 848 nxt_int_t ret; 849 nxt_buf_t *b; 850 nxt_router_temp_conf_t *tmcf; 851 852 tmcf = nxt_router_temp_conf(task); 853 if (nxt_slow_path(tmcf == NULL)) { 854 return; 855 } 856 857 nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", 858 nxt_buf_used_size(msg->buf), 859 (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); 860 861 tmcf->router_conf->router = nxt_router; 862 tmcf->stream = msg->port_msg.stream; 863 tmcf->port = nxt_runtime_port_find(task->thread->runtime, 864 msg->port_msg.pid, 865 msg->port_msg.reply_port); 866 867 if (nxt_slow_path(tmcf->port == NULL)) { 868 nxt_alert(task, "reply port not found"); 869 870 return; 871 } 872 873 nxt_port_use(task, tmcf->port, 1); 874 875 b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, 876 msg->buf, msg->size); 877 if (nxt_slow_path(b == NULL)) { 878 nxt_router_conf_error(task, tmcf); 879 880 return; 881 } 882 883 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); 884 885 if (nxt_fast_path(ret == NXT_OK)) { 886 nxt_router_conf_apply(task, tmcf, NULL); 887 888 } else { 889 nxt_router_conf_error(task, tmcf); 890 } 891 } 892 893 894 static void 895 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 896 void *data) 897 { 898 union { 899 nxt_pid_t removed_pid; 900 void *data; 901 } u; 902 903 u.data = data; 904 905 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 906 } 907 908 909 void 910 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 911 { 912 nxt_event_engine_t *engine; 913 914 nxt_port_remove_pid_handler(task, msg); 915 916 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 917 { 918 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 919 msg->u.data); 920 } 921 nxt_queue_loop; 922 923 if (msg->port_msg.stream == 0) { 924 return; 925 } 926 927 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 928 929 nxt_port_rpc_handler(task, msg); 930 } 931 932 933 static nxt_router_temp_conf_t * 934 nxt_router_temp_conf(nxt_task_t *task) 935 { 936 nxt_mp_t *mp, *tmp; 937 nxt_router_conf_t *rtcf; 938 nxt_router_temp_conf_t *tmcf; 939 940 mp = nxt_mp_create(1024, 128, 256, 32); 941 if (nxt_slow_path(mp == NULL)) { 942 return NULL; 943 } 944 945 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 946 if (nxt_slow_path(rtcf == NULL)) { 947 goto fail; 948 } 949 950 rtcf->mem_pool = mp; 951 952 tmp = nxt_mp_create(1024, 128, 256, 32); 953 if (nxt_slow_path(tmp == NULL)) { 954 goto fail; 955 } 956 957 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 958 if (nxt_slow_path(tmcf == NULL)) { 959 goto temp_fail; 960 } 961 962 tmcf->mem_pool = tmp; 963 tmcf->router_conf = rtcf; 964 tmcf->count = 1; 965 tmcf->engine = task->thread->engine; 966 967 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 968 sizeof(nxt_router_engine_conf_t)); 969 if (nxt_slow_path(tmcf->engines == NULL)) { 970 goto temp_fail; 971 } 972 973 nxt_queue_init(&tmcf->deleting); 974 nxt_queue_init(&tmcf->keeping); 975 nxt_queue_init(&tmcf->updating); 976 nxt_queue_init(&tmcf->pending); 977 nxt_queue_init(&tmcf->creating); 978 979 #if (NXT_TLS) 980 nxt_queue_init(&tmcf->tls); 981 #endif 982 983 nxt_queue_init(&tmcf->apps); 984 nxt_queue_init(&tmcf->previous); 985 986 return tmcf; 987 988 temp_fail: 989 990 nxt_mp_destroy(tmp); 991 992 fail: 993 994 nxt_mp_destroy(mp); 995 996 return NULL; 997 } 998 999 1000 nxt_inline nxt_bool_t 1001 nxt_router_app_can_start(nxt_app_t *app) 1002 { 1003 return app->processes + app->pending_processes < app->max_processes 1004 && app->pending_processes < app->max_pending_processes; 1005 } 1006 1007 1008 nxt_inline nxt_bool_t 1009 nxt_router_app_need_start(nxt_app_t *app) 1010 { 1011 return app->idle_processes + app->pending_processes 1012 < app->spare_processes; 1013 } 1014 1015 1016 static void 1017 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1018 { 1019 nxt_int_t ret; 1020 nxt_app_t *app; 1021 nxt_router_t *router; 1022 nxt_runtime_t *rt; 1023 nxt_queue_link_t *qlk; 1024 nxt_socket_conf_t *skcf; 1025 nxt_router_conf_t *rtcf; 1026 nxt_router_temp_conf_t *tmcf; 1027 const nxt_event_interface_t *interface; 1028 #if (NXT_TLS) 1029 nxt_router_tlssock_t *tls; 1030 #endif 1031 1032 tmcf = obj; 1033 1034 qlk = nxt_queue_first(&tmcf->pending); 1035 1036 if (qlk != nxt_queue_tail(&tmcf->pending)) { 1037 nxt_queue_remove(qlk); 1038 nxt_queue_insert_tail(&tmcf->creating, qlk); 1039 1040 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1041 1042 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1043 1044 return; 1045 } 1046 1047 #if (NXT_TLS) 1048 qlk = nxt_queue_first(&tmcf->tls); 1049 1050 if (qlk != nxt_queue_tail(&tmcf->tls)) { 1051 nxt_queue_remove(qlk); 1052 1053 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1054 1055 nxt_router_tls_rpc_create(task, tmcf, tls); 1056 return; 1057 } 1058 #endif 1059 1060 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1061 1062 if (nxt_router_app_need_start(app)) { 1063 nxt_router_app_rpc_create(task, tmcf, app); 1064 return; 1065 } 1066 1067 } nxt_queue_loop; 1068 1069 rtcf = tmcf->router_conf; 1070 1071 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1072 nxt_router_access_log_open(task, tmcf); 1073 return; 1074 } 1075 1076 rt = task->thread->runtime; 1077 1078 interface = nxt_service_get(rt->services, "engine", NULL); 1079 1080 router = rtcf->router; 1081 1082 ret = nxt_router_engines_create(task, router, tmcf, interface); 1083 if (nxt_slow_path(ret != NXT_OK)) { 1084 goto fail; 1085 } 1086 1087 ret = nxt_router_threads_create(task, rt, tmcf); 1088 if (nxt_slow_path(ret != NXT_OK)) { 1089 goto fail; 1090 } 1091 1092 nxt_router_apps_sort(task, router, tmcf); 1093 1094 nxt_router_engines_post(router, tmcf); 1095 1096 nxt_queue_add(&router->sockets, &tmcf->updating); 1097 nxt_queue_add(&router->sockets, &tmcf->creating); 1098 1099 router->access_log = rtcf->access_log; 1100 1101 nxt_router_conf_ready(task, tmcf); 1102 1103 return; 1104 1105 fail: 1106 1107 nxt_router_conf_error(task, tmcf); 1108 1109 return; 1110 } 1111 1112 1113 static void 1114 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1115 { 1116 nxt_joint_job_t *job; 1117 1118 job = obj; 1119 1120 nxt_router_conf_ready(task, job->tmcf); 1121 } 1122 1123 1124 static void 1125 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1126 { 1127 nxt_debug(task, "temp conf count:%D", tmcf->count); 1128 1129 if (--tmcf->count == 0) { 1130 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1131 } 1132 } 1133 1134 1135 static void 1136 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1137 { 1138 nxt_app_t *app; 1139 nxt_queue_t new_socket_confs; 1140 nxt_socket_t s; 1141 nxt_router_t *router; 1142 nxt_queue_link_t *qlk; 1143 nxt_socket_conf_t *skcf; 1144 nxt_router_conf_t *rtcf; 1145 1146 nxt_alert(task, "failed to apply new conf"); 1147 1148 for (qlk = nxt_queue_first(&tmcf->creating); 1149 qlk != nxt_queue_tail(&tmcf->creating); 1150 qlk = nxt_queue_next(qlk)) 1151 { 1152 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1153 s = skcf->listen->socket; 1154 1155 if (s != -1) { 1156 nxt_socket_close(task, s); 1157 } 1158 1159 nxt_free(skcf->listen); 1160 } 1161 1162 nxt_queue_init(&new_socket_confs); 1163 nxt_queue_add(&new_socket_confs, &tmcf->updating); 1164 nxt_queue_add(&new_socket_confs, &tmcf->pending); 1165 nxt_queue_add(&new_socket_confs, &tmcf->creating); 1166 1167 rtcf = tmcf->router_conf; 1168 1169 nxt_http_routes_cleanup(task, rtcf->routes); 1170 1171 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { 1172 1173 if (skcf->pass != NULL) { 1174 nxt_http_pass_cleanup(task, skcf->pass); 1175 } 1176 1177 } nxt_queue_loop; 1178 1179 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1180 1181 nxt_router_app_unlink(task, app); 1182 1183 } nxt_queue_loop; 1184 1185 router = rtcf->router; 1186 1187 nxt_queue_add(&router->sockets, &tmcf->keeping); 1188 nxt_queue_add(&router->sockets, &tmcf->deleting); 1189 1190 nxt_queue_add(&router->apps, &tmcf->previous); 1191 1192 // TODO: new engines and threads 1193 1194 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1195 1196 nxt_mp_destroy(rtcf->mem_pool); 1197 1198 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1199 } 1200 1201 1202 static void 1203 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1204 nxt_port_msg_type_t type) 1205 { 1206 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1207 1208 nxt_port_use(task, tmcf->port, -1); 1209 1210 tmcf->port = NULL; 1211 } 1212 1213 1214 static nxt_conf_map_t nxt_router_conf[] = { 1215 { 1216 nxt_string("listeners_threads"), 1217 NXT_CONF_MAP_INT32, 1218 offsetof(nxt_router_conf_t, threads), 1219 }, 1220 }; 1221 1222 1223 static nxt_conf_map_t nxt_router_app_conf[] = { 1224 { 1225 nxt_string("type"), 1226 NXT_CONF_MAP_STR, 1227 offsetof(nxt_router_app_conf_t, type), 1228 }, 1229 1230 { 1231 nxt_string("limits"), 1232 NXT_CONF_MAP_PTR, 1233 offsetof(nxt_router_app_conf_t, limits_value), 1234 }, 1235 1236 { 1237 nxt_string("processes"), 1238 NXT_CONF_MAP_INT32, 1239 offsetof(nxt_router_app_conf_t, processes), 1240 }, 1241 1242 { 1243 nxt_string("processes"), 1244 NXT_CONF_MAP_PTR, 1245 offsetof(nxt_router_app_conf_t, processes_value), 1246 }, 1247 }; 1248 1249 1250 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1251 { 1252 nxt_string("timeout"), 1253 NXT_CONF_MAP_MSEC, 1254 offsetof(nxt_router_app_conf_t, timeout), 1255 }, 1256 1257 { 1258 nxt_string("reschedule_timeout"), 1259 NXT_CONF_MAP_MSEC, 1260 offsetof(nxt_router_app_conf_t, res_timeout), 1261 }, 1262 1263 { 1264 nxt_string("requests"), 1265 NXT_CONF_MAP_INT32, 1266 offsetof(nxt_router_app_conf_t, requests), 1267 }, 1268 }; 1269 1270 1271 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1272 { 1273 nxt_string("spare"), 1274 NXT_CONF_MAP_INT32, 1275 offsetof(nxt_router_app_conf_t, spare_processes), 1276 }, 1277 1278 { 1279 nxt_string("max"), 1280 NXT_CONF_MAP_INT32, 1281 offsetof(nxt_router_app_conf_t, max_processes), 1282 }, 1283 1284 { 1285 nxt_string("idle_timeout"), 1286 NXT_CONF_MAP_MSEC, 1287 offsetof(nxt_router_app_conf_t, idle_timeout), 1288 }, 1289 }; 1290 1291 1292 static nxt_conf_map_t nxt_router_listener_conf[] = { 1293 { 1294 nxt_string("pass"), 1295 NXT_CONF_MAP_STR_COPY, 1296 offsetof(nxt_router_listener_conf_t, pass), 1297 }, 1298 1299 { 1300 nxt_string("application"), 1301 NXT_CONF_MAP_STR_COPY, 1302 offsetof(nxt_router_listener_conf_t, application), 1303 }, 1304 }; 1305 1306 1307 static nxt_conf_map_t nxt_router_http_conf[] = { 1308 { 1309 nxt_string("header_buffer_size"), 1310 NXT_CONF_MAP_SIZE, 1311 offsetof(nxt_socket_conf_t, header_buffer_size), 1312 }, 1313 1314 { 1315 nxt_string("large_header_buffer_size"), 1316 NXT_CONF_MAP_SIZE, 1317 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1318 }, 1319 1320 { 1321 nxt_string("large_header_buffers"), 1322 NXT_CONF_MAP_SIZE, 1323 offsetof(nxt_socket_conf_t, large_header_buffers), 1324 }, 1325 1326 { 1327 nxt_string("body_buffer_size"), 1328 NXT_CONF_MAP_SIZE, 1329 offsetof(nxt_socket_conf_t, body_buffer_size), 1330 }, 1331 1332 { 1333 nxt_string("max_body_size"), 1334 NXT_CONF_MAP_SIZE, 1335 offsetof(nxt_socket_conf_t, max_body_size), 1336 }, 1337 1338 { 1339 nxt_string("idle_timeout"), 1340 NXT_CONF_MAP_MSEC, 1341 offsetof(nxt_socket_conf_t, idle_timeout), 1342 }, 1343 1344 { 1345 nxt_string("header_read_timeout"), 1346 NXT_CONF_MAP_MSEC, 1347 offsetof(nxt_socket_conf_t, header_read_timeout), 1348 }, 1349 1350 { 1351 nxt_string("body_read_timeout"), 1352 NXT_CONF_MAP_MSEC, 1353 offsetof(nxt_socket_conf_t, body_read_timeout), 1354 }, 1355 1356 { 1357 nxt_string("send_timeout"), 1358 NXT_CONF_MAP_MSEC, 1359 offsetof(nxt_socket_conf_t, send_timeout), 1360 }, 1361 }; 1362 1363 1364 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1365 { 1366 nxt_string("max_frame_size"), 1367 NXT_CONF_MAP_SIZE, 1368 offsetof(nxt_websocket_conf_t, max_frame_size), 1369 }, 1370 1371 { 1372 nxt_string("read_timeout"), 1373 NXT_CONF_MAP_MSEC, 1374 offsetof(nxt_websocket_conf_t, read_timeout), 1375 }, 1376 1377 { 1378 nxt_string("keepalive_interval"), 1379 NXT_CONF_MAP_MSEC, 1380 offsetof(nxt_websocket_conf_t, keepalive_interval), 1381 }, 1382 1383 }; 1384 1385 1386 static nxt_int_t 1387 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1388 u_char *start, u_char *end) 1389 { 1390 u_char *p; 1391 size_t size; 1392 nxt_mp_t *mp; 1393 uint32_t next; 1394 nxt_int_t ret; 1395 nxt_str_t name, path; 1396 nxt_app_t *app, *prev; 1397 nxt_router_t *router; 1398 nxt_app_joint_t *app_joint; 1399 nxt_conf_value_t *conf, *http, *value, *websocket; 1400 nxt_conf_value_t *applications, *application; 1401 nxt_conf_value_t *listeners, *listener; 1402 nxt_conf_value_t *routes_conf; 1403 nxt_socket_conf_t *skcf; 1404 nxt_http_routes_t *routes; 1405 nxt_event_engine_t *engine; 1406 nxt_app_lang_module_t *lang; 1407 nxt_router_app_conf_t apcf; 1408 nxt_router_access_log_t *access_log; 1409 nxt_router_listener_conf_t lscf; 1410 #if (NXT_TLS) 1411 nxt_router_tlssock_t *tls; 1412 #endif 1413 1414 static nxt_str_t http_path = nxt_string("/settings/http"); 1415 static nxt_str_t applications_path = nxt_string("/applications"); 1416 static nxt_str_t listeners_path = nxt_string("/listeners"); 1417 static nxt_str_t routes_path = nxt_string("/routes"); 1418 static nxt_str_t access_log_path = nxt_string("/access_log"); 1419 #if (NXT_TLS) 1420 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1421 #endif 1422 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1423 1424 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1425 if (conf == NULL) { 1426 nxt_alert(task, "configuration parsing error"); 1427 return NXT_ERROR; 1428 } 1429 1430 mp = tmcf->router_conf->mem_pool; 1431 1432 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1433 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1434 if (ret != NXT_OK) { 1435 nxt_alert(task, "root map error"); 1436 return NXT_ERROR; 1437 } 1438 1439 if (tmcf->router_conf->threads == 0) { 1440 tmcf->router_conf->threads = nxt_ncpu; 1441 } 1442 1443 router = tmcf->router_conf->router; 1444 1445 applications = nxt_conf_get_path(conf, &applications_path); 1446 1447 if (applications != NULL) { 1448 next = 0; 1449 1450 for ( ;; ) { 1451 application = nxt_conf_next_object_member(applications, &name, &next); 1452 if (application == NULL) { 1453 break; 1454 } 1455 1456 nxt_debug(task, "application \"%V\"", &name); 1457 1458 size = nxt_conf_json_length(application, NULL); 1459 1460 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); 1461 if (app == NULL) { 1462 goto fail; 1463 } 1464 1465 nxt_memzero(app, sizeof(nxt_app_t)); 1466 1467 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1468 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1469 + name.length); 1470 1471 p = nxt_conf_json_print(app->conf.start, application, NULL); 1472 app->conf.length = p - app->conf.start; 1473 1474 nxt_assert(app->conf.length <= size); 1475 1476 nxt_debug(task, "application conf \"%V\"", &app->conf); 1477 1478 prev = nxt_router_app_find(&router->apps, &name); 1479 1480 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1481 nxt_free(app); 1482 1483 nxt_queue_remove(&prev->link); 1484 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1485 continue; 1486 } 1487 1488 apcf.processes = 1; 1489 apcf.max_processes = 1; 1490 apcf.spare_processes = 0; 1491 apcf.timeout = 0; 1492 apcf.res_timeout = 1000; 1493 apcf.idle_timeout = 15000; 1494 apcf.requests = 0; 1495 apcf.limits_value = NULL; 1496 apcf.processes_value = NULL; 1497 1498 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1499 if (nxt_slow_path(app_joint == NULL)) { 1500 goto app_fail; 1501 } 1502 1503 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1504 1505 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1506 nxt_nitems(nxt_router_app_conf), &apcf); 1507 if (ret != NXT_OK) { 1508 nxt_alert(task, "application map error"); 1509 goto app_fail; 1510 } 1511 1512 if (apcf.limits_value != NULL) { 1513 1514 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1515 nxt_alert(task, "application limits is not object"); 1516 goto app_fail; 1517 } 1518 1519 ret = nxt_conf_map_object(mp, apcf.limits_value, 1520 nxt_router_app_limits_conf, 1521 nxt_nitems(nxt_router_app_limits_conf), 1522 &apcf); 1523 if (ret != NXT_OK) { 1524 nxt_alert(task, "application limits map error"); 1525 goto app_fail; 1526 } 1527 } 1528 1529 if (apcf.processes_value != NULL 1530 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1531 { 1532 ret = nxt_conf_map_object(mp, apcf.processes_value, 1533 nxt_router_app_processes_conf, 1534 nxt_nitems(nxt_router_app_processes_conf), 1535 &apcf); 1536 if (ret != NXT_OK) { 1537 nxt_alert(task, "application processes map error"); 1538 goto app_fail; 1539 } 1540 1541 } else { 1542 apcf.max_processes = apcf.processes; 1543 apcf.spare_processes = apcf.processes; 1544 } 1545 1546 nxt_debug(task, "application type: %V", &apcf.type); 1547 nxt_debug(task, "application processes: %D", apcf.processes); 1548 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1549 nxt_debug(task, "application reschedule timeout: %M", 1550 apcf.res_timeout); 1551 nxt_debug(task, "application requests: %D", apcf.requests); 1552 1553 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1554 1555 if (lang == NULL) { 1556 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1557 goto app_fail; 1558 } 1559 1560 nxt_debug(task, "application language module: \"%s\"", lang->file); 1561 1562 ret = nxt_thread_mutex_create(&app->mutex); 1563 if (ret != NXT_OK) { 1564 goto app_fail; 1565 } 1566 1567 nxt_queue_init(&app->ports); 1568 nxt_queue_init(&app->spare_ports); 1569 nxt_queue_init(&app->idle_ports); 1570 nxt_queue_init(&app->requests); 1571 nxt_queue_init(&app->pending); 1572 1573 app->name.length = name.length; 1574 nxt_memcpy(app->name.start, name.start, name.length); 1575 1576 app->type = lang->type; 1577 app->max_processes = apcf.max_processes; 1578 app->spare_processes = apcf.spare_processes; 1579 app->max_pending_processes = apcf.spare_processes 1580 ? apcf.spare_processes : 1; 1581 app->timeout = apcf.timeout; 1582 app->res_timeout = apcf.res_timeout * 1000000; 1583 app->idle_timeout = apcf.idle_timeout; 1584 app->max_pending_responses = 2; 1585 app->max_requests = apcf.requests; 1586 1587 engine = task->thread->engine; 1588 1589 app->engine = engine; 1590 1591 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1592 app->adjust_idle_work.task = &engine->task; 1593 app->adjust_idle_work.obj = app; 1594 1595 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1596 1597 nxt_router_app_use(task, app, 1); 1598 1599 app->joint = app_joint; 1600 1601 app_joint->use_count = 1; 1602 app_joint->app = app; 1603 1604 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1605 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1606 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1607 app_joint->idle_timer.task = &engine->task; 1608 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1609 1610 app_joint->free_app_work.handler = nxt_router_free_app; 1611 app_joint->free_app_work.task = &engine->task; 1612 app_joint->free_app_work.obj = app_joint; 1613 } 1614 } 1615 1616 routes_conf = nxt_conf_get_path(conf, &routes_path); 1617 if (nxt_fast_path(routes_conf != NULL)) { 1618 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1619 if (nxt_slow_path(routes == NULL)) { 1620 return NXT_ERROR; 1621 } 1622 tmcf->router_conf->routes = routes; 1623 } 1624 1625 http = nxt_conf_get_path(conf, &http_path); 1626 #if 0 1627 if (http == NULL) { 1628 nxt_alert(task, "no \"http\" block"); 1629 return NXT_ERROR; 1630 } 1631 #endif 1632 1633 websocket = nxt_conf_get_path(conf, &websocket_path); 1634 1635 listeners = nxt_conf_get_path(conf, &listeners_path); 1636 1637 if (listeners != NULL) { 1638 next = 0; 1639 1640 for ( ;; ) { 1641 listener = nxt_conf_next_object_member(listeners, &name, &next); 1642 if (listener == NULL) { 1643 break; 1644 } 1645 1646 skcf = nxt_router_socket_conf(task, tmcf, &name); 1647 if (skcf == NULL) { 1648 goto fail; 1649 } 1650 1651 nxt_memzero(&lscf, sizeof(lscf)); 1652 1653 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1654 nxt_nitems(nxt_router_listener_conf), 1655 &lscf); 1656 if (ret != NXT_OK) { 1657 nxt_alert(task, "listener map error"); 1658 goto fail; 1659 } 1660 1661 nxt_debug(task, "application: %V", &lscf.application); 1662 1663 // STUB, default values if http block is not defined. 1664 skcf->header_buffer_size = 2048; 1665 skcf->large_header_buffer_size = 8192; 1666 skcf->large_header_buffers = 4; 1667 skcf->body_buffer_size = 16 * 1024; 1668 skcf->max_body_size = 8 * 1024 * 1024; 1669 skcf->idle_timeout = 180 * 1000; 1670 skcf->header_read_timeout = 30 * 1000; 1671 skcf->body_read_timeout = 30 * 1000; 1672 skcf->send_timeout = 30 * 1000; 1673 1674 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1675 skcf->websocket_conf.read_timeout = 60 * 1000; 1676 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1677 1678 if (http != NULL) { 1679 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1680 nxt_nitems(nxt_router_http_conf), 1681 skcf); 1682 if (ret != NXT_OK) { 1683 nxt_alert(task, "http map error"); 1684 goto fail; 1685 } 1686 } 1687 1688 if (websocket != NULL) { 1689 ret = nxt_conf_map_object(mp, websocket, 1690 nxt_router_websocket_conf, 1691 nxt_nitems(nxt_router_websocket_conf), 1692 &skcf->websocket_conf); 1693 if (ret != NXT_OK) { 1694 nxt_alert(task, "websocket map error"); 1695 goto fail; 1696 } 1697 } 1698 1699 #if (NXT_TLS) 1700 value = nxt_conf_get_path(listener, &certificate_path); 1701 1702 if (value != NULL) { 1703 nxt_conf_get_string(value, &name); 1704 1705 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1706 if (nxt_slow_path(tls == NULL)) { 1707 goto fail; 1708 } 1709 1710 tls->name = name; 1711 tls->conf = skcf; 1712 1713 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1714 } 1715 #endif 1716 1717 skcf->listen->handler = nxt_http_conn_init; 1718 skcf->router_conf = tmcf->router_conf; 1719 skcf->router_conf->count++; 1720 1721 if (lscf.pass.length != 0) { 1722 skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); 1723 1724 /* COMPATIBILITY: listener application. */ 1725 } else if (lscf.application.length > 0) { 1726 skcf->pass = nxt_http_pass_application(task, tmcf, 1727 &lscf.application); 1728 } 1729 } 1730 } 1731 1732 value = nxt_conf_get_path(conf, &access_log_path); 1733 1734 if (value != NULL) { 1735 nxt_conf_get_string(value, &path); 1736 1737 access_log = router->access_log; 1738 1739 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1740 nxt_thread_spin_lock(&router->lock); 1741 access_log->count++; 1742 nxt_thread_spin_unlock(&router->lock); 1743 1744 } else { 1745 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1746 + path.length); 1747 if (access_log == NULL) { 1748 nxt_alert(task, "failed to allocate access log structure"); 1749 goto fail; 1750 } 1751 1752 access_log->fd = -1; 1753 access_log->handler = &nxt_router_access_log_writer; 1754 access_log->count = 1; 1755 1756 access_log->path.length = path.length; 1757 access_log->path.start = (u_char *) access_log 1758 + sizeof(nxt_router_access_log_t); 1759 1760 nxt_memcpy(access_log->path.start, path.start, path.length); 1761 } 1762 1763 tmcf->router_conf->access_log = access_log; 1764 } 1765 1766 nxt_http_routes_resolve(task, tmcf); 1767 1768 nxt_queue_add(&tmcf->deleting, &router->sockets); 1769 nxt_queue_init(&router->sockets); 1770 1771 return NXT_OK; 1772 1773 app_fail: 1774 1775 nxt_free(app); 1776 1777 fail: 1778 1779 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1780 1781 nxt_queue_remove(&app->link); 1782 nxt_thread_mutex_destroy(&app->mutex); 1783 nxt_free(app); 1784 1785 } nxt_queue_loop; 1786 1787 return NXT_ERROR; 1788 } 1789 1790 1791 static nxt_app_t * 1792 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1793 { 1794 nxt_app_t *app; 1795 1796 nxt_queue_each(app, queue, nxt_app_t, link) { 1797 1798 if (nxt_strstr_eq(name, &app->name)) { 1799 return app; 1800 } 1801 1802 } nxt_queue_loop; 1803 1804 return NULL; 1805 } 1806 1807 1808 nxt_app_t * 1809 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) 1810 { 1811 nxt_app_t *app; 1812 1813 app = nxt_router_app_find(&tmcf->apps, name); 1814 1815 if (app == NULL) { 1816 app = nxt_router_app_find(&tmcf->previous, name); 1817 } 1818 1819 return app; 1820 } 1821 1822 1823 static nxt_socket_conf_t * 1824 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1825 nxt_str_t *name) 1826 { 1827 size_t size; 1828 nxt_int_t ret; 1829 nxt_bool_t wildcard; 1830 nxt_sockaddr_t *sa; 1831 nxt_socket_conf_t *skcf; 1832 nxt_listen_socket_t *ls; 1833 1834 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 1835 if (nxt_slow_path(sa == NULL)) { 1836 nxt_alert(task, "invalid listener \"%V\"", name); 1837 return NULL; 1838 } 1839 1840 sa->type = SOCK_STREAM; 1841 1842 nxt_debug(task, "router listener: \"%*s\"", 1843 (size_t) sa->length, nxt_sockaddr_start(sa)); 1844 1845 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 1846 if (nxt_slow_path(skcf == NULL)) { 1847 return NULL; 1848 } 1849 1850 size = nxt_sockaddr_size(sa); 1851 1852 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 1853 1854 if (ret != NXT_OK) { 1855 1856 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 1857 if (nxt_slow_path(ls == NULL)) { 1858 return NULL; 1859 } 1860 1861 skcf->listen = ls; 1862 1863 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 1864 nxt_memcpy(ls->sockaddr, sa, size); 1865 1866 nxt_listen_socket_remote_size(ls); 1867 1868 ls->socket = -1; 1869 ls->backlog = NXT_LISTEN_BACKLOG; 1870 ls->flags = NXT_NONBLOCK; 1871 ls->read_after_accept = 1; 1872 } 1873 1874 switch (sa->u.sockaddr.sa_family) { 1875 #if (NXT_HAVE_UNIX_DOMAIN) 1876 case AF_UNIX: 1877 wildcard = 0; 1878 break; 1879 #endif 1880 #if (NXT_INET6) 1881 case AF_INET6: 1882 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 1883 break; 1884 #endif 1885 case AF_INET: 1886 default: 1887 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 1888 break; 1889 } 1890 1891 if (!wildcard) { 1892 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 1893 if (nxt_slow_path(skcf->sockaddr == NULL)) { 1894 return NULL; 1895 } 1896 1897 nxt_memcpy(skcf->sockaddr, sa, size); 1898 } 1899 1900 return skcf; 1901 } 1902 1903 1904 static nxt_int_t 1905 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 1906 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 1907 { 1908 nxt_router_t *router; 1909 nxt_queue_link_t *qlk; 1910 nxt_socket_conf_t *skcf; 1911 1912 router = tmcf->router_conf->router; 1913 1914 for (qlk = nxt_queue_first(&router->sockets); 1915 qlk != nxt_queue_tail(&router->sockets); 1916 qlk = nxt_queue_next(qlk)) 1917 { 1918 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1919 1920 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 1921 nskcf->listen = skcf->listen; 1922 1923 nxt_queue_remove(qlk); 1924 nxt_queue_insert_tail(&tmcf->keeping, qlk); 1925 1926 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); 1927 1928 return NXT_OK; 1929 } 1930 } 1931 1932 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); 1933 1934 return NXT_DECLINED; 1935 } 1936 1937 1938 static void 1939 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 1940 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 1941 { 1942 size_t size; 1943 uint32_t stream; 1944 nxt_int_t ret; 1945 nxt_buf_t *b; 1946 nxt_port_t *main_port, *router_port; 1947 nxt_runtime_t *rt; 1948 nxt_socket_rpc_t *rpc; 1949 1950 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 1951 if (rpc == NULL) { 1952 goto fail; 1953 } 1954 1955 rpc->socket_conf = skcf; 1956 rpc->temp_conf = tmcf; 1957 1958 size = nxt_sockaddr_size(skcf->listen->sockaddr); 1959 1960 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 1961 if (b == NULL) { 1962 goto fail; 1963 } 1964 1965 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 1966 1967 rt = task->thread->runtime; 1968 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1969 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 1970 1971 stream = nxt_port_rpc_register_handler(task, router_port, 1972 nxt_router_listen_socket_ready, 1973 nxt_router_listen_socket_error, 1974 main_port->pid, rpc); 1975 if (nxt_slow_path(stream == 0)) { 1976 goto fail; 1977 } 1978 1979 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 1980 stream, router_port->id, b); 1981 1982 if (nxt_slow_path(ret != NXT_OK)) { 1983 nxt_port_rpc_cancel(task, router_port, stream); 1984 goto fail; 1985 } 1986 1987 return; 1988 1989 fail: 1990 1991 nxt_router_conf_error(task, tmcf); 1992 } 1993 1994 1995 static void 1996 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1997 void *data) 1998 { 1999 nxt_int_t ret; 2000 nxt_socket_t s; 2001 nxt_socket_rpc_t *rpc; 2002 2003 rpc = data; 2004 2005 s = msg->fd; 2006 2007 ret = nxt_socket_nonblocking(task, s); 2008 if (nxt_slow_path(ret != NXT_OK)) { 2009 goto fail; 2010 } 2011 2012 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2013 2014 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2015 if (nxt_slow_path(ret != NXT_OK)) { 2016 goto fail; 2017 } 2018 2019 rpc->socket_conf->listen->socket = s; 2020 2021 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2022 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2023 2024 return; 2025 2026 fail: 2027 2028 nxt_socket_close(task, s); 2029 2030 nxt_router_conf_error(task, rpc->temp_conf); 2031 } 2032 2033 2034 static void 2035 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2036 void *data) 2037 { 2038 nxt_socket_rpc_t *rpc; 2039 nxt_router_temp_conf_t *tmcf; 2040 2041 rpc = data; 2042 tmcf = rpc->temp_conf; 2043 2044 #if 0 2045 u_char *p; 2046 size_t size; 2047 uint8_t error; 2048 nxt_buf_t *in, *out; 2049 nxt_sockaddr_t *sa; 2050 2051 static nxt_str_t socket_errors[] = { 2052 nxt_string("ListenerSystem"), 2053 nxt_string("ListenerNoIPv6"), 2054 nxt_string("ListenerPort"), 2055 nxt_string("ListenerInUse"), 2056 nxt_string("ListenerNoAddress"), 2057 nxt_string("ListenerNoAccess"), 2058 nxt_string("ListenerPath"), 2059 }; 2060 2061 sa = rpc->socket_conf->listen->sockaddr; 2062 2063 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2064 2065 if (nxt_slow_path(in == NULL)) { 2066 return; 2067 } 2068 2069 p = in->mem.pos; 2070 2071 error = *p++; 2072 2073 size = nxt_length("listen socket error: ") 2074 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2075 + sa->length + socket_errors[error].length + (in->mem.free - p); 2076 2077 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2078 if (nxt_slow_path(out == NULL)) { 2079 return; 2080 } 2081 2082 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2083 "listen socket error: " 2084 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2085 (size_t) sa->length, nxt_sockaddr_start(sa), 2086 &socket_errors[error], in->mem.free - p, p); 2087 2088 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2089 #endif 2090 2091 nxt_router_conf_error(task, tmcf); 2092 } 2093 2094 2095 #if (NXT_TLS) 2096 2097 static void 2098 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2099 nxt_router_tlssock_t *tls) 2100 { 2101 nxt_socket_rpc_t *rpc; 2102 2103 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2104 if (rpc == NULL) { 2105 nxt_router_conf_error(task, tmcf); 2106 return; 2107 } 2108 2109 rpc->socket_conf = tls->conf; 2110 rpc->temp_conf = tmcf; 2111 2112 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2113 nxt_router_tls_rpc_handler, rpc); 2114 } 2115 2116 2117 static void 2118 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2119 void *data) 2120 { 2121 nxt_mp_t *mp; 2122 nxt_int_t ret; 2123 nxt_tls_conf_t *tlscf; 2124 nxt_socket_rpc_t *rpc; 2125 nxt_router_temp_conf_t *tmcf; 2126 2127 nxt_debug(task, "tls rpc handler"); 2128 2129 rpc = data; 2130 tmcf = rpc->temp_conf; 2131 2132 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2133 goto fail; 2134 } 2135 2136 mp = tmcf->router_conf->mem_pool; 2137 2138 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2139 if (nxt_slow_path(tlscf == NULL)) { 2140 goto fail; 2141 } 2142 2143 tlscf->chain_file = msg->fd; 2144 2145 ret = task->thread->runtime->tls->server_init(task, tlscf); 2146 if (nxt_slow_path(ret != NXT_OK)) { 2147 goto fail; 2148 } 2149 2150 rpc->socket_conf->tls = tlscf; 2151 2152 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2153 nxt_router_conf_apply, task, tmcf, NULL); 2154 return; 2155 2156 fail: 2157 2158 nxt_router_conf_error(task, tmcf); 2159 } 2160 2161 #endif 2162 2163 2164 static void 2165 nxt_router_app_rpc_create(nxt_task_t *task, 2166 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2167 { 2168 size_t size; 2169 uint32_t stream; 2170 nxt_int_t ret; 2171 nxt_buf_t *b; 2172 nxt_port_t *main_port, *router_port; 2173 nxt_runtime_t *rt; 2174 nxt_app_rpc_t *rpc; 2175 2176 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2177 if (rpc == NULL) { 2178 goto fail; 2179 } 2180 2181 rpc->app = app; 2182 rpc->temp_conf = tmcf; 2183 2184 nxt_debug(task, "app '%V' prefork", &app->name); 2185 2186 size = app->name.length + 1 + app->conf.length; 2187 2188 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2189 if (nxt_slow_path(b == NULL)) { 2190 goto fail; 2191 } 2192 2193 nxt_buf_cpystr(b, &app->name); 2194 *b->mem.free++ = '\0'; 2195 nxt_buf_cpystr(b, &app->conf); 2196 2197 rt = task->thread->runtime; 2198 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2199 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2200 2201 stream = nxt_port_rpc_register_handler(task, router_port, 2202 nxt_router_app_prefork_ready, 2203 nxt_router_app_prefork_error, 2204 -1, rpc); 2205 if (nxt_slow_path(stream == 0)) { 2206 goto fail; 2207 } 2208 2209 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, 2210 stream, router_port->id, b); 2211 2212 if (nxt_slow_path(ret != NXT_OK)) { 2213 nxt_port_rpc_cancel(task, router_port, stream); 2214 goto fail; 2215 } 2216 2217 app->pending_processes++; 2218 2219 return; 2220 2221 fail: 2222 2223 nxt_router_conf_error(task, tmcf); 2224 } 2225 2226 2227 static void 2228 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2229 void *data) 2230 { 2231 nxt_app_t *app; 2232 nxt_port_t *port; 2233 nxt_app_rpc_t *rpc; 2234 nxt_event_engine_t *engine; 2235 2236 rpc = data; 2237 app = rpc->app; 2238 2239 port = msg->u.new_port; 2240 port->app = app; 2241 2242 app->pending_processes--; 2243 app->processes++; 2244 app->idle_processes++; 2245 2246 engine = task->thread->engine; 2247 2248 nxt_queue_insert_tail(&app->ports, &port->app_link); 2249 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2250 2251 port->idle_start = 0; 2252 2253 nxt_port_inc_use(port); 2254 2255 nxt_work_queue_add(&engine->fast_work_queue, 2256 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2257 } 2258 2259 2260 static void 2261 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2262 void *data) 2263 { 2264 nxt_app_t *app; 2265 nxt_app_rpc_t *rpc; 2266 nxt_router_temp_conf_t *tmcf; 2267 2268 rpc = data; 2269 app = rpc->app; 2270 tmcf = rpc->temp_conf; 2271 2272 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2273 &app->name); 2274 2275 app->pending_processes--; 2276 2277 nxt_router_conf_error(task, tmcf); 2278 } 2279 2280 2281 static nxt_int_t 2282 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2283 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2284 { 2285 nxt_int_t ret; 2286 nxt_uint_t n, threads; 2287 nxt_queue_link_t *qlk; 2288 nxt_router_engine_conf_t *recf; 2289 2290 threads = tmcf->router_conf->threads; 2291 2292 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2293 sizeof(nxt_router_engine_conf_t)); 2294 if (nxt_slow_path(tmcf->engines == NULL)) { 2295 return NXT_ERROR; 2296 } 2297 2298 n = 0; 2299 2300 for (qlk = nxt_queue_first(&router->engines); 2301 qlk != nxt_queue_tail(&router->engines); 2302 qlk = nxt_queue_next(qlk)) 2303 { 2304 recf = nxt_array_zero_add(tmcf->engines); 2305 if (nxt_slow_path(recf == NULL)) { 2306 return NXT_ERROR; 2307 } 2308 2309 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2310 2311 if (n < threads) { 2312 recf->action = NXT_ROUTER_ENGINE_KEEP; 2313 ret = nxt_router_engine_conf_update(tmcf, recf); 2314 2315 } else { 2316 recf->action = NXT_ROUTER_ENGINE_DELETE; 2317 ret = nxt_router_engine_conf_delete(tmcf, recf); 2318 } 2319 2320 if (nxt_slow_path(ret != NXT_OK)) { 2321 return ret; 2322 } 2323 2324 n++; 2325 } 2326 2327 tmcf->new_threads = n; 2328 2329 while (n < threads) { 2330 recf = nxt_array_zero_add(tmcf->engines); 2331 if (nxt_slow_path(recf == NULL)) { 2332 return NXT_ERROR; 2333 } 2334 2335 recf->action = NXT_ROUTER_ENGINE_ADD; 2336 2337 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2338 if (nxt_slow_path(recf->engine == NULL)) { 2339 return NXT_ERROR; 2340 } 2341 2342 ret = nxt_router_engine_conf_create(tmcf, recf); 2343 if (nxt_slow_path(ret != NXT_OK)) { 2344 return ret; 2345 } 2346 2347 n++; 2348 } 2349 2350 return NXT_OK; 2351 } 2352 2353 2354 static nxt_int_t 2355 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2356 nxt_router_engine_conf_t *recf) 2357 { 2358 nxt_int_t ret; 2359 2360 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2361 nxt_router_listen_socket_create); 2362 if (nxt_slow_path(ret != NXT_OK)) { 2363 return ret; 2364 } 2365 2366 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2367 nxt_router_listen_socket_create); 2368 if (nxt_slow_path(ret != NXT_OK)) { 2369 return ret; 2370 } 2371 2372 return ret; 2373 } 2374 2375 2376 static nxt_int_t 2377 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2378 nxt_router_engine_conf_t *recf) 2379 { 2380 nxt_int_t ret; 2381 2382 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, 2383 nxt_router_listen_socket_create); 2384 if (nxt_slow_path(ret != NXT_OK)) { 2385 return ret; 2386 } 2387 2388 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, 2389 nxt_router_listen_socket_update); 2390 if (nxt_slow_path(ret != NXT_OK)) { 2391 return ret; 2392 } 2393 2394 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2395 if (nxt_slow_path(ret != NXT_OK)) { 2396 return ret; 2397 } 2398 2399 return ret; 2400 } 2401 2402 2403 static nxt_int_t 2404 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2405 nxt_router_engine_conf_t *recf) 2406 { 2407 nxt_int_t ret; 2408 2409 ret = nxt_router_engine_quit(tmcf, recf); 2410 if (nxt_slow_path(ret != NXT_OK)) { 2411 return ret; 2412 } 2413 2414 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); 2415 if (nxt_slow_path(ret != NXT_OK)) { 2416 return ret; 2417 } 2418 2419 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); 2420 } 2421 2422 2423 static nxt_int_t 2424 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2425 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2426 nxt_work_handler_t handler) 2427 { 2428 nxt_joint_job_t *job; 2429 nxt_queue_link_t *qlk; 2430 nxt_socket_conf_t *skcf; 2431 nxt_socket_conf_joint_t *joint; 2432 2433 for (qlk = nxt_queue_first(sockets); 2434 qlk != nxt_queue_tail(sockets); 2435 qlk = nxt_queue_next(qlk)) 2436 { 2437 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2438 if (nxt_slow_path(job == NULL)) { 2439 return NXT_ERROR; 2440 } 2441 2442 job->work.next = recf->jobs; 2443 recf->jobs = &job->work; 2444 2445 job->task = tmcf->engine->task; 2446 job->work.handler = handler; 2447 job->work.task = &job->task; 2448 job->work.obj = job; 2449 job->tmcf = tmcf; 2450 2451 tmcf->count++; 2452 2453 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2454 sizeof(nxt_socket_conf_joint_t)); 2455 if (nxt_slow_path(joint == NULL)) { 2456 return NXT_ERROR; 2457 } 2458 2459 job->work.data = joint; 2460 2461 joint->count = 1; 2462 2463 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2464 skcf->count++; 2465 joint->socket_conf = skcf; 2466 2467 joint->engine = recf->engine; 2468 } 2469 2470 return NXT_OK; 2471 } 2472 2473 2474 static nxt_int_t 2475 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2476 nxt_router_engine_conf_t *recf) 2477 { 2478 nxt_joint_job_t *job; 2479 2480 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2481 if (nxt_slow_path(job == NULL)) { 2482 return NXT_ERROR; 2483 } 2484 2485 job->work.next = recf->jobs; 2486 recf->jobs = &job->work; 2487 2488 job->task = tmcf->engine->task; 2489 job->work.handler = nxt_router_worker_thread_quit; 2490 job->work.task = &job->task; 2491 job->work.obj = NULL; 2492 job->work.data = NULL; 2493 job->tmcf = NULL; 2494 2495 return NXT_OK; 2496 } 2497 2498 2499 static nxt_int_t 2500 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2501 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2502 { 2503 nxt_joint_job_t *job; 2504 nxt_queue_link_t *qlk; 2505 2506 for (qlk = nxt_queue_first(sockets); 2507 qlk != nxt_queue_tail(sockets); 2508 qlk = nxt_queue_next(qlk)) 2509 { 2510 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2511 if (nxt_slow_path(job == NULL)) { 2512 return NXT_ERROR; 2513 } 2514 2515 job->work.next = recf->jobs; 2516 recf->jobs = &job->work; 2517 2518 job->task = tmcf->engine->task; 2519 job->work.handler = nxt_router_listen_socket_delete; 2520 job->work.task = &job->task; 2521 job->work.obj = job; 2522 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2523 job->tmcf = tmcf; 2524 2525 tmcf->count++; 2526 } 2527 2528 return NXT_OK; 2529 } 2530 2531 2532 static nxt_int_t 2533 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2534 nxt_router_temp_conf_t *tmcf) 2535 { 2536 nxt_int_t ret; 2537 nxt_uint_t i, threads; 2538 nxt_router_engine_conf_t *recf; 2539 2540 recf = tmcf->engines->elts; 2541 threads = tmcf->router_conf->threads; 2542 2543 for (i = tmcf->new_threads; i < threads; i++) { 2544 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2545 if (nxt_slow_path(ret != NXT_OK)) { 2546 return ret; 2547 } 2548 } 2549 2550 return NXT_OK; 2551 } 2552 2553 2554 static nxt_int_t 2555 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2556 nxt_event_engine_t *engine) 2557 { 2558 nxt_int_t ret; 2559 nxt_thread_link_t *link; 2560 nxt_thread_handle_t handle; 2561 2562 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2563 2564 if (nxt_slow_path(link == NULL)) { 2565 return NXT_ERROR; 2566 } 2567 2568 link->start = nxt_router_thread_start; 2569 link->engine = engine; 2570 link->work.handler = nxt_router_thread_exit_handler; 2571 link->work.task = task; 2572 link->work.data = link; 2573 2574 nxt_queue_insert_tail(&rt->engines, &engine->link); 2575 2576 ret = nxt_thread_create(&handle, link); 2577 2578 if (nxt_slow_path(ret != NXT_OK)) { 2579 nxt_queue_remove(&engine->link); 2580 } 2581 2582 return ret; 2583 } 2584 2585 2586 static void 2587 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2588 nxt_router_temp_conf_t *tmcf) 2589 { 2590 nxt_app_t *app; 2591 2592 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2593 2594 nxt_router_app_unlink(task, app); 2595 2596 } nxt_queue_loop; 2597 2598 nxt_queue_add(&router->apps, &tmcf->previous); 2599 nxt_queue_add(&router->apps, &tmcf->apps); 2600 } 2601 2602 2603 static void 2604 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2605 { 2606 nxt_uint_t n; 2607 nxt_event_engine_t *engine; 2608 nxt_router_engine_conf_t *recf; 2609 2610 recf = tmcf->engines->elts; 2611 2612 for (n = tmcf->engines->nelts; n != 0; n--) { 2613 engine = recf->engine; 2614 2615 switch (recf->action) { 2616 2617 case NXT_ROUTER_ENGINE_KEEP: 2618 break; 2619 2620 case NXT_ROUTER_ENGINE_ADD: 2621 nxt_queue_insert_tail(&router->engines, &engine->link0); 2622 break; 2623 2624 case NXT_ROUTER_ENGINE_DELETE: 2625 nxt_queue_remove(&engine->link0); 2626 break; 2627 } 2628 2629 nxt_router_engine_post(engine, recf->jobs); 2630 2631 recf++; 2632 } 2633 } 2634 2635 2636 static void 2637 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2638 { 2639 nxt_work_t *work, *next; 2640 2641 for (work = jobs; work != NULL; work = next) { 2642 next = work->next; 2643 work->next = NULL; 2644 2645 nxt_event_engine_post(engine, work); 2646 } 2647 } 2648 2649 2650 static nxt_port_handlers_t nxt_router_app_port_handlers = { 2651 .rpc_error = nxt_port_rpc_handler, 2652 .mmap = nxt_port_mmap_handler, 2653 .data = nxt_port_rpc_handler, 2654 }; 2655 2656 2657 static void 2658 nxt_router_thread_start(void *data) 2659 { 2660 nxt_int_t ret; 2661 nxt_port_t *port; 2662 nxt_task_t *task; 2663 nxt_thread_t *thread; 2664 nxt_thread_link_t *link; 2665 nxt_event_engine_t *engine; 2666 2667 link = data; 2668 engine = link->engine; 2669 task = &engine->task; 2670 2671 thread = nxt_thread(); 2672 2673 nxt_event_engine_thread_adopt(engine); 2674 2675 /* STUB */ 2676 thread->runtime = engine->task.thread->runtime; 2677 2678 engine->task.thread = thread; 2679 engine->task.log = thread->log; 2680 thread->engine = engine; 2681 thread->task = &engine->task; 2682 #if 0 2683 thread->fiber = &engine->fibers->fiber; 2684 #endif 2685 2686 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2687 if (nxt_slow_path(engine->mem_pool == NULL)) { 2688 return; 2689 } 2690 2691 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 2692 NXT_PROCESS_ROUTER); 2693 if (nxt_slow_path(port == NULL)) { 2694 return; 2695 } 2696 2697 ret = nxt_port_socket_init(task, port, 0); 2698 if (nxt_slow_path(ret != NXT_OK)) { 2699 nxt_port_use(task, port, -1); 2700 return; 2701 } 2702 2703 engine->port = port; 2704 2705 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 2706 2707 nxt_event_engine_start(engine); 2708 } 2709 2710 2711 static void 2712 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 2713 { 2714 nxt_joint_job_t *job; 2715 nxt_socket_conf_t *skcf; 2716 nxt_listen_event_t *lev; 2717 nxt_listen_socket_t *ls; 2718 nxt_thread_spinlock_t *lock; 2719 nxt_socket_conf_joint_t *joint; 2720 2721 job = obj; 2722 joint = data; 2723 2724 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 2725 2726 skcf = joint->socket_conf; 2727 ls = skcf->listen; 2728 2729 lev = nxt_listen_event(task, ls); 2730 if (nxt_slow_path(lev == NULL)) { 2731 nxt_router_listen_socket_release(task, skcf); 2732 return; 2733 } 2734 2735 lev->socket.data = joint; 2736 2737 lock = &skcf->router_conf->router->lock; 2738 2739 nxt_thread_spin_lock(lock); 2740 ls->count++; 2741 nxt_thread_spin_unlock(lock); 2742 2743 job->work.next = NULL; 2744 job->work.handler = nxt_router_conf_wait; 2745 2746 nxt_event_engine_post(job->tmcf->engine, &job->work); 2747 } 2748 2749 2750 nxt_inline nxt_listen_event_t * 2751 nxt_router_listen_event(nxt_queue_t *listen_connections, 2752 nxt_socket_conf_t *skcf) 2753 { 2754 nxt_socket_t fd; 2755 nxt_queue_link_t *qlk; 2756 nxt_listen_event_t *lev; 2757 2758 fd = skcf->listen->socket; 2759 2760 for (qlk = nxt_queue_first(listen_connections); 2761 qlk != nxt_queue_tail(listen_connections); 2762 qlk = nxt_queue_next(qlk)) 2763 { 2764 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 2765 2766 if (fd == lev->socket.fd) { 2767 return lev; 2768 } 2769 } 2770 2771 return NULL; 2772 } 2773 2774 2775 static void 2776 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 2777 { 2778 nxt_joint_job_t *job; 2779 nxt_event_engine_t *engine; 2780 nxt_listen_event_t *lev; 2781 nxt_socket_conf_joint_t *joint, *old; 2782 2783 job = obj; 2784 joint = data; 2785 2786 engine = task->thread->engine; 2787 2788 nxt_queue_insert_tail(&engine->joints, &joint->link); 2789 2790 lev = nxt_router_listen_event(&engine->listen_connections, 2791 joint->socket_conf); 2792 2793 old = lev->socket.data; 2794 lev->socket.data = joint; 2795 lev->listen = joint->socket_conf->listen; 2796 2797 job->work.next = NULL; 2798 job->work.handler = nxt_router_conf_wait; 2799 2800 nxt_event_engine_post(job->tmcf->engine, &job->work); 2801 2802 /* 2803 * The task is allocated from configuration temporary 2804 * memory pool so it can be freed after engine post operation. 2805 */ 2806 2807 nxt_router_conf_release(&engine->task, old); 2808 } 2809 2810 2811 static void 2812 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 2813 { 2814 nxt_joint_job_t *job; 2815 nxt_socket_conf_t *skcf; 2816 nxt_listen_event_t *lev; 2817 nxt_event_engine_t *engine; 2818 2819 job = obj; 2820 skcf = data; 2821 2822 engine = task->thread->engine; 2823 2824 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 2825 2826 nxt_fd_event_delete(engine, &lev->socket); 2827 2828 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 2829 lev->socket.fd); 2830 2831 lev->timer.handler = nxt_router_listen_socket_close; 2832 lev->timer.work_queue = &engine->fast_work_queue; 2833 2834 nxt_timer_add(engine, &lev->timer, 0); 2835 2836 job->work.next = NULL; 2837 job->work.handler = nxt_router_conf_wait; 2838 2839 nxt_event_engine_post(job->tmcf->engine, &job->work); 2840 } 2841 2842 2843 static void 2844 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 2845 { 2846 nxt_event_engine_t *engine; 2847 2848 nxt_debug(task, "router worker thread quit"); 2849 2850 engine = task->thread->engine; 2851 2852 engine->shutdown = 1; 2853 2854 if (nxt_queue_is_empty(&engine->joints)) { 2855 nxt_thread_exit(task->thread); 2856 } 2857 } 2858 2859 2860 static void 2861 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 2862 { 2863 nxt_timer_t *timer; 2864 nxt_listen_event_t *lev; 2865 nxt_socket_conf_joint_t *joint; 2866 2867 timer = obj; 2868 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 2869 2870 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 2871 lev->socket.fd); 2872 2873 nxt_queue_remove(&lev->link); 2874 2875 joint = lev->socket.data; 2876 lev->socket.data = NULL; 2877 2878 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 2879 task = &task->thread->engine->task; 2880 2881 nxt_router_listen_socket_release(task, joint->socket_conf); 2882 2883 nxt_router_listen_event_release(task, lev, joint); 2884 } 2885 2886 2887 static void 2888 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 2889 { 2890 nxt_listen_socket_t *ls; 2891 nxt_thread_spinlock_t *lock; 2892 2893 ls = skcf->listen; 2894 lock = &skcf->router_conf->router->lock; 2895 2896 nxt_thread_spin_lock(lock); 2897 2898 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 2899 task->thread->engine, ls->count); 2900 2901 if (--ls->count != 0) { 2902 ls = NULL; 2903 } 2904 2905 nxt_thread_spin_unlock(lock); 2906 2907 if (ls != NULL) { 2908 nxt_socket_close(task, ls->socket); 2909 nxt_free(ls); 2910 } 2911 } 2912 2913 2914 void 2915 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 2916 nxt_socket_conf_joint_t *joint) 2917 { 2918 nxt_event_engine_t *engine; 2919 2920 nxt_debug(task, "listen event count: %D", lev->count); 2921 2922 if (--lev->count == 0) { 2923 nxt_free(lev); 2924 } 2925 2926 if (joint != NULL) { 2927 nxt_router_conf_release(task, joint); 2928 } 2929 2930 engine = task->thread->engine; 2931 2932 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 2933 nxt_thread_exit(task->thread); 2934 } 2935 } 2936 2937 2938 void 2939 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 2940 { 2941 nxt_socket_conf_t *skcf; 2942 nxt_router_conf_t *rtcf; 2943 nxt_thread_spinlock_t *lock; 2944 2945 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 2946 2947 if (--joint->count != 0) { 2948 return; 2949 } 2950 2951 nxt_queue_remove(&joint->link); 2952 2953 /* 2954 * The joint content can not be safely used after the critical 2955 * section protected by the spinlock because its memory pool may 2956 * be already destroyed by another thread. 2957 */ 2958 skcf = joint->socket_conf; 2959 rtcf = skcf->router_conf; 2960 lock = &rtcf->router->lock; 2961 2962 nxt_thread_spin_lock(lock); 2963 2964 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 2965 rtcf, rtcf->count); 2966 2967 if (--skcf->count != 0) { 2968 skcf = NULL; 2969 rtcf = NULL; 2970 2971 } else { 2972 nxt_queue_remove(&skcf->link); 2973 2974 if (--rtcf->count != 0) { 2975 rtcf = NULL; 2976 } 2977 } 2978 2979 nxt_thread_spin_unlock(lock); 2980 2981 if (skcf != NULL) { 2982 if (skcf->pass != NULL) { 2983 nxt_http_pass_cleanup(task, skcf->pass); 2984 } 2985 2986 #if (NXT_TLS) 2987 if (skcf->tls != NULL) { 2988 task->thread->runtime->tls->server_free(task, skcf->tls); 2989 } 2990 #endif 2991 } 2992 2993 /* TODO remove engine->port */ 2994 /* TODO excude from connected ports */ 2995 2996 if (rtcf != NULL) { 2997 nxt_debug(task, "old router conf is destroyed"); 2998 2999 nxt_http_routes_cleanup(task, rtcf->routes); 3000 3001 nxt_router_access_log_release(task, lock, rtcf->access_log); 3002 3003 nxt_mp_thread_adopt(rtcf->mem_pool); 3004 3005 nxt_mp_destroy(rtcf->mem_pool); 3006 } 3007 } 3008 3009 3010 static void 3011 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3012 nxt_router_access_log_t *access_log) 3013 { 3014 size_t size; 3015 u_char *buf, *p; 3016 nxt_off_t bytes; 3017 3018 static nxt_time_string_t date_cache = { 3019 (nxt_atomic_uint_t) -1, 3020 nxt_router_access_log_date, 3021 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3022 nxt_length("31/Dec/1986:19:40:00 +0300"), 3023 NXT_THREAD_TIME_LOCAL, 3024 NXT_THREAD_TIME_SEC, 3025 }; 3026 3027 size = r->remote->address_length 3028 + 6 /* ' - - [' */ 3029 + date_cache.size 3030 + 3 /* '] "' */ 3031 + r->method->length 3032 + 1 /* space */ 3033 + r->target.length 3034 + 1 /* space */ 3035 + r->version.length 3036 + 2 /* '" ' */ 3037 + 3 /* status */ 3038 + 1 /* space */ 3039 + NXT_OFF_T_LEN 3040 + 2 /* ' "' */ 3041 + (r->referer != NULL ? r->referer->value_length : 1) 3042 + 3 /* '" "' */ 3043 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3044 + 2 /* '"\n' */ 3045 ; 3046 3047 buf = nxt_mp_nget(r->mem_pool, size); 3048 if (nxt_slow_path(buf == NULL)) { 3049 return; 3050 } 3051 3052 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3053 r->remote->address_length); 3054 3055 p = nxt_cpymem(p, " - - [", 6); 3056 3057 p = nxt_thread_time_string(task->thread, &date_cache, p); 3058 3059 p = nxt_cpymem(p, "] \"", 3); 3060 3061 if (r->method->length != 0) { 3062 p = nxt_cpymem(p, r->method->start, r->method->length); 3063 3064 if (r->target.length != 0) { 3065 *p++ = ' '; 3066 p = nxt_cpymem(p, r->target.start, r->target.length); 3067 3068 if (r->version.length != 0) { 3069 *p++ = ' '; 3070 p = nxt_cpymem(p, r->version.start, r->version.length); 3071 } 3072 } 3073 3074 } else { 3075 *p++ = '-'; 3076 } 3077 3078 p = nxt_cpymem(p, "\" ", 2); 3079 3080 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3081 3082 *p++ = ' '; 3083 3084 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3085 3086 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3087 3088 p = nxt_cpymem(p, " \"", 2); 3089 3090 if (r->referer != NULL) { 3091 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3092 3093 } else { 3094 *p++ = '-'; 3095 } 3096 3097 p = nxt_cpymem(p, "\" \"", 3); 3098 3099 if (r->user_agent != NULL) { 3100 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3101 3102 } else { 3103 *p++ = '-'; 3104 } 3105 3106 p = nxt_cpymem(p, "\"\n", 2); 3107 3108 nxt_fd_write(access_log->fd, buf, p - buf); 3109 } 3110 3111 3112 static u_char * 3113 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3114 size_t size, const char *format) 3115 { 3116 u_char sign; 3117 time_t gmtoff; 3118 3119 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3120 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3121 3122 gmtoff = nxt_timezone(tm) / 60; 3123 3124 if (gmtoff < 0) { 3125 gmtoff = -gmtoff; 3126 sign = '-'; 3127 3128 } else { 3129 sign = '+'; 3130 } 3131 3132 return nxt_sprintf(buf, buf + size, format, 3133 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3134 tm->tm_hour, tm->tm_min, tm->tm_sec, 3135 sign, gmtoff / 60, gmtoff % 60); 3136 } 3137 3138 3139 static void 3140 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3141 { 3142 uint32_t stream; 3143 nxt_int_t ret; 3144 nxt_buf_t *b; 3145 nxt_port_t *main_port, *router_port; 3146 nxt_runtime_t *rt; 3147 nxt_router_access_log_t *access_log; 3148 3149 access_log = tmcf->router_conf->access_log; 3150 3151 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3152 if (nxt_slow_path(b == NULL)) { 3153 goto fail; 3154 } 3155 3156 nxt_buf_cpystr(b, &access_log->path); 3157 *b->mem.free++ = '\0'; 3158 3159 rt = task->thread->runtime; 3160 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3161 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3162 3163 stream = nxt_port_rpc_register_handler(task, router_port, 3164 nxt_router_access_log_ready, 3165 nxt_router_access_log_error, 3166