1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 typedef struct { 22 nxt_str_t type; 23 uint32_t processes; 24 uint32_t max_processes; 25 uint32_t spare_processes; 26 nxt_msec_t timeout; 27 nxt_msec_t idle_timeout; 28 uint32_t requests; 29 nxt_conf_value_t *limits_value; 30 nxt_conf_value_t *processes_value; 31 nxt_conf_value_t *targets_value; 32 } nxt_router_app_conf_t; 33 34 35 typedef struct { 36 nxt_str_t pass; 37 nxt_str_t application; 38 } nxt_router_listener_conf_t; 39 40 41 #if (NXT_TLS) 42 43 typedef struct { 44 nxt_str_t name; 45 nxt_socket_conf_t *socket_conf; 46 nxt_router_temp_conf_t *temp_conf; 47 nxt_tls_init_t *tls_init; 48 nxt_bool_t last; 49 50 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 51 } nxt_router_tlssock_t; 52 53 #endif 54 55 56 typedef struct { 57 nxt_str_t *name; 58 nxt_socket_conf_t *socket_conf; 59 nxt_router_temp_conf_t *temp_conf; 60 nxt_bool_t last; 61 } nxt_socket_rpc_t; 62 63 64 typedef struct { 65 nxt_app_t *app; 66 nxt_router_temp_conf_t *temp_conf; 67 } nxt_app_rpc_t; 68 69 70 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 71 nxt_mp_t *mp); 72 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 73 static void nxt_router_greet_controller(nxt_task_t *task, 74 nxt_port_t *controller_port); 75 76 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 77 78 static void nxt_router_new_port_handler(nxt_task_t *task, 79 nxt_port_recv_msg_t *msg); 80 static void nxt_router_conf_data_handler(nxt_task_t *task, 81 nxt_port_recv_msg_t *msg); 82 static void nxt_router_remove_pid_handler(nxt_task_t *task, 83 nxt_port_recv_msg_t *msg); 84 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 85 nxt_port_recv_msg_t *msg); 86 87 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 88 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 89 static void nxt_router_conf_ready(nxt_task_t *task, 90 nxt_router_temp_conf_t *tmcf); 91 static void nxt_router_conf_error(nxt_task_t *task, 92 nxt_router_temp_conf_t *tmcf); 93 static void nxt_router_conf_send(nxt_task_t *task, 94 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 95 96 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 97 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 98 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 99 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 100 101 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 102 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 103 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 104 nxt_app_t *app); 105 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 106 nxt_str_t *name); 107 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 108 int i); 109 110 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 111 nxt_port_t *port); 112 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 113 nxt_port_t *port); 114 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 115 nxt_port_t *port, nxt_fd_t fd); 116 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 117 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 118 static void nxt_router_listen_socket_ready(nxt_task_t *task, 119 nxt_port_recv_msg_t *msg, void *data); 120 static void nxt_router_listen_socket_error(nxt_task_t *task, 121 nxt_port_recv_msg_t *msg, void *data); 122 #if (NXT_TLS) 123 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 124 nxt_port_recv_msg_t *msg, void *data); 125 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 126 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 127 nxt_bool_t last); 128 #endif 129 static void nxt_router_app_rpc_create(nxt_task_t *task, 130 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 131 static void nxt_router_app_prefork_ready(nxt_task_t *task, 132 nxt_port_recv_msg_t *msg, void *data); 133 static void nxt_router_app_prefork_error(nxt_task_t *task, 134 nxt_port_recv_msg_t *msg, void *data); 135 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 136 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 137 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 138 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 139 140 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 141 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 142 const nxt_event_interface_t *interface); 143 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 144 nxt_router_engine_conf_t *recf); 145 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 146 nxt_router_engine_conf_t *recf); 147 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 148 nxt_router_engine_conf_t *recf); 149 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 150 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 151 nxt_work_handler_t handler); 152 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 153 nxt_router_engine_conf_t *recf); 154 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 155 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 156 157 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 158 nxt_router_temp_conf_t *tmcf); 159 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 160 nxt_event_engine_t *engine); 161 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 162 nxt_router_temp_conf_t *tmcf); 163 164 static void nxt_router_engines_post(nxt_router_t *router, 165 nxt_router_temp_conf_t *tmcf); 166 static void nxt_router_engine_post(nxt_event_engine_t *engine, 167 nxt_work_t *jobs); 168 169 static void nxt_router_thread_start(void *data); 170 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 171 void *data); 172 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 173 void *data); 174 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 175 void *data); 176 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 177 void *data); 178 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 179 void *data); 180 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 181 void *data); 182 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 183 void *data); 184 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 185 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 186 static void nxt_router_listen_socket_release(nxt_task_t *task, 187 nxt_socket_conf_t *skcf); 188 189 static void nxt_router_access_log_writer(nxt_task_t *task, 190 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 191 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 192 struct tm *tm, size_t size, const char *format); 193 static void nxt_router_access_log_open(nxt_task_t *task, 194 nxt_router_temp_conf_t *tmcf); 195 static void nxt_router_access_log_ready(nxt_task_t *task, 196 nxt_port_recv_msg_t *msg, void *data); 197 static void nxt_router_access_log_error(nxt_task_t *task, 198 nxt_port_recv_msg_t *msg, void *data); 199 static void nxt_router_access_log_release(nxt_task_t *task, 200 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 201 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 202 void *data); 203 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 204 nxt_port_recv_msg_t *msg, void *data); 205 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 206 nxt_port_recv_msg_t *msg, void *data); 207 208 static void nxt_router_app_port_ready(nxt_task_t *task, 209 nxt_port_recv_msg_t *msg, void *data); 210 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 211 nxt_port_t *app_port); 212 static void nxt_router_app_port_error(nxt_task_t *task, 213 nxt_port_recv_msg_t *msg, void *data); 214 215 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 216 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 217 218 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 219 nxt_apr_action_t action); 220 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 221 nxt_request_rpc_data_t *req_rpc_data); 222 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 223 void *data); 224 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 225 void *data); 226 227 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, 228 void *data); 229 static void nxt_router_app_prepare_request(nxt_task_t *task, 230 nxt_request_rpc_data_t *req_rpc_data); 231 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 232 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 233 234 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 235 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 236 void *data); 237 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 238 void *data); 239 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 240 void *data); 241 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 242 243 static const nxt_http_request_state_t nxt_http_request_send_state; 244 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 245 246 static void nxt_router_app_joint_use(nxt_task_t *task, 247 nxt_app_joint_t *app_joint, int i); 248 249 static void nxt_router_http_request_release_post(nxt_task_t *task, 250 nxt_http_request_t *r); 251 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 252 void *data); 253 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 254 static void nxt_router_get_port_handler(nxt_task_t *task, 255 nxt_port_recv_msg_t *msg); 256 static void nxt_router_get_mmap_handler(nxt_task_t *task, 257 nxt_port_recv_msg_t *msg); 258 259 extern const nxt_http_request_state_t nxt_http_websocket; 260 261 static nxt_router_t *nxt_router; 262 263 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 264 static const nxt_str_t empty_prefix = nxt_string(""); 265 266 static const nxt_str_t *nxt_app_msg_prefix[] = { 267 &empty_prefix, 268 &empty_prefix, 269 &http_prefix, 270 &http_prefix, 271 &http_prefix, 272 &empty_prefix, 273 }; 274 275 276 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 277 .quit = nxt_signal_quit_handler, 278 .new_port = nxt_router_new_port_handler, 279 .get_port = nxt_router_get_port_handler, 280 .change_file = nxt_port_change_log_file_handler, 281 .mmap = nxt_port_mmap_handler, 282 .get_mmap = nxt_router_get_mmap_handler, 283 .data = nxt_router_conf_data_handler, 284 .remove_pid = nxt_router_remove_pid_handler, 285 .access_log = nxt_router_access_log_reopen_handler, 286 .rpc_ready = nxt_port_rpc_handler, 287 .rpc_error = nxt_port_rpc_handler, 288 .oosm = nxt_router_oosm_handler, 289 }; 290 291 292 const nxt_process_init_t nxt_router_process = { 293 .name = "router", 294 .type = NXT_PROCESS_ROUTER, 295 .prefork = nxt_router_prefork, 296 .restart = 1, 297 .setup = nxt_process_core_setup, 298 .start = nxt_router_start, 299 .port_handlers = &nxt_router_process_port_handlers, 300 .signals = nxt_process_signals, 301 }; 302 303 304 /* Queues of nxt_socket_conf_t */ 305 nxt_queue_t creating_sockets; 306 nxt_queue_t pending_sockets; 307 nxt_queue_t updating_sockets; 308 nxt_queue_t keeping_sockets; 309 nxt_queue_t deleting_sockets; 310 311 312 static nxt_int_t 313 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 314 { 315 nxt_runtime_stop_app_processes(task, task->thread->runtime); 316 317 return NXT_OK; 318 } 319 320 321 static nxt_int_t 322 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 323 { 324 nxt_int_t ret; 325 nxt_port_t *controller_port; 326 nxt_router_t *router; 327 nxt_runtime_t *rt; 328 329 rt = task->thread->runtime; 330 331 nxt_log(task, NXT_LOG_INFO, "router started"); 332 333 #if (NXT_TLS) 334 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 335 if (nxt_slow_path(rt->tls == NULL)) { 336 return NXT_ERROR; 337 } 338 339 ret = rt->tls->library_init(task); 340 if (nxt_slow_path(ret != NXT_OK)) { 341 return ret; 342 } 343 #endif 344 345 ret = nxt_http_init(task); 346 if (nxt_slow_path(ret != NXT_OK)) { 347 return ret; 348 } 349 350 router = nxt_zalloc(sizeof(nxt_router_t)); 351 if (nxt_slow_path(router == NULL)) { 352 return NXT_ERROR; 353 } 354 355 nxt_queue_init(&router->engines); 356 nxt_queue_init(&router->sockets); 357 nxt_queue_init(&router->apps); 358 359 nxt_router = router; 360 361 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 362 if (controller_port != NULL) { 363 nxt_router_greet_controller(task, controller_port); 364 } 365 366 return NXT_OK; 367 } 368 369 370 static void 371 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 372 { 373 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 374 -1, 0, 0, NULL); 375 } 376 377 378 static void 379 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 380 void *data) 381 { 382 size_t size; 383 uint32_t stream; 384 nxt_mp_t *mp; 385 nxt_int_t ret; 386 nxt_app_t *app; 387 nxt_buf_t *b; 388 nxt_port_t *main_port; 389 nxt_runtime_t *rt; 390 391 app = data; 392 393 rt = task->thread->runtime; 394 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 395 396 nxt_debug(task, "app '%V' %p start process", &app->name, app); 397 398 size = app->name.length + 1 + app->conf.length; 399 400 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 401 402 if (nxt_slow_path(b == NULL)) { 403 goto failed; 404 } 405 406 nxt_buf_cpystr(b, &app->name); 407 *b->mem.free++ = '\0'; 408 nxt_buf_cpystr(b, &app->conf); 409 410 nxt_router_app_joint_use(task, app->joint, 1); 411 412 stream = nxt_port_rpc_register_handler(task, port, 413 nxt_router_app_port_ready, 414 nxt_router_app_port_error, 415 -1, app->joint); 416 417 if (nxt_slow_path(stream == 0)) { 418 nxt_router_app_joint_use(task, app->joint, -1); 419 420 goto failed; 421 } 422 423 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 424 -1, stream, port->id, b); 425 426 if (nxt_slow_path(ret != NXT_OK)) { 427 nxt_port_rpc_cancel(task, port, stream); 428 429 nxt_router_app_joint_use(task, app->joint, -1); 430 431 goto failed; 432 } 433 434 nxt_router_app_use(task, app, -1); 435 436 return; 437 438 failed: 439 440 if (b != NULL) { 441 mp = b->data; 442 nxt_mp_free(mp, b); 443 nxt_mp_release(mp); 444 } 445 446 nxt_thread_mutex_lock(&app->mutex); 447 448 app->pending_processes--; 449 450 nxt_thread_mutex_unlock(&app->mutex); 451 452 nxt_router_app_use(task, app, -1); 453 } 454 455 456 static void 457 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 458 { 459 app_joint->use_count += i; 460 461 if (app_joint->use_count == 0) { 462 nxt_assert(app_joint->app == NULL); 463 464 nxt_free(app_joint); 465 } 466 } 467 468 469 static nxt_int_t 470 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 471 { 472 nxt_int_t res; 473 nxt_port_t *router_port; 474 nxt_runtime_t *rt; 475 476 nxt_debug(task, "app '%V' start process", &app->name); 477 478 rt = task->thread->runtime; 479 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 480 481 nxt_router_app_use(task, app, 1); 482 483 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 484 app); 485 486 if (res == NXT_OK) { 487 return res; 488 } 489 490 nxt_thread_mutex_lock(&app->mutex); 491 492 app->pending_processes--; 493 494 nxt_thread_mutex_unlock(&app->mutex); 495 496 nxt_router_app_use(task, app, -1); 497 498 return NXT_ERROR; 499 } 500 501 502 nxt_inline nxt_bool_t 503 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 504 { 505 nxt_buf_t *b, *next; 506 nxt_bool_t cancelled; 507 nxt_msg_info_t *msg_info; 508 509 msg_info = &req_rpc_data->msg_info; 510 511 if (msg_info->buf == NULL) { 512 return 0; 513 } 514 515 cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, 516 msg_info->tracking_cookie, 517 req_rpc_data->stream); 518 519 if (cancelled) { 520 nxt_debug(task, "stream #%uD: cancelled by router", 521 req_rpc_data->stream); 522 } 523 524 for (b = msg_info->buf; b != NULL; b = next) { 525 next = b->next; 526 b->next = NULL; 527 528 if (b->is_port_mmap_sent) { 529 b->is_port_mmap_sent = cancelled == 0; 530 } 531 532 b->completion_handler(task, b, b->parent); 533 } 534 535 msg_info->buf = NULL; 536 537 return cancelled; 538 } 539 540 541 nxt_inline nxt_bool_t 542 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 543 { 544 if (lnk->next != NULL) { 545 nxt_queue_remove(lnk); 546 547 lnk->next = NULL; 548 549 return 1; 550 } 551 552 return 0; 553 } 554 555 556 nxt_inline void 557 nxt_request_rpc_data_unlink(nxt_task_t *task, 558 nxt_request_rpc_data_t *req_rpc_data) 559 { 560 nxt_app_t *app; 561 nxt_bool_t unlinked; 562 nxt_http_request_t *r; 563 564 nxt_router_msg_cancel(task, req_rpc_data); 565 566 if (req_rpc_data->app_port != NULL) { 567 nxt_router_app_port_release(task, req_rpc_data->app_port, 568 req_rpc_data->apr_action); 569 570 req_rpc_data->app_port = NULL; 571 } 572 573 app = req_rpc_data->app; 574 r = req_rpc_data->request; 575 576 if (r != NULL) { 577 r->timer_data = NULL; 578 579 nxt_router_http_request_release_post(task, r); 580 581 r->req_rpc_data = NULL; 582 req_rpc_data->request = NULL; 583 584 if (app != NULL) { 585 unlinked = 0; 586 587 nxt_thread_mutex_lock(&app->mutex); 588 589 if (r->app_link.next != NULL) { 590 nxt_queue_remove(&r->app_link); 591 r->app_link.next = NULL; 592 593 unlinked = 1; 594 } 595 596 nxt_thread_mutex_unlock(&app->mutex); 597 598 if (unlinked) { 599 nxt_mp_release(r->mem_pool); 600 } 601 } 602 } 603 604 if (app != NULL) { 605 nxt_router_app_use(task, app, -1); 606 607 req_rpc_data->app = NULL; 608 } 609 610 if (req_rpc_data->msg_info.body_fd != -1) { 611 nxt_fd_close(req_rpc_data->msg_info.body_fd); 612 613 req_rpc_data->msg_info.body_fd = -1; 614 } 615 616 if (req_rpc_data->rpc_cancel) { 617 req_rpc_data->rpc_cancel = 0; 618 619 nxt_port_rpc_cancel(task, task->thread->engine->port, 620 req_rpc_data->stream); 621 } 622 } 623 624 625 static void 626 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 627 { 628 nxt_int_t res; 629 nxt_app_t *app; 630 nxt_port_t *port, *main_app_port; 631 nxt_runtime_t *rt; 632 633 nxt_port_new_port_handler(task, msg); 634 635 port = msg->u.new_port; 636 637 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 638 nxt_router_greet_controller(task, msg->u.new_port); 639 } 640 641 if (port == NULL || port->type != NXT_PROCESS_APP) { 642 643 if (msg->port_msg.stream == 0) { 644 return; 645 } 646 647 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 648 649 } else { 650 if (msg->fd[1] != -1) { 651 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 652 if (nxt_slow_path(res != NXT_OK)) { 653 return; 654 } 655 656 nxt_fd_close(msg->fd[1]); 657 msg->fd[1] = -1; 658 } 659 } 660 661 if (msg->port_msg.stream != 0) { 662 nxt_port_rpc_handler(task, msg); 663 return; 664 } 665 666 /* 667 * Port with "id == 0" is application 'main' port and it always 668 * should come with non-zero stream. 669 */ 670 nxt_assert(port->id != 0); 671 672 /* Find 'main' app port and get app reference. */ 673 rt = task->thread->runtime; 674 675 /* 676 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 677 * sent to main port (with id == 0) and processed in main thread. 678 */ 679 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 680 nxt_assert(main_app_port != NULL); 681 682 app = main_app_port->app; 683 684 if (nxt_fast_path(app != NULL)) { 685 nxt_thread_mutex_lock(&app->mutex); 686 687 /* TODO here should be find-and-add code because there can be 688 port waiters in port_hash */ 689 nxt_port_hash_add(&app->port_hash, port); 690 app->port_hash_count++; 691 692 nxt_thread_mutex_unlock(&app->mutex); 693 694 port->app = app; 695 } 696 697 port->main_app_port = main_app_port; 698 699 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 700 } 701 702 703 static void 704 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 705 { 706 void *p; 707 size_t size; 708 nxt_int_t ret; 709 nxt_port_t *port; 710 nxt_router_temp_conf_t *tmcf; 711 712 port = nxt_runtime_port_find(task->thread->runtime, 713 msg->port_msg.pid, 714 msg->port_msg.reply_port); 715 if (nxt_slow_path(port == NULL)) { 716 nxt_alert(task, "conf_data_handler: reply port not found"); 717 return; 718 } 719 720 p = MAP_FAILED; 721 722 /* 723 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 724 * initialized in 'cleanup' section. 725 */ 726 size = 0; 727 728 tmcf = nxt_router_temp_conf(task); 729 if (nxt_slow_path(tmcf == NULL)) { 730 goto fail; 731 } 732 733 if (nxt_slow_path(msg->fd[0] == -1)) { 734 nxt_alert(task, "conf_data_handler: invalid shm fd"); 735 goto fail; 736 } 737 738 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 739 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 740 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 741 goto fail; 742 } 743 744 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 745 746 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 747 748 nxt_fd_close(msg->fd[0]); 749 msg->fd[0] = -1; 750 751 if (nxt_slow_path(p == MAP_FAILED)) { 752 goto fail; 753 } 754 755 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 756 757 tmcf->router_conf->router = nxt_router; 758 tmcf->stream = msg->port_msg.stream; 759 tmcf->port = port; 760 761 nxt_port_use(task, tmcf->port, 1); 762 763 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 764 765 if (nxt_fast_path(ret == NXT_OK)) { 766 nxt_router_conf_apply(task, tmcf, NULL); 767 768 } else { 769 nxt_router_conf_error(task, tmcf); 770 } 771 772 goto cleanup; 773 774 fail: 775 776 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 777 msg->port_msg.stream, 0, NULL); 778 779 if (tmcf != NULL) { 780 nxt_mp_release(tmcf->mem_pool); 781 } 782 783 cleanup: 784 785 if (p != MAP_FAILED) { 786 nxt_mem_munmap(p, size); 787 } 788 789 if (msg->fd[0] != -1) { 790 nxt_fd_close(msg->fd[0]); 791 msg->fd[0] = -1; 792 } 793 } 794 795 796 static void 797 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 798 void *data) 799 { 800 union { 801 nxt_pid_t removed_pid; 802 void *data; 803 } u; 804 805 u.data = data; 806 807 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 808 } 809 810 811 static void 812 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 813 { 814 nxt_event_engine_t *engine; 815 816 nxt_port_remove_pid_handler(task, msg); 817 818 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 819 { 820 if (nxt_fast_path(engine->port != NULL)) { 821 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 822 msg->u.data); 823 } 824 } 825 nxt_queue_loop; 826 827 if (msg->port_msg.stream == 0) { 828 return; 829 } 830 831 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 832 833 nxt_port_rpc_handler(task, msg); 834 } 835 836 837 static nxt_router_temp_conf_t * 838 nxt_router_temp_conf(nxt_task_t *task) 839 { 840 nxt_mp_t *mp, *tmp; 841 nxt_router_conf_t *rtcf; 842 nxt_router_temp_conf_t *tmcf; 843 844 mp = nxt_mp_create(1024, 128, 256, 32); 845 if (nxt_slow_path(mp == NULL)) { 846 return NULL; 847 } 848 849 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 850 if (nxt_slow_path(rtcf == NULL)) { 851 goto fail; 852 } 853 854 rtcf->mem_pool = mp; 855 856 tmp = nxt_mp_create(1024, 128, 256, 32); 857 if (nxt_slow_path(tmp == NULL)) { 858 goto fail; 859 } 860 861 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 862 if (nxt_slow_path(tmcf == NULL)) { 863 goto temp_fail; 864 } 865 866 tmcf->mem_pool = tmp; 867 tmcf->router_conf = rtcf; 868 tmcf->count = 1; 869 tmcf->engine = task->thread->engine; 870 871 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 872 sizeof(nxt_router_engine_conf_t)); 873 if (nxt_slow_path(tmcf->engines == NULL)) { 874 goto temp_fail; 875 } 876 877 nxt_queue_init(&creating_sockets); 878 nxt_queue_init(&pending_sockets); 879 nxt_queue_init(&updating_sockets); 880 nxt_queue_init(&keeping_sockets); 881 nxt_queue_init(&deleting_sockets); 882 883 #if (NXT_TLS) 884 nxt_queue_init(&tmcf->tls); 885 #endif 886 887 nxt_queue_init(&tmcf->apps); 888 nxt_queue_init(&tmcf->previous); 889 890 return tmcf; 891 892 temp_fail: 893 894 nxt_mp_destroy(tmp); 895 896 fail: 897 898 nxt_mp_destroy(mp); 899 900 return NULL; 901 } 902 903 904 nxt_inline nxt_bool_t 905 nxt_router_app_can_start(nxt_app_t *app) 906 { 907 return app->processes + app->pending_processes < app->max_processes 908 && app->pending_processes < app->max_pending_processes; 909 } 910 911 912 nxt_inline nxt_bool_t 913 nxt_router_app_need_start(nxt_app_t *app) 914 { 915 return (app->active_requests 916 > app->port_hash_count + app->pending_processes) 917 || (app->spare_processes 918 > app->idle_processes + app->pending_processes); 919 } 920 921 922 static void 923 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 924 { 925 nxt_int_t ret; 926 nxt_app_t *app; 927 nxt_router_t *router; 928 nxt_runtime_t *rt; 929 nxt_queue_link_t *qlk; 930 nxt_socket_conf_t *skcf; 931 nxt_router_conf_t *rtcf; 932 nxt_router_temp_conf_t *tmcf; 933 const nxt_event_interface_t *interface; 934 #if (NXT_TLS) 935 nxt_router_tlssock_t *tls; 936 #endif 937 938 tmcf = obj; 939 940 qlk = nxt_queue_first(&pending_sockets); 941 942 if (qlk != nxt_queue_tail(&pending_sockets)) { 943 nxt_queue_remove(qlk); 944 nxt_queue_insert_tail(&creating_sockets, qlk); 945 946 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 947 948 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 949 950 return; 951 } 952 953 #if (NXT_TLS) 954 qlk = nxt_queue_last(&tmcf->tls); 955 956 if (qlk != nxt_queue_head(&tmcf->tls)) { 957 nxt_queue_remove(qlk); 958 959 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 960 961 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 962 nxt_router_tls_rpc_handler, tls); 963 return; 964 } 965 #endif 966 967 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 968 969 if (nxt_router_app_need_start(app)) { 970 nxt_router_app_rpc_create(task, tmcf, app); 971 return; 972 } 973 974 } nxt_queue_loop; 975 976 rtcf = tmcf->router_conf; 977 978 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 979 nxt_router_access_log_open(task, tmcf); 980 return; 981 } 982 983 rt = task->thread->runtime; 984 985 interface = nxt_service_get(rt->services, "engine", NULL); 986 987 router = rtcf->router; 988 989 ret = nxt_router_engines_create(task, router, tmcf, interface); 990 if (nxt_slow_path(ret != NXT_OK)) { 991 goto fail; 992 } 993 994 ret = nxt_router_threads_create(task, rt, tmcf); 995 if (nxt_slow_path(ret != NXT_OK)) { 996 goto fail; 997 } 998 999 nxt_router_apps_sort(task, router, tmcf); 1000 1001 nxt_router_apps_hash_use(task, rtcf, 1); 1002 1003 nxt_router_engines_post(router, tmcf); 1004 1005 nxt_queue_add(&router->sockets, &updating_sockets); 1006 nxt_queue_add(&router->sockets, &creating_sockets); 1007 1008 router->access_log = rtcf->access_log; 1009 1010 nxt_router_conf_ready(task, tmcf); 1011 1012 return; 1013 1014 fail: 1015 1016 nxt_router_conf_error(task, tmcf); 1017 1018 return; 1019 } 1020 1021 1022 static void 1023 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1024 { 1025 nxt_joint_job_t *job; 1026 1027 job = obj; 1028 1029 nxt_router_conf_ready(task, job->tmcf); 1030 } 1031 1032 1033 static void 1034 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1035 { 1036 uint32_t count; 1037 nxt_router_conf_t *rtcf; 1038 nxt_thread_spinlock_t *lock; 1039 1040 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1041 1042 if (--tmcf->count > 0) { 1043 return; 1044 } 1045 1046 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1047 1048 rtcf = tmcf->router_conf; 1049 1050 lock = &rtcf->router->lock; 1051 1052 nxt_thread_spin_lock(lock); 1053 1054 count = rtcf->count; 1055 1056 nxt_thread_spin_unlock(lock); 1057 1058 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1059 1060 if (count == 0) { 1061 nxt_router_apps_hash_use(task, rtcf, -1); 1062 1063 nxt_router_access_log_release(task, lock, rtcf->access_log); 1064 1065 nxt_mp_destroy(rtcf->mem_pool); 1066 } 1067 1068 nxt_mp_release(tmcf->mem_pool); 1069 } 1070 1071 1072 static void 1073 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1074 { 1075 nxt_app_t *app; 1076 nxt_queue_t new_socket_confs; 1077 nxt_socket_t s; 1078 nxt_router_t *router; 1079 nxt_queue_link_t *qlk; 1080 nxt_socket_conf_t *skcf; 1081 nxt_router_conf_t *rtcf; 1082 1083 nxt_alert(task, "failed to apply new conf"); 1084 1085 for (qlk = nxt_queue_first(&creating_sockets); 1086 qlk != nxt_queue_tail(&creating_sockets); 1087 qlk = nxt_queue_next(qlk)) 1088 { 1089 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1090 s = skcf->listen->socket; 1091 1092 if (s != -1) { 1093 nxt_socket_close(task, s); 1094 } 1095 1096 nxt_free(skcf->listen); 1097 } 1098 1099 nxt_queue_init(&new_socket_confs); 1100 nxt_queue_add(&new_socket_confs, &updating_sockets); 1101 nxt_queue_add(&new_socket_confs, &pending_sockets); 1102 nxt_queue_add(&new_socket_confs, &creating_sockets); 1103 1104 rtcf = tmcf->router_conf; 1105 1106 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1107 1108 nxt_router_app_unlink(task, app); 1109 1110 } nxt_queue_loop; 1111 1112 router = rtcf->router; 1113 1114 nxt_queue_add(&router->sockets, &keeping_sockets); 1115 nxt_queue_add(&router->sockets, &deleting_sockets); 1116 1117 nxt_queue_add(&router->apps, &tmcf->previous); 1118 1119 // TODO: new engines and threads 1120 1121 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1122 1123 nxt_mp_destroy(rtcf->mem_pool); 1124 1125 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1126 1127 nxt_mp_release(tmcf->mem_pool); 1128 } 1129 1130 1131 static void 1132 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1133 nxt_port_msg_type_t type) 1134 { 1135 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1136 1137 nxt_port_use(task, tmcf->port, -1); 1138 1139 tmcf->port = NULL; 1140 } 1141 1142 1143 static nxt_conf_map_t nxt_router_conf[] = { 1144 { 1145 nxt_string("listeners_threads"), 1146 NXT_CONF_MAP_INT32, 1147 offsetof(nxt_router_conf_t, threads), 1148 }, 1149 }; 1150 1151 1152 static nxt_conf_map_t nxt_router_app_conf[] = { 1153 { 1154 nxt_string("type"), 1155 NXT_CONF_MAP_STR, 1156 offsetof(nxt_router_app_conf_t, type), 1157 }, 1158 1159 { 1160 nxt_string("limits"), 1161 NXT_CONF_MAP_PTR, 1162 offsetof(nxt_router_app_conf_t, limits_value), 1163 }, 1164 1165 { 1166 nxt_string("processes"), 1167 NXT_CONF_MAP_INT32, 1168 offsetof(nxt_router_app_conf_t, processes), 1169 }, 1170 1171 { 1172 nxt_string("processes"), 1173 NXT_CONF_MAP_PTR, 1174 offsetof(nxt_router_app_conf_t, processes_value), 1175 }, 1176 1177 { 1178 nxt_string("targets"), 1179 NXT_CONF_MAP_PTR, 1180 offsetof(nxt_router_app_conf_t, targets_value), 1181 }, 1182 }; 1183 1184 1185 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1186 { 1187 nxt_string("timeout"), 1188 NXT_CONF_MAP_MSEC, 1189 offsetof(nxt_router_app_conf_t, timeout), 1190 }, 1191 1192 { 1193 nxt_string("requests"), 1194 NXT_CONF_MAP_INT32, 1195 offsetof(nxt_router_app_conf_t, requests), 1196 }, 1197 }; 1198 1199 1200 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1201 { 1202 nxt_string("spare"), 1203 NXT_CONF_MAP_INT32, 1204 offsetof(nxt_router_app_conf_t, spare_processes), 1205 }, 1206 1207 { 1208 nxt_string("max"), 1209 NXT_CONF_MAP_INT32, 1210 offsetof(nxt_router_app_conf_t, max_processes), 1211 }, 1212 1213 { 1214 nxt_string("idle_timeout"), 1215 NXT_CONF_MAP_MSEC, 1216 offsetof(nxt_router_app_conf_t, idle_timeout), 1217 }, 1218 }; 1219 1220 1221 static nxt_conf_map_t nxt_router_listener_conf[] = { 1222 { 1223 nxt_string("pass"), 1224 NXT_CONF_MAP_STR_COPY, 1225 offsetof(nxt_router_listener_conf_t, pass), 1226 }, 1227 1228 { 1229 nxt_string("application"), 1230 NXT_CONF_MAP_STR_COPY, 1231 offsetof(nxt_router_listener_conf_t, application), 1232 }, 1233 }; 1234 1235 1236 static nxt_conf_map_t nxt_router_http_conf[] = { 1237 { 1238 nxt_string("header_buffer_size"), 1239 NXT_CONF_MAP_SIZE, 1240 offsetof(nxt_socket_conf_t, header_buffer_size), 1241 }, 1242 1243 { 1244 nxt_string("large_header_buffer_size"), 1245 NXT_CONF_MAP_SIZE, 1246 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1247 }, 1248 1249 { 1250 nxt_string("large_header_buffers"), 1251 NXT_CONF_MAP_SIZE, 1252 offsetof(nxt_socket_conf_t, large_header_buffers), 1253 }, 1254 1255 { 1256 nxt_string("body_buffer_size"), 1257 NXT_CONF_MAP_SIZE, 1258 offsetof(nxt_socket_conf_t, body_buffer_size), 1259 }, 1260 1261 { 1262 nxt_string("max_body_size"), 1263 NXT_CONF_MAP_SIZE, 1264 offsetof(nxt_socket_conf_t, max_body_size), 1265 }, 1266 1267 { 1268 nxt_string("idle_timeout"), 1269 NXT_CONF_MAP_MSEC, 1270 offsetof(nxt_socket_conf_t, idle_timeout), 1271 }, 1272 1273 { 1274 nxt_string("header_read_timeout"), 1275 NXT_CONF_MAP_MSEC, 1276 offsetof(nxt_socket_conf_t, header_read_timeout), 1277 }, 1278 1279 { 1280 nxt_string("body_read_timeout"), 1281 NXT_CONF_MAP_MSEC, 1282 offsetof(nxt_socket_conf_t, body_read_timeout), 1283 }, 1284 1285 { 1286 nxt_string("send_timeout"), 1287 NXT_CONF_MAP_MSEC, 1288 offsetof(nxt_socket_conf_t, send_timeout), 1289 }, 1290 1291 { 1292 nxt_string("body_temp_path"), 1293 NXT_CONF_MAP_STR, 1294 offsetof(nxt_socket_conf_t, body_temp_path), 1295 }, 1296 1297 { 1298 nxt_string("discard_unsafe_fields"), 1299 NXT_CONF_MAP_INT8, 1300 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1301 }, 1302 }; 1303 1304 1305 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1306 { 1307 nxt_string("max_frame_size"), 1308 NXT_CONF_MAP_SIZE, 1309 offsetof(nxt_websocket_conf_t, max_frame_size), 1310 }, 1311 1312 { 1313 nxt_string("read_timeout"), 1314 NXT_CONF_MAP_MSEC, 1315 offsetof(nxt_websocket_conf_t, read_timeout), 1316 }, 1317 1318 { 1319 nxt_string("keepalive_interval"), 1320 NXT_CONF_MAP_MSEC, 1321 offsetof(nxt_websocket_conf_t, keepalive_interval), 1322 }, 1323 1324 }; 1325 1326 1327 static nxt_int_t 1328 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1329 u_char *start, u_char *end) 1330 { 1331 u_char *p; 1332 size_t size; 1333 nxt_mp_t *mp, *app_mp; 1334 uint32_t next, next_target; 1335 nxt_int_t ret; 1336 nxt_str_t name, path, target; 1337 nxt_app_t *app, *prev; 1338 nxt_str_t *t, *s, *targets; 1339 nxt_uint_t n, i; 1340 nxt_port_t *port; 1341 nxt_router_t *router; 1342 nxt_app_joint_t *app_joint; 1343 #if (NXT_TLS) 1344 nxt_tls_init_t *tls_init; 1345 nxt_conf_value_t *certificate; 1346 #endif 1347 nxt_conf_value_t *conf, *http, *value, *websocket; 1348 nxt_conf_value_t *applications, *application; 1349 nxt_conf_value_t *listeners, *listener; 1350 nxt_conf_value_t *routes_conf, *static_conf; 1351 nxt_socket_conf_t *skcf; 1352 nxt_http_routes_t *routes; 1353 nxt_event_engine_t *engine; 1354 nxt_app_lang_module_t *lang; 1355 nxt_router_app_conf_t apcf; 1356 nxt_router_access_log_t *access_log; 1357 nxt_router_listener_conf_t lscf; 1358 1359 static nxt_str_t http_path = nxt_string("/settings/http"); 1360 static nxt_str_t applications_path = nxt_string("/applications"); 1361 static nxt_str_t listeners_path = nxt_string("/listeners"); 1362 static nxt_str_t routes_path = nxt_string("/routes"); 1363 static nxt_str_t access_log_path = nxt_string("/access_log"); 1364 #if (NXT_TLS) 1365 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1366 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1367 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1368 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1369 #endif 1370 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1371 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1372 1373 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1374 if (conf == NULL) { 1375 nxt_alert(task, "configuration parsing error"); 1376 return NXT_ERROR; 1377 } 1378 1379 mp = tmcf->router_conf->mem_pool; 1380 1381 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1382 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1383 if (ret != NXT_OK) { 1384 nxt_alert(task, "root map error"); 1385 return NXT_ERROR; 1386 } 1387 1388 if (tmcf->router_conf->threads == 0) { 1389 tmcf->router_conf->threads = nxt_ncpu; 1390 } 1391 1392 static_conf = nxt_conf_get_path(conf, &static_path); 1393 1394 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1395 if (nxt_slow_path(ret != NXT_OK)) { 1396 return NXT_ERROR; 1397 } 1398 1399 router = tmcf->router_conf->router; 1400 1401 applications = nxt_conf_get_path(conf, &applications_path); 1402 1403 if (applications != NULL) { 1404 next = 0; 1405 1406 for ( ;; ) { 1407 application = nxt_conf_next_object_member(applications, 1408 &name, &next); 1409 if (application == NULL) { 1410 break; 1411 } 1412 1413 nxt_debug(task, "application \"%V\"", &name); 1414 1415 size = nxt_conf_json_length(application, NULL); 1416 1417 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1418 if (nxt_slow_path(app_mp == NULL)) { 1419 goto fail; 1420 } 1421 1422 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1423 if (app == NULL) { 1424 goto app_fail; 1425 } 1426 1427 nxt_memzero(app, sizeof(nxt_app_t)); 1428 1429 app->mem_pool = app_mp; 1430 1431 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1432 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1433 + name.length); 1434 1435 p = nxt_conf_json_print(app->conf.start, application, NULL); 1436 app->conf.length = p - app->conf.start; 1437 1438 nxt_assert(app->conf.length <= size); 1439 1440 nxt_debug(task, "application conf \"%V\"", &app->conf); 1441 1442 prev = nxt_router_app_find(&router->apps, &name); 1443 1444 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1445 nxt_mp_destroy(app_mp); 1446 1447 nxt_queue_remove(&prev->link); 1448 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1449 1450 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1451 if (nxt_slow_path(ret != NXT_OK)) { 1452 goto fail; 1453 } 1454 1455 continue; 1456 } 1457 1458 apcf.processes = 1; 1459 apcf.max_processes = 1; 1460 apcf.spare_processes = 0; 1461 apcf.timeout = 0; 1462 apcf.idle_timeout = 15000; 1463 apcf.requests = 0; 1464 apcf.limits_value = NULL; 1465 apcf.processes_value = NULL; 1466 apcf.targets_value = NULL; 1467 1468 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1469 if (nxt_slow_path(app_joint == NULL)) { 1470 goto app_fail; 1471 } 1472 1473 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1474 1475 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1476 nxt_nitems(nxt_router_app_conf), &apcf); 1477 if (ret != NXT_OK) { 1478 nxt_alert(task, "application map error"); 1479 goto app_fail; 1480 } 1481 1482 if (apcf.limits_value != NULL) { 1483 1484 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1485 nxt_alert(task, "application limits is not object"); 1486 goto app_fail; 1487 } 1488 1489 ret = nxt_conf_map_object(mp, apcf.limits_value, 1490 nxt_router_app_limits_conf, 1491 nxt_nitems(nxt_router_app_limits_conf), 1492 &apcf); 1493 if (ret != NXT_OK) { 1494 nxt_alert(task, "application limits map error"); 1495 goto app_fail; 1496 } 1497 } 1498 1499 if (apcf.processes_value != NULL 1500 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1501 { 1502 ret = nxt_conf_map_object(mp, apcf.processes_value, 1503 nxt_router_app_processes_conf, 1504 nxt_nitems(nxt_router_app_processes_conf), 1505 &apcf); 1506 if (ret != NXT_OK) { 1507 nxt_alert(task, "application processes map error"); 1508 goto app_fail; 1509 } 1510 1511 } else { 1512 apcf.max_processes = apcf.processes; 1513 apcf.spare_processes = apcf.processes; 1514 } 1515 1516 if (apcf.targets_value != NULL) { 1517 n = nxt_conf_object_members_count(apcf.targets_value); 1518 1519 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1520 if (nxt_slow_path(targets == NULL)) { 1521 goto app_fail; 1522 } 1523 1524 next_target = 0; 1525 1526 for (i = 0; i < n; i++) { 1527 (void) nxt_conf_next_object_member(apcf.targets_value, 1528 &target, &next_target); 1529 1530 s = nxt_str_dup(app_mp, &targets[i], &target); 1531 if (nxt_slow_path(s == NULL)) { 1532 goto app_fail; 1533 } 1534 } 1535 1536 } else { 1537 targets = NULL; 1538 } 1539 1540 nxt_debug(task, "application type: %V", &apcf.type); 1541 nxt_debug(task, "application processes: %D", apcf.processes); 1542 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1543 nxt_debug(task, "application requests: %D", apcf.requests); 1544 1545 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1546 1547 if (lang == NULL) { 1548 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1549 goto app_fail; 1550 } 1551 1552 nxt_debug(task, "application language module: \"%s\"", lang->file); 1553 1554 ret = nxt_thread_mutex_create(&app->mutex); 1555 if (ret != NXT_OK) { 1556 goto app_fail; 1557 } 1558 1559 nxt_queue_init(&app->ports); 1560 nxt_queue_init(&app->spare_ports); 1561 nxt_queue_init(&app->idle_ports); 1562 nxt_queue_init(&app->ack_waiting_req); 1563 1564 app->name.length = name.length; 1565 nxt_memcpy(app->name.start, name.start, name.length); 1566 1567 app->type = lang->type; 1568 app->max_processes = apcf.max_processes; 1569 app->spare_processes = apcf.spare_processes; 1570 app->max_pending_processes = apcf.spare_processes 1571 ? apcf.spare_processes : 1; 1572 app->timeout = apcf.timeout; 1573 app->idle_timeout = apcf.idle_timeout; 1574 app->max_requests = apcf.requests; 1575 1576 app->targets = targets; 1577 1578 engine = task->thread->engine; 1579 1580 app->engine = engine; 1581 1582 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1583 app->adjust_idle_work.task = &engine->task; 1584 app->adjust_idle_work.obj = app; 1585 1586 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1587 1588 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1589 if (nxt_slow_path(ret != NXT_OK)) { 1590 goto app_fail; 1591 } 1592 1593 nxt_router_app_use(task, app, 1); 1594 1595 app->joint = app_joint; 1596 1597 app_joint->use_count = 1; 1598 app_joint->app = app; 1599 1600 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1601 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1602 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1603 app_joint->idle_timer.task = &engine->task; 1604 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1605 1606 app_joint->free_app_work.handler = nxt_router_free_app; 1607 app_joint->free_app_work.task = &engine->task; 1608 app_joint->free_app_work.obj = app_joint; 1609 1610 port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, 1611 NXT_PROCESS_APP); 1612 if (nxt_slow_path(port == NULL)) { 1613 return NXT_ERROR; 1614 } 1615 1616 ret = nxt_port_socket_init(task, port, 0); 1617 if (nxt_slow_path(ret != NXT_OK)) { 1618 nxt_port_use(task, port, -1); 1619 return NXT_ERROR; 1620 } 1621 1622 ret = nxt_router_app_queue_init(task, port); 1623 if (nxt_slow_path(ret != NXT_OK)) { 1624 nxt_port_write_close(port); 1625 nxt_port_read_close(port); 1626 nxt_port_use(task, port, -1); 1627 return NXT_ERROR; 1628 } 1629 1630 nxt_port_write_enable(task, port); 1631 port->app = app; 1632 1633 app->shared_port = port; 1634 1635 nxt_thread_mutex_create(&app->outgoing.mutex); 1636 } 1637 } 1638 1639 routes_conf = nxt_conf_get_path(conf, &routes_path); 1640 if (nxt_fast_path(routes_conf != NULL)) { 1641 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1642 if (nxt_slow_path(routes == NULL)) { 1643 return NXT_ERROR; 1644 } 1645 tmcf->router_conf->routes = routes; 1646 } 1647 1648 ret = nxt_upstreams_create(task, tmcf, conf); 1649 if (nxt_slow_path(ret != NXT_OK)) { 1650 return ret; 1651 } 1652 1653 http = nxt_conf_get_path(conf, &http_path); 1654 #if 0 1655 if (http == NULL) { 1656 nxt_alert(task, "no \"http\" block"); 1657 return NXT_ERROR; 1658 } 1659 #endif 1660 1661 websocket = nxt_conf_get_path(conf, &websocket_path); 1662 1663 listeners = nxt_conf_get_path(conf, &listeners_path); 1664 1665 if (listeners != NULL) { 1666 next = 0; 1667 1668 for ( ;; ) { 1669 listener = nxt_conf_next_object_member(listeners, &name, &next); 1670 if (listener == NULL) { 1671 break; 1672 } 1673 1674 skcf = nxt_router_socket_conf(task, tmcf, &name); 1675 if (skcf == NULL) { 1676 goto fail; 1677 } 1678 1679 nxt_memzero(&lscf, sizeof(lscf)); 1680 1681 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1682 nxt_nitems(nxt_router_listener_conf), 1683 &lscf); 1684 if (ret != NXT_OK) { 1685 nxt_alert(task, "listener map error"); 1686 goto fail; 1687 } 1688 1689 nxt_debug(task, "application: %V", &lscf.application); 1690 1691 // STUB, default values if http block is not defined. 1692 skcf->header_buffer_size = 2048; 1693 skcf->large_header_buffer_size = 8192; 1694 skcf->large_header_buffers = 4; 1695 skcf->discard_unsafe_fields = 1; 1696 skcf->body_buffer_size = 16 * 1024; 1697 skcf->max_body_size = 8 * 1024 * 1024; 1698 skcf->proxy_header_buffer_size = 64 * 1024; 1699 skcf->proxy_buffer_size = 4096; 1700 skcf->proxy_buffers = 256; 1701 skcf->idle_timeout = 180 * 1000; 1702 skcf->header_read_timeout = 30 * 1000; 1703 skcf->body_read_timeout = 30 * 1000; 1704 skcf->send_timeout = 30 * 1000; 1705 skcf->proxy_timeout = 60 * 1000; 1706 skcf->proxy_send_timeout = 30 * 1000; 1707 skcf->proxy_read_timeout = 30 * 1000; 1708 1709 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1710 skcf->websocket_conf.read_timeout = 60 * 1000; 1711 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1712 1713 nxt_str_null(&skcf->body_temp_path); 1714 1715 if (http != NULL) { 1716 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1717 nxt_nitems(nxt_router_http_conf), 1718 skcf); 1719 if (ret != NXT_OK) { 1720 nxt_alert(task, "http map error"); 1721 goto fail; 1722 } 1723 } 1724 1725 if (websocket != NULL) { 1726 ret = nxt_conf_map_object(mp, websocket, 1727 nxt_router_websocket_conf, 1728 nxt_nitems(nxt_router_websocket_conf), 1729 &skcf->websocket_conf); 1730 if (ret != NXT_OK) { 1731 nxt_alert(task, "websocket map error"); 1732 goto fail; 1733 } 1734 } 1735 1736 t = &skcf->body_temp_path; 1737 1738 if (t->length == 0) { 1739 t->start = (u_char *) task->thread->runtime->tmp; 1740 t->length = nxt_strlen(t->start); 1741 } 1742 1743 #if (NXT_TLS) 1744 certificate = nxt_conf_get_path(listener, &certificate_path); 1745 1746 if (certificate != NULL) { 1747 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1748 if (nxt_slow_path(tls_init == NULL)) { 1749 return NXT_ERROR; 1750 } 1751 1752 tls_init->cache_size = 0; 1753 tls_init->timeout = 300; 1754 1755 value = nxt_conf_get_path(listener, &conf_cache_path); 1756 if (value != NULL) { 1757 tls_init->cache_size = nxt_conf_get_number(value); 1758 } 1759 1760 value = nxt_conf_get_path(listener, &conf_timeout_path); 1761 if (value != NULL) { 1762 tls_init->timeout = nxt_conf_get_number(value); 1763 } 1764 1765 tls_init->conf_cmds = nxt_conf_get_path(listener, 1766 &conf_commands_path); 1767 1768 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1769 n = nxt_conf_array_elements_count(certificate); 1770 1771 for (i = 0; i < n; i++) { 1772 value = nxt_conf_get_array_element(certificate, i); 1773 1774 nxt_assert(value != NULL); 1775 1776 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1777 tls_init, i == 0); 1778 if (nxt_slow_path(ret != NXT_OK)) { 1779 goto fail; 1780 } 1781 } 1782 1783 } else { 1784 /* NXT_CONF_STRING */ 1785 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, 1786 tls_init, 1); 1787 if (nxt_slow_path(ret != NXT_OK)) { 1788 goto fail; 1789 } 1790 } 1791 } 1792 #endif 1793 1794 skcf->listen->handler = nxt_http_conn_init; 1795 skcf->router_conf = tmcf->router_conf; 1796 skcf->router_conf->count++; 1797 1798 if (lscf.pass.length != 0) { 1799 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1800 1801 /* COMPATIBILITY: listener application. */ 1802 } else if (lscf.application.length > 0) { 1803 skcf->action = nxt_http_pass_application(task, 1804 tmcf->router_conf, 1805 &lscf.application); 1806 } 1807 1808 if (nxt_slow_path(skcf->action == NULL)) { 1809 goto fail; 1810 } 1811 } 1812 } 1813 1814 ret = nxt_http_routes_resolve(task, tmcf); 1815 if (nxt_slow_path(ret != NXT_OK)) { 1816 goto fail; 1817 } 1818 1819 value = nxt_conf_get_path(conf, &access_log_path); 1820 1821 if (value != NULL) { 1822 nxt_conf_get_string(value, &path); 1823 1824 access_log = router->access_log; 1825 1826 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1827 nxt_thread_spin_lock(&router->lock); 1828 access_log->count++; 1829 nxt_thread_spin_unlock(&router->lock); 1830 1831 } else { 1832 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1833 + path.length); 1834 if (access_log == NULL) { 1835 nxt_alert(task, "failed to allocate access log structure"); 1836 goto fail; 1837 } 1838 1839 access_log->fd = -1; 1840 access_log->handler = &nxt_router_access_log_writer; 1841 access_log->count = 1; 1842 1843 access_log->path.length = path.length; 1844 access_log->path.start = (u_char *) access_log 1845 + sizeof(nxt_router_access_log_t); 1846 1847 nxt_memcpy(access_log->path.start, path.start, path.length); 1848 } 1849 1850 tmcf->router_conf->access_log = access_log; 1851 } 1852 1853 nxt_queue_add(&deleting_sockets, &router->sockets); 1854 nxt_queue_init(&router->sockets); 1855 1856 return NXT_OK; 1857 1858 app_fail: 1859 1860 nxt_mp_destroy(app_mp); 1861 1862 fail: 1863 1864 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1865 1866 nxt_queue_remove(&app->link); 1867 nxt_thread_mutex_destroy(&app->mutex); 1868 nxt_mp_destroy(app->mem_pool); 1869 1870 } nxt_queue_loop; 1871 1872 return NXT_ERROR; 1873 } 1874 1875 1876 #if (NXT_TLS) 1877 1878 static nxt_int_t 1879 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 1880 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 1881 nxt_tls_init_t *tls_init, nxt_bool_t last) 1882 { 1883 nxt_router_tlssock_t *tls; 1884 1885 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 1886 if (nxt_slow_path(tls == NULL)) { 1887 return NXT_ERROR; 1888 } 1889 1890 tls->tls_init = tls_init; 1891 tls->socket_conf = skcf; 1892 tls->temp_conf = tmcf; 1893 tls->last = last; 1894 nxt_conf_get_string(value, &tls->name); 1895 1896 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1897 1898 return NXT_OK; 1899 } 1900 1901 #endif 1902 1903 1904 static nxt_int_t 1905 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 1906 nxt_conf_value_t *conf) 1907 { 1908 uint32_t next, i; 1909 nxt_mp_t *mp; 1910 nxt_str_t *type, exten, str; 1911 nxt_int_t ret; 1912 nxt_uint_t exts; 1913 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 1914 1915 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 1916 1917 mp = rtcf->mem_pool; 1918 1919 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 1920 if (nxt_slow_path(ret != NXT_OK)) { 1921 return NXT_ERROR; 1922 } 1923 1924 if (conf == NULL) { 1925 return NXT_OK; 1926 } 1927 1928 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 1929 1930 if (mtypes_conf != NULL) { 1931 next = 0; 1932 1933 for ( ;; ) { 1934 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 1935 1936 if (ext_conf == NULL) { 1937 break; 1938 } 1939 1940 type = nxt_str_dup(mp, NULL, &str); 1941 if (nxt_slow_path(type == NULL)) { 1942 return NXT_ERROR; 1943 } 1944 1945 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 1946 nxt_conf_get_string(ext_conf, &str); 1947 1948 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 1949 return NXT_ERROR; 1950 } 1951 1952 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1953 &exten, type); 1954 if (nxt_slow_path(ret != NXT_OK)) { 1955 return NXT_ERROR; 1956 } 1957 1958 continue; 1959 } 1960 1961 exts = nxt_conf_array_elements_count(ext_conf); 1962 1963 for (i = 0; i < exts; i++) { 1964 value = nxt_conf_get_array_element(ext_conf, i); 1965 1966 nxt_conf_get_string(value, &str); 1967 1968 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 1969 return NXT_ERROR; 1970 } 1971 1972 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1973 &exten, type); 1974 if (nxt_slow_path(ret != NXT_OK)) { 1975 return NXT_ERROR; 1976 } 1977 } 1978 } 1979 } 1980 1981 return NXT_OK; 1982 } 1983 1984 1985 static nxt_app_t * 1986 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1987 { 1988 nxt_app_t *app; 1989 1990 nxt_queue_each(app, queue, nxt_app_t, link) { 1991 1992 if (nxt_strstr_eq(name, &app->name)) { 1993 return app; 1994 } 1995 1996 } nxt_queue_loop; 1997 1998 return NULL; 1999 } 2000 2001 2002 static nxt_int_t 2003 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2004 { 2005 void *mem; 2006 nxt_int_t fd; 2007 2008 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2009 if (nxt_slow_path(fd == -1)) { 2010 return NXT_ERROR; 2011 } 2012 2013 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2014 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2015 if (nxt_slow_path(mem == MAP_FAILED)) { 2016 nxt_fd_close(fd); 2017 2018 return NXT_ERROR; 2019 } 2020 2021 nxt_app_queue_init(mem); 2022 2023 port->queue_fd = fd; 2024 port->queue = mem; 2025 2026 return NXT_OK; 2027 } 2028 2029 2030 static nxt_int_t 2031 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2032 { 2033 void *mem; 2034 nxt_int_t fd; 2035 2036 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2037 if (nxt_slow_path(fd == -1)) { 2038 return NXT_ERROR; 2039 } 2040 2041 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2042 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2043 if (nxt_slow_path(mem == MAP_FAILED)) { 2044 nxt_fd_close(fd); 2045 2046 return NXT_ERROR; 2047 } 2048 2049 nxt_port_queue_init(mem); 2050 2051 port->queue_fd = fd; 2052 port->queue = mem; 2053 2054 return NXT_OK; 2055 } 2056 2057 2058 static nxt_int_t 2059 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2060 { 2061 void *mem; 2062 2063 nxt_assert(fd != -1); 2064 2065 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2066 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2067 if (nxt_slow_path(mem == MAP_FAILED)) { 2068 2069 return NXT_ERROR; 2070 } 2071 2072 port->queue = mem; 2073 2074 return NXT_OK; 2075 } 2076 2077 2078 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2079 NXT_LVLHSH_DEFAULT, 2080 nxt_router_apps_hash_test, 2081 nxt_mp_lvlhsh_alloc, 2082 nxt_mp_lvlhsh_free, 2083 }; 2084 2085 2086 static nxt_int_t 2087 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2088 { 2089 nxt_app_t *app; 2090 2091 app = data; 2092 2093 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2094 } 2095 2096 2097 static nxt_int_t 2098 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2099 { 2100 nxt_lvlhsh_query_t lhq; 2101 2102 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2103 lhq.replace = 0; 2104 lhq.key = app->name; 2105 lhq.value = app; 2106 lhq.proto = &nxt_router_apps_hash_proto; 2107 lhq.pool = rtcf->mem_pool; 2108 2109 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2110 2111 case NXT_OK: 2112 return NXT_OK; 2113 2114 case NXT_DECLINED: 2115 nxt_thread_log_alert("router app hash adding failed: " 2116 "\"%V\" is already in hash", &lhq.key); 2117 /* Fall through. */ 2118 default: 2119 return NXT_ERROR; 2120 } 2121 } 2122 2123 2124 static nxt_app_t * 2125 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2126 { 2127 nxt_lvlhsh_query_t lhq; 2128 2129 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2130 lhq.key = *name; 2131 lhq.proto = &nxt_router_apps_hash_proto; 2132 2133 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2134 return NULL; 2135 } 2136 2137 return lhq.value; 2138 } 2139 2140 2141 static void 2142 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2143 { 2144 nxt_app_t *app; 2145 nxt_lvlhsh_each_t lhe; 2146 2147 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2148 2149 for ( ;; ) { 2150 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2151 2152 if (app == NULL) { 2153 break; 2154 } 2155 2156 nxt_router_app_use(task, app, i); 2157 } 2158 } 2159 2160 2161 2162 nxt_int_t 2163 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name, 2164 nxt_http_action_t *action) 2165 { 2166 nxt_app_t *app; 2167 2168 app = nxt_router_apps_hash_get(rtcf, name); 2169 2170 if (app == NULL) { 2171 return NXT_DECLINED; 2172 } 2173 2174 action->u.app.application = app; 2175 action->handler = nxt_http_application_handler; 2176 2177 return NXT_OK; 2178 } 2179 2180 2181 static nxt_socket_conf_t * 2182 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2183 nxt_str_t *name) 2184 { 2185 size_t size; 2186 nxt_int_t ret; 2187 nxt_bool_t wildcard; 2188 nxt_sockaddr_t *sa; 2189 nxt_socket_conf_t *skcf; 2190 nxt_listen_socket_t *ls; 2191 2192 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2193 if (nxt_slow_path(sa == NULL)) { 2194 nxt_alert(task, "invalid listener \"%V\"", name); 2195 return NULL; 2196 } 2197 2198 sa->type = SOCK_STREAM; 2199 2200 nxt_debug(task, "router listener: \"%*s\"", 2201 (size_t) sa->length, nxt_sockaddr_start(sa)); 2202 2203 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2204 if (nxt_slow_path(skcf == NULL)) { 2205 return NULL; 2206 } 2207 2208 size = nxt_sockaddr_size(sa); 2209 2210 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2211 2212 if (ret != NXT_OK) { 2213 2214 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2215 if (nxt_slow_path(ls == NULL)) { 2216 return NULL; 2217 } 2218 2219 skcf->listen = ls; 2220 2221 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2222 nxt_memcpy(ls->sockaddr, sa, size); 2223 2224 nxt_listen_socket_remote_size(ls); 2225 2226 ls->socket = -1; 2227 ls->backlog = NXT_LISTEN_BACKLOG; 2228 ls->flags = NXT_NONBLOCK; 2229 ls->read_after_accept = 1; 2230 } 2231 2232 switch (sa->u.sockaddr.sa_family) { 2233 #if (NXT_HAVE_UNIX_DOMAIN) 2234 case AF_UNIX: 2235 wildcard = 0; 2236 break; 2237 #endif 2238 #if (NXT_INET6) 2239 case AF_INET6: 2240 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2241 break; 2242 #endif 2243 case AF_INET: 2244 default: 2245 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2246 break; 2247 } 2248 2249 if (!wildcard) { 2250 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2251 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2252 return NULL; 2253 } 2254 2255 nxt_memcpy(skcf->sockaddr, sa, size); 2256 } 2257 2258 return skcf; 2259 } 2260 2261 2262 static nxt_int_t 2263 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2264 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2265 { 2266 nxt_router_t *router; 2267 nxt_queue_link_t *qlk; 2268 nxt_socket_conf_t *skcf; 2269 2270 router = tmcf->router_conf->router; 2271 2272 for (qlk = nxt_queue_first(&router->sockets); 2273 qlk != nxt_queue_tail(&router->sockets); 2274 qlk = nxt_queue_next(qlk)) 2275 { 2276 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2277 2278 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2279 nskcf->listen = skcf->listen; 2280 2281 nxt_queue_remove(qlk); 2282 nxt_queue_insert_tail(&keeping_sockets, qlk); 2283 2284 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2285 2286 return NXT_OK; 2287 } 2288 } 2289 2290 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2291 2292 return NXT_DECLINED; 2293 } 2294 2295 2296 static void 2297 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2298 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2299 { 2300 size_t size; 2301 uint32_t stream; 2302 nxt_int_t ret; 2303 nxt_buf_t *b; 2304 nxt_port_t *main_port, *router_port; 2305 nxt_runtime_t *rt; 2306 nxt_socket_rpc_t *rpc; 2307 2308 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2309 if (rpc == NULL) { 2310 goto fail; 2311 } 2312 2313 rpc->socket_conf = skcf; 2314 rpc->temp_conf = tmcf; 2315 2316 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2317 2318 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2319 if (b == NULL) { 2320 goto fail; 2321 } 2322 2323 b->completion_handler = nxt_router_dummy_buf_completion; 2324 2325 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2326 2327 rt = task->thread->runtime; 2328 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2329 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2330 2331 stream = nxt_port_rpc_register_handler(task, router_port, 2332 nxt_router_listen_socket_ready, 2333 nxt_router_listen_socket_error, 2334 main_port->pid, rpc); 2335 if (nxt_slow_path(stream == 0)) { 2336 goto fail; 2337 } 2338 2339 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2340 stream, router_port->id, b); 2341 2342 if (nxt_slow_path(ret != NXT_OK)) { 2343 nxt_port_rpc_cancel(task, router_port, stream); 2344 goto fail; 2345 } 2346 2347 return; 2348 2349 fail: 2350 2351 nxt_router_conf_error(task, tmcf); 2352 } 2353 2354 2355 static void 2356 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2357 void *data) 2358 { 2359 nxt_int_t ret; 2360 nxt_socket_t s; 2361 nxt_socket_rpc_t *rpc; 2362 2363 rpc = data; 2364 2365 s = msg->fd[0]; 2366 2367 ret = nxt_socket_nonblocking(task, s); 2368 if (nxt_slow_path(ret != NXT_OK)) { 2369 goto fail; 2370 } 2371 2372 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2373 2374 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2375 if (nxt_slow_path(ret != NXT_OK)) { 2376 goto fail; 2377 } 2378 2379 rpc->socket_conf->listen->socket = s; 2380 2381 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2382 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2383 2384 return; 2385 2386 fail: 2387 2388 nxt_socket_close(task, s); 2389 2390 nxt_router_conf_error(task, rpc->temp_conf); 2391 } 2392 2393 2394 static void 2395 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2396 void *data) 2397 { 2398 nxt_socket_rpc_t *rpc; 2399 nxt_router_temp_conf_t *tmcf; 2400 2401 rpc = data; 2402 tmcf = rpc->temp_conf; 2403 2404 #if 0 2405 u_char *p; 2406 size_t size; 2407 uint8_t error; 2408 nxt_buf_t *in, *out; 2409 nxt_sockaddr_t *sa; 2410 2411 static nxt_str_t socket_errors[] = { 2412 nxt_string("ListenerSystem"), 2413 nxt_string("ListenerNoIPv6"), 2414 nxt_string("ListenerPort"), 2415 nxt_string("ListenerInUse"), 2416 nxt_string("ListenerNoAddress"), 2417 nxt_string("ListenerNoAccess"), 2418 nxt_string("ListenerPath"), 2419 }; 2420 2421 sa = rpc->socket_conf->listen->sockaddr; 2422 2423 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2424 2425 if (nxt_slow_path(in == NULL)) { 2426 return; 2427 } 2428 2429 p = in->mem.pos; 2430 2431 error = *p++; 2432 2433 size = nxt_length("listen socket error: ") 2434 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2435 + sa->length + socket_errors[error].length + (in->mem.free - p); 2436 2437 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2438 if (nxt_slow_path(out == NULL)) { 2439 return; 2440 } 2441 2442 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2443 "listen socket error: " 2444 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2445 (size_t) sa->length, nxt_sockaddr_start(sa), 2446 &socket_errors[error], in->mem.free - p, p); 2447 2448 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2449 #endif 2450 2451 nxt_router_conf_error(task, tmcf); 2452 } 2453 2454 2455 #if (NXT_TLS) 2456 2457 static void 2458 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2459 void *data) 2460 { 2461 nxt_mp_t *mp; 2462 nxt_int_t ret; 2463 nxt_tls_conf_t *tlscf; 2464 nxt_router_tlssock_t *tls; 2465 nxt_tls_bundle_conf_t *bundle; 2466 nxt_router_temp_conf_t *tmcf; 2467 2468 nxt_debug(task, "tls rpc handler"); 2469 2470 tls = data; 2471 tmcf = tls->temp_conf; 2472 2473 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2474 goto fail; 2475 } 2476 2477 mp = tmcf->router_conf->mem_pool; 2478 2479 if (tls->socket_conf->tls == NULL){ 2480 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2481 if (nxt_slow_path(tlscf == NULL)) { 2482 goto fail; 2483 } 2484 2485 tlscf->no_wait_shutdown = 1; 2486 tls->socket_conf->tls = tlscf; 2487 2488 } else { 2489 tlscf = tls->socket_conf->tls; 2490 } 2491 2492 tls->tls_init->conf = tlscf; 2493 2494 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2495 if (nxt_slow_path(bundle == NULL)) { 2496 goto fail; 2497 } 2498 2499 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2500 goto fail; 2501 } 2502 2503 bundle->chain_file = msg->fd[0]; 2504 bundle->next = tlscf->bundle; 2505 tlscf->bundle = bundle; 2506 2507 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2508 tls->last); 2509 if (nxt_slow_path(ret != NXT_OK)) { 2510 goto fail; 2511 } 2512 2513 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2514 nxt_router_conf_apply, task, tmcf, NULL); 2515 return; 2516 2517 fail: 2518 2519 nxt_router_conf_error(task, tmcf); 2520 } 2521 2522 #endif 2523 2524 2525 static void 2526 nxt_router_app_rpc_create(nxt_task_t *task, 2527 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2528 { 2529 size_t size; 2530 uint32_t stream; 2531 nxt_int_t ret; 2532 nxt_buf_t *b; 2533 nxt_port_t *main_port, *router_port; 2534 nxt_runtime_t *rt; 2535 nxt_app_rpc_t *rpc; 2536 2537 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2538 if (rpc == NULL) { 2539 goto fail; 2540 } 2541 2542 rpc->app = app; 2543 rpc->temp_conf = tmcf; 2544 2545 nxt_debug(task, "app '%V' prefork", &app->name); 2546 2547 size = app->name.length + 1 + app->conf.length; 2548 2549 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2550 if (nxt_slow_path(b == NULL)) { 2551 goto fail; 2552 } 2553 2554 b->completion_handler = nxt_router_dummy_buf_completion; 2555 2556 nxt_buf_cpystr(b, &app->name); 2557 *b->mem.free++ = '\0'; 2558 nxt_buf_cpystr(b, &app->conf); 2559 2560 rt = task->thread->runtime; 2561 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2562 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2563 2564 stream = nxt_port_rpc_register_handler(task, router_port, 2565 nxt_router_app_prefork_ready, 2566 nxt_router_app_prefork_error, 2567 -1, rpc); 2568 if (nxt_slow_path(stream == 0)) { 2569 goto fail; 2570 } 2571 2572 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2573 -1, stream, router_port->id, b); 2574 2575 if (nxt_slow_path(ret != NXT_OK)) { 2576 nxt_port_rpc_cancel(task, router_port, stream); 2577 goto fail; 2578 } 2579 2580 app->pending_processes++; 2581 2582 return; 2583 2584 fail: 2585 2586 nxt_router_conf_error(task, tmcf); 2587 } 2588 2589 2590 static void 2591 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2592 void *data) 2593 { 2594 nxt_app_t *app; 2595 nxt_port_t *port; 2596 nxt_app_rpc_t *rpc; 2597 nxt_event_engine_t *engine; 2598 2599 rpc = data; 2600 app = rpc->app; 2601 2602 port = msg->u.new_port; 2603 2604 nxt_assert(port != NULL); 2605 nxt_assert(port->type == NXT_PROCESS_APP); 2606 nxt_assert(port->id == 0); 2607 2608 port->app = app; 2609 port->main_app_port = port; 2610 2611 app->pending_processes--; 2612 app->processes++; 2613 app->idle_processes++; 2614 2615 engine = task->thread->engine; 2616 2617 nxt_queue_insert_tail(&app->ports, &port->app_link); 2618 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2619 2620 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2621 &app->name, port->pid, port->id); 2622 2623 nxt_port_hash_add(&app->port_hash, port); 2624 app->port_hash_count++; 2625 2626 port->idle_start = 0; 2627 2628 nxt_port_inc_use(port); 2629 2630 nxt_router_app_shared_port_send(task, port); 2631 2632 nxt_work_queue_add(&engine->fast_work_queue, 2633 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2634 } 2635 2636 2637 static void 2638 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2639 void *data) 2640 { 2641 nxt_app_t *app; 2642 nxt_app_rpc_t *rpc; 2643 nxt_router_temp_conf_t *tmcf; 2644 2645 rpc = data; 2646 app = rpc->app; 2647 tmcf = rpc->temp_conf; 2648 2649 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2650 &app->name); 2651 2652 app->pending_processes--; 2653 2654 nxt_router_conf_error(task, tmcf); 2655 } 2656 2657 2658 static nxt_int_t 2659 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2660 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2661 { 2662 nxt_int_t ret; 2663 nxt_uint_t n, threads; 2664 nxt_queue_link_t *qlk; 2665 nxt_router_engine_conf_t *recf; 2666 2667 threads = tmcf->router_conf->threads; 2668 2669 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2670 sizeof(nxt_router_engine_conf_t)); 2671 if (nxt_slow_path(tmcf->engines == NULL)) { 2672 return NXT_ERROR; 2673 } 2674 2675 n = 0; 2676 2677 for (qlk = nxt_queue_first(&router->engines); 2678 qlk != nxt_queue_tail(&router->engines); 2679 qlk = nxt_queue_next(qlk)) 2680 { 2681 recf = nxt_array_zero_add(tmcf->engines); 2682 if (nxt_slow_path(recf == NULL)) { 2683 return NXT_ERROR; 2684 } 2685 2686 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2687 2688 if (n < threads) { 2689 recf->action = NXT_ROUTER_ENGINE_KEEP; 2690 ret = nxt_router_engine_conf_update(tmcf, recf); 2691 2692 } else { 2693 recf->action = NXT_ROUTER_ENGINE_DELETE; 2694 ret = nxt_router_engine_conf_delete(tmcf, recf); 2695 } 2696 2697 if (nxt_slow_path(ret != NXT_OK)) { 2698 return ret; 2699 } 2700 2701 n++; 2702 } 2703 2704 tmcf->new_threads = n; 2705 2706 while (n < threads) { 2707 recf = nxt_array_zero_add(tmcf->engines); 2708 if (nxt_slow_path(recf == NULL)) { 2709 return NXT_ERROR; 2710 } 2711 2712 recf->action = NXT_ROUTER_ENGINE_ADD; 2713 2714 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2715 if (nxt_slow_path(recf->engine == NULL)) { 2716 return NXT_ERROR; 2717 } 2718 2719 ret = nxt_router_engine_conf_create(tmcf, recf); 2720 if (nxt_slow_path(ret != NXT_OK)) { 2721 return ret; 2722 } 2723 2724 n++; 2725 } 2726 2727 return NXT_OK; 2728 } 2729 2730 2731 static nxt_int_t 2732 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2733 nxt_router_engine_conf_t *recf) 2734 { 2735 nxt_int_t ret; 2736 2737 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2738 nxt_router_listen_socket_create); 2739 if (nxt_slow_path(ret != NXT_OK)) { 2740 return ret; 2741 } 2742 2743 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2744 nxt_router_listen_socket_create); 2745 if (nxt_slow_path(ret != NXT_OK)) { 2746 return ret; 2747 } 2748 2749 return ret; 2750 } 2751 2752 2753 static nxt_int_t 2754 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2755 nxt_router_engine_conf_t *recf) 2756 { 2757 nxt_int_t ret; 2758 2759 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2760 nxt_router_listen_socket_create); 2761 if (nxt_slow_path(ret != NXT_OK)) { 2762 return ret; 2763 } 2764 2765 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2766 nxt_router_listen_socket_update); 2767 if (nxt_slow_path(ret != NXT_OK)) { 2768 return ret; 2769 } 2770 2771 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2772 if (nxt_slow_path(ret != NXT_OK)) { 2773 return ret; 2774 } 2775 2776 return ret; 2777 } 2778 2779 2780 static nxt_int_t 2781 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2782 nxt_router_engine_conf_t *recf) 2783 { 2784 nxt_int_t ret; 2785 2786 ret = nxt_router_engine_quit(tmcf, recf); 2787 if (nxt_slow_path(ret != NXT_OK)) { 2788 return ret; 2789 } 2790 2791 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 2792 if (nxt_slow_path(ret != NXT_OK)) { 2793 return ret; 2794 } 2795 2796 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2797 } 2798 2799 2800 static nxt_int_t 2801 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2802 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2803 nxt_work_handler_t handler) 2804 { 2805 nxt_int_t ret; 2806 nxt_joint_job_t *job; 2807 nxt_queue_link_t *qlk; 2808 nxt_socket_conf_t *skcf; 2809 nxt_socket_conf_joint_t *joint; 2810 2811 for (qlk = nxt_queue_first(sockets); 2812 qlk != nxt_queue_tail(sockets); 2813 qlk = nxt_queue_next(qlk)) 2814 { 2815 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2816 if (nxt_slow_path(job == NULL)) { 2817 return NXT_ERROR; 2818 } 2819 2820 job->work.next = recf->jobs; 2821 recf->jobs = &job->work; 2822 2823 job->task = tmcf->engine->task; 2824 job->work.handler = handler; 2825 job->work.task = &job->task; 2826 job->work.obj = job; 2827 job->tmcf = tmcf; 2828 2829 tmcf->count++; 2830 2831 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2832 sizeof(nxt_socket_conf_joint_t)); 2833 if (nxt_slow_path(joint == NULL)) { 2834 return NXT_ERROR; 2835 } 2836 2837 job->work.data = joint; 2838 2839 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 2840 if (nxt_slow_path(ret != NXT_OK)) { 2841 return ret; 2842 } 2843 2844 joint->count = 1; 2845 2846 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2847 skcf->count++; 2848 joint->socket_conf = skcf; 2849 2850 joint->engine = recf->engine; 2851 } 2852 2853 return NXT_OK; 2854 } 2855 2856 2857 static nxt_int_t 2858 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2859 nxt_router_engine_conf_t *recf) 2860 { 2861 nxt_joint_job_t *job; 2862 2863 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2864 if (nxt_slow_path(job == NULL)) { 2865 return NXT_ERROR; 2866 } 2867 2868 job->work.next = recf->jobs; 2869 recf->jobs = &job->work; 2870 2871 job->task = tmcf->engine->task; 2872 job->work.handler = nxt_router_worker_thread_quit; 2873 job->work.task = &job->task; 2874 job->work.obj = NULL; 2875 job->work.data = NULL; 2876 job->tmcf = NULL; 2877 2878 return NXT_OK; 2879 } 2880 2881 2882 static nxt_int_t 2883 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2884 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2885 { 2886 nxt_joint_job_t *job; 2887 nxt_queue_link_t *qlk; 2888 2889 for (qlk = nxt_queue_first(sockets); 2890 qlk != nxt_queue_tail(sockets); 2891 qlk = nxt_queue_next(qlk)) 2892 { 2893 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2894 if (nxt_slow_path(job == NULL)) { 2895 return NXT_ERROR; 2896 } 2897 2898 job->work.next = recf->jobs; 2899 recf->jobs = &job->work; 2900 2901 job->task = tmcf->engine->task; 2902 job->work.handler = nxt_router_listen_socket_delete; 2903 job->work.task = &job->task; 2904 job->work.obj = job; 2905 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2906 job->tmcf = tmcf; 2907 2908 tmcf->count++; 2909 } 2910 2911 return NXT_OK; 2912 } 2913 2914 2915 static nxt_int_t 2916 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2917 nxt_router_temp_conf_t *tmcf) 2918 { 2919 nxt_int_t ret; 2920 nxt_uint_t i, threads; 2921 nxt_router_engine_conf_t *recf; 2922 2923 recf = tmcf->engines->elts; 2924 threads = tmcf->router_conf->threads; 2925 2926 for (i = tmcf->new_threads; i < threads; i++) { 2927 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2928 if (nxt_slow_path(ret != NXT_OK)) { 2929 return ret; 2930 } 2931 } 2932 2933 return NXT_OK; 2934 } 2935 2936 2937 static nxt_int_t 2938 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2939 nxt_event_engine_t *engine) 2940 { 2941 nxt_int_t ret; 2942 nxt_thread_link_t *link; 2943 nxt_thread_handle_t handle; 2944 2945 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2946 2947 if (nxt_slow_path(link == NULL)) { 2948 return NXT_ERROR; 2949 } 2950 2951 link->start = nxt_router_thread_start; 2952 link->engine = engine; 2953 link->work.handler = nxt_router_thread_exit_handler; 2954 link->work.task = task; 2955 link->work.data = link; 2956 2957 nxt_queue_insert_tail(&rt->engines, &engine->link); 2958 2959 ret = nxt_thread_create(&handle, link); 2960 2961 if (nxt_slow_path(ret != NXT_OK)) { 2962 nxt_queue_remove(&engine->link); 2963 } 2964 2965 return ret; 2966 } 2967 2968 2969 static void 2970 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2971 nxt_router_temp_conf_t *tmcf) 2972 { 2973 nxt_app_t *app; 2974 2975 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2976 2977 nxt_router_app_unlink(task, app); 2978 2979 } nxt_queue_loop; 2980 2981 nxt_queue_add(&router->apps, &tmcf->previous); 2982 nxt_queue_add(&router->apps, &tmcf->apps); 2983 } 2984 2985 2986 static void 2987 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2988 { 2989 nxt_uint_t n; 2990 nxt_event_engine_t *engine; 2991 nxt_router_engine_conf_t *recf; 2992 2993 recf = tmcf->engines->elts; 2994 2995 for (n = tmcf->engines->nelts; n != 0; n--) { 2996 engine = recf->engine; 2997 2998 switch (recf->action) { 2999 3000 case NXT_ROUTER_ENGINE_KEEP: 3001 break; 3002 3003 case NXT_ROUTER_ENGINE_ADD: 3004 nxt_queue_insert_tail(&router->engines, &engine->link0); 3005 break; 3006 3007 case NXT_ROUTER_ENGINE_DELETE: 3008 nxt_queue_remove(&engine->link0); 3009 break; 3010 } 3011 3012 nxt_router_engine_post(engine, recf->jobs); 3013 3014 recf++; 3015 } 3016 } 3017 3018 3019 static void 3020 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3021 { 3022 nxt_work_t *work, *next; 3023 3024 for (work = jobs; work != NULL; work = next) { 3025 next = work->next; 3026 work->next = NULL; 3027 3028 nxt_event_engine_post(engine, work); 3029 } 3030 } 3031 3032 3033 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3034 .rpc_error = nxt_port_rpc_handler, 3035 .mmap = nxt_port_mmap_handler, 3036 .data = nxt_port_rpc_handler, 3037 .oosm = nxt_router_oosm_handler, 3038 .req_headers_ack = nxt_port_rpc_handler, 3039 }; 3040 3041 3042 static void 3043 nxt_router_thread_start(void *data) 3044 { 3045 nxt_int_t ret; 3046 nxt_port_t *port; 3047 nxt_task_t *task; 3048 nxt_work_t *work; 3049 nxt_thread_t *thread; 3050 nxt_thread_link_t *link; 3051 nxt_event_engine_t *engine; 3052 3053 link = data; 3054 engine = link->engine; 3055 task = &engine->task; 3056 3057 thread = nxt_thread(); 3058 3059 nxt_event_engine_thread_adopt(engine); 3060 3061 /* STUB */ 3062 thread->runtime = engine->task.thread->runtime; 3063 3064 engine->task.thread = thread; 3065 engine->task.log = thread->log; 3066 thread->engine = engine; 3067 thread->task = &engine->task; 3068 #if 0 3069 thread->fiber = &engine->fibers->fiber; 3070 #endif 3071 3072 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3073 if (nxt_slow_path(engine->mem_pool == NULL)) { 3074 return; 3075 } 3076 3077 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3078 NXT_PROCESS_ROUTER); 3079 if (nxt_slow_path(port == NULL)) { 3080 return; 3081 } 3082 3083 ret = nxt_port_socket_init(task, port, 0); 3084 if (nxt_slow_path(ret != NXT_OK)) { 3085 nxt_port_use(task, port, -1); 3086 return; 3087 } 3088 3089 ret = nxt_router_port_queue_init(task, port); 3090 if (nxt_slow_path(ret != NXT_OK)) { 3091 nxt_port_use(task, port, -1); 3092 return; 3093 } 3094 3095 engine->port = port; 3096 3097 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3098 3099 work = nxt_zalloc(sizeof(nxt_work_t)); 3100 if (nxt_slow_path(work == NULL)) { 3101 return; 3102 } 3103 3104 work->handler = nxt_router_rt_add_port; 3105 work->task = link->work.task; 3106 work->obj = work; 3107 work->data = port; 3108 3109 nxt_event_engine_post(link->work.task->thread->engine, work); 3110 3111 nxt_event_engine_start(engine); 3112 } 3113 3114 3115 static void 3116 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3117 { 3118 nxt_int_t res; 3119 nxt_port_t *port; 3120 nxt_runtime_t *rt; 3121 3122 rt = task->thread->runtime; 3123 port = data; 3124 3125 nxt_free(obj); 3126 3127 res = nxt_port_hash_add(&rt->ports, port); 3128 3129 if (nxt_fast_path(res == NXT_OK)) { 3130 nxt_port_use(task, port, 1); 3131 } 3132 } 3133 3134 3135 static void 3136 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3137 { 3138 nxt_joint_job_t *job; 3139 nxt_socket_conf_t *skcf; 3140 nxt_listen_event_t *lev; 3141 nxt_listen_socket_t *ls; 3142 nxt_thread_spinlock_t *lock; 3143 nxt_socket_conf_joint_t *joint; 3144 3145 job = obj; 3146 joint = data; 3147 3148 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3149 3150 skcf = joint->socket_conf; 3151 ls = skcf->listen; 3152 3153 lev = nxt_listen_event(task, ls); 3154 if (nxt_slow_path(lev == NULL)) { 3155 nxt_router_listen_socket_release(task, skcf); 3156 return; 3157 } 3158 3159 lev->socket.data = joint; 3160 3161 lock = &skcf->router_conf->router->lock; 3162 3163 nxt_thread_spin_lock(lock); 3164 ls->count++; 3165 nxt_thread_spin_unlock(lock); 3166 3167 job->work.next = NULL; 3168 job->work.handler = nxt_router_conf_wait; 3169 3170 nxt_event_engine_post(job->tmcf->engine, &job->work); 3171 } 3172 3173 3174 nxt_inline nxt_listen_event_t * 3175 nxt_router_listen_event(nxt_queue_t *listen_connections, 3176 nxt_socket_conf_t *skcf) 3177 { 3178 nxt_socket_t fd; 3179 nxt_queue_link_t *qlk; 3180 nxt_listen_event_t *lev; 3181 3182 fd = skcf->listen->socket; 3183 3184 for (qlk = nxt_queue_first(listen_connections); 3185 qlk != nxt_queue_tail(listen_connections); 3186 qlk = nxt_queue_next(qlk)) 3187 { 3188 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3189 3190 if (fd == lev->socket.fd) { 3191 return lev; 3192 } 3193 } 3194 3195 return NULL; 3196 } 3197 3198 3199 static void 3200 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3201 { 3202 nxt_joint_job_t *job; 3203 nxt_event_engine_t *engine; 3204 nxt_listen_event_t *lev; 3205 nxt_socket_conf_joint_t *joint, *old; 3206 3207 job = obj; 3208 joint = data; 3209 3210 engine = task->thread->engine; 3211 3212 nxt_queue_insert_tail(&engine->joints, &joint->link); 3213 3214 lev = nxt_router_listen_event(&engine->listen_connections, 3215 joint->socket_conf); 3216 3217 old = lev->socket.data; 3218 lev->socket.data = joint; 3219 lev->listen = joint->socket_conf->listen; 3220 3221 job->work.next = NULL; 3222 job->work.handler = nxt_router_conf_wait; 3223 3224 nxt_event_engine_post(job->tmcf->engine, &job->work); 3225 3226 /* 3227 * The task is allocated from configuration temporary 3228 * memory pool so it can be freed after engine post operation. 3229 */ 3230 3231 nxt_router_conf_release(&engine->task, old); 3232 } 3233 3234 3235 static void 3236 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3237 { 3238 nxt_socket_conf_t *skcf; 3239 nxt_listen_event_t *lev; 3240 nxt_event_engine_t *engine; 3241 nxt_socket_conf_joint_t *joint; 3242 3243 skcf = data; 3244 3245 engine = task->thread->engine; 3246 3247 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3248 3249 nxt_fd_event_delete(engine, &lev->socket); 3250 3251 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3252 lev->socket.fd); 3253 3254 joint = lev->socket.data; 3255 joint->close_job = obj; 3256 3257 lev->timer.handler = nxt_router_listen_socket_close; 3258 lev->timer.work_queue = &engine->fast_work_queue; 3259 3260 nxt_timer_add(engine, &lev->timer, 0); 3261 } 3262 3263 3264 static void 3265 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3266 { 3267 nxt_event_engine_t *engine; 3268 3269 nxt_debug(task, "router worker thread quit"); 3270 3271 engine = task->thread->engine; 3272 3273 engine->shutdown = 1; 3274 3275 if (nxt_queue_is_empty(&engine->joints)) { 3276 nxt_thread_exit(task->thread); 3277 } 3278 } 3279 3280 3281 static void 3282 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3283 { 3284 nxt_timer_t *timer; 3285 nxt_joint_job_t *job; 3286 nxt_listen_event_t *lev; 3287 nxt_socket_conf_joint_t *joint; 3288 3289 timer = obj; 3290 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3291 3292 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3293 lev->socket.fd); 3294 3295 nxt_queue_remove(&lev->link); 3296 3297 joint = lev->socket.data; 3298 lev->socket.data = NULL; 3299 3300 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3301 task = &task->thread->engine->task; 3302 3303 nxt_router_listen_socket_release(task, joint->socket_conf); 3304 3305 job = joint->close_job; 3306 job->work.next = NULL; 3307 job->work.handler = nxt_router_conf_wait; 3308 3309 nxt_event_engine_post(job->tmcf->engine, &job->work); 3310 3311 nxt_router_listen_event_release(task, lev, joint); 3312 } 3313 3314 3315 static void 3316 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3317 { 3318 nxt_listen_socket_t *ls; 3319 nxt_thread_spinlock_t *lock; 3320 3321 ls = skcf->listen; 3322 lock = &skcf->router_conf->router->lock; 3323 3324 nxt_thread_spin_lock(lock); 3325 3326 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3327 task->thread->engine, ls->count); 3328 3329 if (--ls->count != 0) { 3330 ls = NULL; 3331 } 3332 3333 nxt_thread_spin_unlock(lock); 3334 3335 if (ls != NULL) { 3336 nxt_socket_close(task, ls->socket); 3337 nxt_free(ls); 3338 } 3339 } 3340 3341 3342 void 3343 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3344 nxt_socket_conf_joint_t *joint) 3345 { 3346 nxt_event_engine_t *engine; 3347 3348 nxt_debug(task, "listen event count: %D", lev->count); 3349 3350 engine = task->thread->engine; 3351 3352 if (--lev->count == 0) { 3353 if (lev->next != NULL) { 3354 nxt_sockaddr_cache_free(engine, lev->next); 3355 3356 nxt_conn_free(task, lev->next); 3357 } 3358 3359 nxt_free(lev); 3360 } 3361 3362 if (joint != NULL) { 3363 nxt_router_conf_release(task, joint); 3364 } 3365 3366 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3367 nxt_thread_exit(task->thread); 3368 } 3369 } 3370 3371 3372 void 3373 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3374 { 3375 nxt_socket_conf_t *skcf; 3376 nxt_router_conf_t *rtcf; 3377 nxt_thread_spinlock_t *lock; 3378 3379 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3380 3381 if (--joint->count != 0) { 3382 return; 3383 } 3384 3385 nxt_queue_remove(&joint->link); 3386 3387 /* 3388 * The joint content can not be safely used after the critical 3389 * section protected by the spinlock because its memory pool may 3390 * be already destroyed by another thread. 3391 */ 3392 skcf = joint->socket_conf; 3393 rtcf = skcf->router_conf; 3394 lock = &rtcf->router->lock; 3395 3396 nxt_thread_spin_lock(lock); 3397 3398 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3399 rtcf, rtcf->count); 3400 3401 if (--skcf->count != 0) { 3402 skcf = NULL; 3403 rtcf = NULL; 3404 3405 } else { 3406 nxt_queue_remove(&skcf->link); 3407 3408 if (--rtcf->count != 0) { 3409 rtcf = NULL; 3410 } 3411 } 3412 3413 nxt_thread_spin_unlock(lock); 3414 3415 #if (NXT_TLS) 3416 if (skcf != NULL && skcf->tls != NULL) { 3417 task->thread->runtime->tls->server_free(task, skcf->tls); 3418 } 3419 #endif 3420 3421 /* TODO remove engine->port */ 3422 3423 if (rtcf != NULL) { 3424 nxt_debug(task, "old router conf is destroyed"); 3425 3426 nxt_router_apps_hash_use(task, rtcf, -1); 3427 3428 nxt_router_access_log_release(task, lock, rtcf->access_log); 3429 3430 nxt_mp_thread_adopt(rtcf->mem_pool); 3431 3432 nxt_mp_destroy(rtcf->mem_pool); 3433 } 3434 } 3435 3436 3437 static void 3438 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3439 nxt_router_access_log_t *access_log) 3440 { 3441 size_t size; 3442 u_char *buf, *p; 3443 nxt_off_t bytes; 3444 3445 static nxt_time_string_t date_cache = { 3446 (nxt_atomic_uint_t) -1, 3447 nxt_router_access_log_date, 3448 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3449 nxt_length("31/Dec/1986:19:40:00 +0300"), 3450 NXT_THREAD_TIME_LOCAL, 3451 NXT_THREAD_TIME_SEC, 3452 }; 3453 3454 size = r->remote->address_length 3455 + 6 /* ' - - [' */ 3456 + date_cache.size 3457 + 3 /* '] "' */ 3458 + r->method->length 3459 + 1 /* space */ 3460 + r->target.length 3461 + 1 /* space */ 3462 + r->version.length 3463 + 2 /* '" ' */ 3464 + 3 /* status */ 3465 + 1 /* space */ 3466 + NXT_OFF_T_LEN 3467 + 2 /* ' "' */ 3468 + (r->referer != NULL ? r->referer->value_length : 1) 3469 + 3 /* '" "' */ 3470 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3471 + 2 /* '"\n' */ 3472 ; 3473 3474 buf = nxt_mp_nget(r->mem_pool, size); 3475 if (nxt_slow_path(buf == NULL)) { 3476 return; 3477 } 3478 3479 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3480 r->remote->address_length); 3481 3482 p = nxt_cpymem(p, " - - [", 6); 3483 3484 p = nxt_thread_time_string(task->thread, &date_cache, p); 3485 3486 p = nxt_cpymem(p, "] \"", 3); 3487 3488 if (r->method->length != 0) { 3489 p = nxt_cpymem(p, r->method->start, r->method->length); 3490 3491 if (r->target.length != 0) { 3492 *p++ = ' '; 3493 p = nxt_cpymem(p, r->target.start, r->target.length); 3494 3495 if (r->version.length != 0) { 3496 *p++ = ' '; 3497 p = nxt_cpymem(p, r->version.start, r->version.length); 3498 } 3499 } 3500 3501 } else { 3502 *p++ = '-'; 3503 } 3504 3505 p = nxt_cpymem(p, "\" ", 2); 3506 3507 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3508 3509 *p++ = ' '; 3510 3511 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3512 3513 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3514 3515 p = nxt_cpymem(p, " \"", 2); 3516 3517 if (r->referer != NULL) { 3518 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3519 3520 } else { 3521 *p++ = '-'; 3522 } 3523 3524 p = nxt_cpymem(p, "\" \"", 3); 3525 3526 if (r->user_agent != NULL) { 3527 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3528 3529 } else { 3530 *p++ = '-'; 3531 } 3532 3533 p = nxt_cpymem(p, "\"\n", 2); 3534 3535 nxt_fd_write(access_log->fd, buf, p - buf); 3536 } 3537 3538 3539 static u_char * 3540 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3541 size_t size, const char *format) 3542 { 3543 u_char sign; 3544 time_t gmtoff; 3545 3546 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3547 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3548 3549 gmtoff = nxt_timezone(tm) / 60; 3550 3551 if (gmtoff < 0) { 3552 gmtoff = -gmtoff; 3553 sign = '-'; 3554 3555 } else { 3556 sign = '+'; 3557 } 3558 3559 return nxt_sprintf(buf, buf + size, format, 3560 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3561 tm->tm_hour, tm->tm_min, tm->tm_sec, 3562 sign, gmtoff / 60, gmtoff % 60); 3563 } 3564 3565 3566 static void 3567 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3568 { 3569 uint32_t stream; 3570 nxt_int_t ret; 3571 nxt_buf_t *b; 3572 nxt_port_t *main_port, *router_port; 3573 nxt_runtime_t *rt; 3574 nxt_router_access_log_t *access_log; 3575 3576 access_log = tmcf->router_conf->access_log; 3577 3578 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3579 if (nxt_slow_path(b == NULL)) { 3580 goto fail; 3581 } 3582 3583 b->completion_handler = nxt_router_dummy_buf_completion; 3584 3585 nxt_buf_cpystr(b, &access_log->path); 3586 *b->mem.free++ = '\0'; 3587 3588 rt = task->thread->runtime; 3589 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3590 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3591 3592 stream = nxt_port_rpc_register_handler(task, router_port, 3593 nxt_router_access_log_ready, 3594 nxt_router_access_log_error, 3595 -1, tmcf); 3596 if (nxt_slow_path(stream == 0)) { 3597 goto fail; 3598 } 3599 3600 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3601 stream, router_port->id, b); 3602 3603 if (nxt_slow_path(ret != NXT_OK)) { 3604 nxt_port_rpc_cancel(task, router_port, stream); 3605 goto fail; 3606 } 3607 3608 return; 3609 3610 fail: 3611 3612 nxt_router_conf_error(task, tmcf); 3613 } 3614 3615 3616 static void 3617 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3618 void *data) 3619 { 3620 nxt_router_temp_conf_t *tmcf; 3621 nxt_router_access_log_t *access_log; 3622 3623 tmcf = data; 3624 3625 access_log = tmcf->router_conf->access_log; 3626 3627 access_log->fd = msg->fd[0]; 3628 3629 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3630 nxt_router_conf_apply, task, tmcf, NULL); 3631 } 3632 3633 3634 static void 3635 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3636 void *data) 3637 { 3638 nxt_router_temp_conf_t *tmcf; 3639 3640 tmcf = data; 3641 3642 nxt_router_conf_error(task, tmcf); 3643 } 3644 3645 3646 static void 3647 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3648 nxt_router_access_log_t *access_log) 3649 { 3650 if (access_log == NULL) { 3651 return; 3652 } 3653 3654 nxt_thread_spin_lock(lock); 3655 3656 if (--access_log->count != 0) { 3657 access_log = NULL; 3658 } 3659 3660 nxt_thread_spin_unlock(lock); 3661 3662 if (access_log != NULL) { 3663 3664 if (access_log->fd != -1) { 3665 nxt_fd_close(access_log->fd); 3666 } 3667 3668 nxt_free(access_log); 3669 } 3670 } 3671 3672 3673 typedef struct { 3674 nxt_mp_t *mem_pool; 3675 nxt_router_access_log_t *access_log; 3676 } nxt_router_access_log_reopen_t; 3677 3678 3679 static void 3680 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3681 { 3682 nxt_mp_t *mp; 3683 uint32_t stream; 3684 nxt_int_t ret; 3685 nxt_buf_t *b; 3686 nxt_port_t *main_port, *router_port; 3687 nxt_runtime_t *rt; 3688 nxt_router_access_log_t *access_log; 3689 nxt_router_access_log_reopen_t *reopen; 3690 3691 access_log = nxt_router->access_log; 3692 3693 if (access_log == NULL) { 3694 return; 3695 } 3696 3697 mp = nxt_mp_create(1024, 128, 256, 32); 3698 if (nxt_slow_path(mp == NULL)) { 3699 return; 3700 } 3701 3702 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3703 if (nxt_slow_path(reopen == NULL)) { 3704 goto fail; 3705 } 3706 3707 reopen->mem_pool = mp; 3708 reopen->access_log = access_log; 3709 3710 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3711 if (nxt_slow_path(b == NULL)) { 3712 goto fail; 3713 } 3714 3715 b->completion_handler = nxt_router_access_log_reopen_completion; 3716 3717 nxt_buf_cpystr(b, &access_log->path); 3718 *b->mem.free++ = '\0'; 3719 3720 rt = task->thread->runtime; 3721 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3722 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3723 3724 stream = nxt_port_rpc_register_handler(task, router_port, 3725 nxt_router_access_log_reopen_ready, 3726 nxt_router_access_log_reopen_error, 3727 -1, reopen); 3728 if (nxt_slow_path(stream == 0)) { 3729 goto fail; 3730 } 3731 3732 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3733 stream, router_port->id, b); 3734 3735 if (nxt_slow_path(ret != NXT_OK)) { 3736 nxt_port_rpc_cancel(task, router_port, stream); 3737 goto fail; 3738 } 3739 3740 nxt_mp_retain(mp); 3741 3742 return; 3743 3744 fail: 3745 3746 nxt_mp_destroy(mp); 3747 } 3748 3749 3750 static void 3751 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3752 { 3753 nxt_mp_t *mp; 3754 nxt_buf_t *b; 3755 3756 b = obj; 3757 mp = b->data; 3758 3759 nxt_mp_release(mp); 3760 } 3761 3762 3763 static void 3764 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3765 void *data) 3766 { 3767 nxt_router_access_log_t *access_log; 3768 nxt_router_access_log_reopen_t *reopen; 3769 3770 reopen = data; 3771 3772 access_log = reopen->access_log; 3773 3774 if (access_log == nxt_router->access_log) { 3775 3776 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 3777 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3778 msg->fd[0], access_log->fd, nxt_errno); 3779 } 3780 } 3781 3782 nxt_fd_close(msg->fd[0]); 3783 nxt_mp_release(reopen->mem_pool); 3784 } 3785 3786 3787 static void 3788 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3789 void *data) 3790 { 3791 nxt_router_access_log_reopen_t *reopen; 3792 3793 reopen = data; 3794 3795 nxt_mp_release(reopen->mem_pool); 3796 } 3797 3798 3799 static void 3800 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3801 { 3802 nxt_port_t *port; 3803 nxt_thread_link_t *link; 3804 nxt_event_engine_t *engine; 3805 nxt_thread_handle_t handle; 3806 3807 handle = (nxt_thread_handle_t) (uintptr_t) obj; 3808 link = data; 3809 3810 nxt_thread_wait(handle); 3811 3812 engine = link->engine; 3813 3814 nxt_queue_remove(&engine->link); 3815 3816 port = engine->port; 3817 3818 // TODO notify all apps 3819 3820 port->engine = task->thread->engine; 3821 nxt_mp_thread_adopt(port->mem_pool); 3822 nxt_port_use(task, port, -1); 3823 3824 nxt_mp_thread_adopt(engine->mem_pool); 3825 nxt_mp_destroy(engine->mem_pool); 3826 3827 nxt_event_engine_free(engine); 3828 3829 nxt_free(link); 3830 } 3831 3832 3833 static void 3834 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3835 void *data) 3836 { 3837 size_t b_size, count; 3838 nxt_int_t ret; 3839 nxt_app_t *app; 3840 nxt_buf_t *b, *next; 3841 nxt_port_t *app_port; 3842 nxt_unit_field_t *f; 3843 nxt_http_field_t *field; 3844 nxt_http_request_t *r; 3845 nxt_unit_response_t *resp; 3846 nxt_request_rpc_data_t *req_rpc_data; 3847 3848 req_rpc_data = data; 3849 3850 r = req_rpc_data->request; 3851 if (nxt_slow_path(r == NULL)) { 3852 return; 3853 } 3854 3855 if (r->error) { 3856 nxt_request_rpc_data_unlink(task, req_rpc_data); 3857 return; 3858 } 3859 3860 app = req_rpc_data->app; 3861 nxt_assert(app != NULL); 3862 3863 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { 3864 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); 3865 3866 return; 3867 } 3868 3869 b = (msg->size == 0) ? NULL : msg->buf; 3870 3871 if (msg->port_msg.last != 0) { 3872 nxt_debug(task, "router data create last buf"); 3873 3874 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 3875 3876 req_rpc_data->rpc_cancel = 0; 3877 3878 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { 3879 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; 3880 } 3881 3882 nxt_request_rpc_data_unlink(task, req_rpc_data); 3883 3884 } else { 3885 if (app->timeout != 0) { 3886 r->timer.handler = nxt_router_app_timeout; 3887 r->timer_data = req_rpc_data; 3888 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 3889 } 3890 } 3891 3892 if (b == NULL) { 3893 return; 3894 } 3895 3896 if (msg->buf == b) { 3897 /* Disable instant buffer completion/re-using by port. */ 3898 msg->buf = NULL; 3899 } 3900 3901 if (r->header_sent) { 3902 nxt_buf_chain_add(&r->out, b); 3903 nxt_http_request_send_body(task, r, NULL); 3904 3905 } else { 3906 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0; 3907 3908 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) { 3909 nxt_alert(task, "response buffer too small: %z", b_size); 3910 goto fail; 3911 } 3912 3913 resp = (void *) b->mem.pos; 3914 count = (b_size - sizeof(nxt_unit_response_t)) 3915 / sizeof(nxt_unit_field_t); 3916 3917 if (nxt_slow_path(count < resp->fields_count)) { 3918 nxt_alert(task, "response buffer too small for fields count: %D", 3919 resp->fields_count); 3920 goto fail; 3921 } 3922 3923 field = NULL; 3924 3925 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 3926 if (f->skip) { 3927 continue; 3928 } 3929 3930 field = nxt_list_add(r->resp.fields); 3931 3932 if (nxt_slow_path(field == NULL)) { 3933 goto fail; 3934 } 3935 3936 field->hash = f->hash; 3937 field->skip = 0; 3938 field->hopbyhop = 0; 3939 3940 field->name_length = f->name_length; 3941 field->value_length = f->value_length; 3942 field->name = nxt_unit_sptr_get(&f->name); 3943 field->value = nxt_unit_sptr_get(&f->value); 3944 3945 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 3946 if (nxt_slow_path(ret != NXT_OK)) { 3947 goto fail; 3948 } 3949 3950 nxt_debug(task, "header%s: %*s: %*s", 3951 (field->skip ? " skipped" : ""), 3952 (size_t) field->name_length, field->name, 3953 (size_t) field->value_length, field->value); 3954 3955 if (field->skip) { 3956 r->resp.fields->last->nelts--; 3957 } 3958 } 3959 3960 r->status = resp->status; 3961 3962 if (resp->piggyback_content_length != 0) { 3963 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 3964 b->mem.free = b->mem.pos + resp->piggyback_content_length; 3965 3966 } else { 3967 b->mem.pos = b->mem.free; 3968 } 3969 3970 if (nxt_buf_mem_used_size(&b->mem) == 0) { 3971 next = b->next; 3972 b->next = NULL; 3973 3974 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3975 b->completion_handler, task, b, b->parent); 3976 3977 b = next; 3978 } 3979 3980 if (b != NULL) { 3981 nxt_buf_chain_add(&r->out, b); 3982 } 3983 3984 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 3985 3986 if (r->websocket_handshake 3987 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 3988 { 3989 app_port = req_rpc_data->app_port; 3990 if (nxt_slow_path(app_port == NULL)) { 3991 goto fail; 3992 } 3993 3994 nxt_thread_mutex_lock(&app->mutex); 3995 3996 app_port->main_app_port->active_websockets++; 3997 3998 nxt_thread_mutex_unlock(&app->mutex); 3999 4000 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); 4001 req_rpc_data->apr_action = NXT_APR_CLOSE; 4002 4003 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); 4004 4005 r->state = &nxt_http_websocket; 4006 4007 } else { 4008 r->state = &nxt_http_request_send_state; 4009 } 4010 } 4011 4012 return; 4013 4014 fail: 4015 4016 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4017 4018 nxt_request_rpc_data_unlink(task, req_rpc_data); 4019 } 4020 4021 4022 static void 4023 nxt_router_req_headers_ack_handler(nxt_task_t *task, 4024 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 4025 { 4026 int res; 4027 nxt_app_t *app; 4028 nxt_buf_t *b; 4029 nxt_bool_t start_process, unlinked; 4030 nxt_port_t *app_port, *main_app_port, *idle_port; 4031 nxt_queue_link_t *idle_lnk; 4032 nxt_http_request_t *r; 4033 4034 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4035 req_rpc_data->stream, 4036 msg->port_msg.pid, msg->port_msg.reply_port); 4037 4038 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4039 msg->port_msg.pid); 4040 4041 app = req_rpc_data->app; 4042 r = req_rpc_data->request; 4043 4044 start_process = 0; 4045 unlinked = 0; 4046 4047 nxt_thread_mutex_lock(&app->mutex); 4048 4049 if (r->app_link.next != NULL) { 4050 nxt_queue_remove(&r->app_link); 4051 r->app_link.next = NULL; 4052 4053 unlinked = 1; 4054 } 4055 4056 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4057 msg->port_msg.reply_port); 4058 if (nxt_slow_path(app_port == NULL)) { 4059 nxt_thread_mutex_unlock(&app->mutex); 4060 4061 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4062 4063 if (unlinked) { 4064 nxt_mp_release(r->mem_pool); 4065 } 4066 4067 return; 4068 } 4069 4070 main_app_port = app_port->main_app_port; 4071 4072 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4073 app->idle_processes--; 4074 4075 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4076 &app->name, main_app_port->pid, main_app_port->id, 4077 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4078 4079 /* Check port was in 'spare_ports' using idle_start field. */ 4080 if (main_app_port->idle_start == 0 4081 && app->idle_processes >= app->spare_processes) 4082 { 4083 /* 4084 * If there is a vacant space in spare ports, 4085 * move the last idle to spare_ports. 4086 */ 4087 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4088 4089 idle_lnk = nxt_queue_last(&app->idle_ports); 4090 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4091 nxt_queue_remove(idle_lnk); 4092 4093 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4094 4095 idle_port->idle_start = 0; 4096 4097 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4098 "to spare_ports", 4099 &app->name, idle_port->pid, idle_port->id); 4100 } 4101 4102 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4103 app->pending_processes++; 4104 start_process = 1; 4105 } 4106 } 4107 4108 main_app_port->active_requests++; 4109 4110 nxt_port_inc_use(app_port); 4111 4112 nxt_thread_mutex_unlock(&app->mutex); 4113 4114 if (unlinked) { 4115 nxt_mp_release(r->mem_pool); 4116 } 4117 4118 if (start_process) { 4119 nxt_router_start_app_process(task, app); 4120 } 4121 4122 nxt_port_use(task, req_rpc_data->app_port, -1); 4123 4124 req_rpc_data->app_port = app_port; 4125 4126 b = req_rpc_data->msg_info.buf; 4127 4128 if (b != NULL) { 4129 /* First buffer is already sent. Start from second. */ 4130 b = b->next; 4131 4132 req_rpc_data->msg_info.buf->next = NULL; 4133 } 4134 4135 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4136 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4137 req_rpc_data->msg_info.body_fd); 4138 4139 if (req_rpc_data->msg_info.body_fd != -1) { 4140 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4141 } 4142 4143 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4144 req_rpc_data->msg_info.body_fd, 4145 req_rpc_data->stream, 4146 task->thread->engine->port->id, b); 4147 4148 if (nxt_slow_path(res != NXT_OK)) { 4149 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4150 } 4151 } 4152 4153 if (app->timeout != 0) { 4154 r->timer.handler = nxt_router_app_timeout; 4155 r->timer_data = req_rpc_data; 4156 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4157 } 4158 } 4159 4160 4161 static const nxt_http_request_state_t nxt_http_request_send_state 4162 nxt_aligned(64) = 4163 { 4164 .error_handler = nxt_http_request_error_handler, 4165 }; 4166 4167 4168 static void 4169 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4170 { 4171 nxt_buf_t *out; 4172 nxt_http_request_t *r; 4173 4174 r = obj; 4175 4176 out = r->out; 4177 4178 if (out != NULL) { 4179 r->out = NULL; 4180 nxt_http_request_send(task, r, out); 4181 } 4182 } 4183 4184 4185 static void 4186 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4187 void *data) 4188 { 4189 nxt_request_rpc_data_t *req_rpc_data; 4190 4191 req_rpc_data = data; 4192 4193 req_rpc_data->rpc_cancel = 0; 4194 4195 /* TODO cancel message and return if cancelled. */ 4196 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4197 4198 if (req_rpc_data->request != NULL) { 4199 nxt_http_request_error(task, req_rpc_data->request, 4200 NXT_HTTP_SERVICE_UNAVAILABLE); 4201 } 4202 4203 nxt_request_rpc_data_unlink(task, req_rpc_data); 4204 } 4205 4206 4207 static void 4208 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4209 void *data) 4210 { 4211 nxt_app_t *app; 4212 nxt_port_t *port; 4213 nxt_app_joint_t *app_joint; 4214 4215 app_joint = data; 4216 port = msg->u.new_port; 4217 4218 nxt_assert(app_joint != NULL); 4219 nxt_assert(port != NULL); 4220 nxt_assert(port->type == NXT_PROCESS_APP); 4221 nxt_assert(port->id == 0); 4222 4223 app = app_joint->app; 4224 4225 nxt_router_app_joint_use(task, app_joint, -1); 4226 4227 if (nxt_slow_path(app == NULL)) { 4228 nxt_debug(task, "new port ready for released app, send QUIT"); 4229 4230 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4231 4232 return; 4233 } 4234 4235 port->app = app; 4236 port->main_app_port = port; 4237 4238 nxt_thread_mutex_lock(&app->mutex); 4239 4240 nxt_assert(app->pending_processes != 0); 4241 4242 app->pending_processes--; 4243 app->processes++; 4244 nxt_port_hash_add(&app->port_hash, port); 4245 app->port_hash_count++; 4246 4247 nxt_thread_mutex_unlock(&app->mutex); 4248 4249 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4250 &app->name, port->pid, app->processes, app->pending_processes); 4251 4252 nxt_router_app_shared_port_send(task, port); 4253 4254 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); 4255 } 4256 4257 4258 static nxt_int_t 4259 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) 4260 { 4261 nxt_buf_t *b; 4262 nxt_port_t *port; 4263 nxt_port_msg_new_port_t *msg; 4264 4265 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 4266 sizeof(nxt_port_data_t)); 4267 if (nxt_slow_path(b == NULL)) { 4268 return NXT_ERROR; 4269 } 4270 4271 port = app_port->app->shared_port; 4272 4273 nxt_debug(task, "send port %FD to process %PI", 4274 port->pair[0], app_port->pid); 4275 4276 b->mem.free += sizeof(nxt_port_msg_new_port_t); 4277 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 4278 4279 msg->id = port->id; 4280 msg->pid = port->pid; 4281 msg->max_size = port->max_size; 4282 msg->max_share = port->max_share; 4283 msg->type = port->type; 4284 4285 return nxt_port_socket_write2(task, app_port, 4286 NXT_PORT_MSG_NEW_PORT, 4287 port->pair[0], port->queue_fd, 4288 0, 0, b); 4289 } 4290 4291 4292 static void 4293 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4294 void *data) 4295 { 4296 nxt_app_t *app; 4297 nxt_app_joint_t *app_joint; 4298 nxt_queue_link_t *link; 4299 nxt_http_request_t *r; 4300 4301 app_joint = data; 4302 4303 nxt_assert(app_joint != NULL); 4304 4305 app = app_joint->app; 4306 4307 nxt_router_app_joint_use(task, app_joint, -1); 4308 4309 if (nxt_slow_path(app == NULL)) { 4310 nxt_debug(task, "start error for released app"); 4311 4312 return; 4313 } 4314 4315 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4316 4317 link = NULL; 4318 4319 nxt_thread_mutex_lock(&app->mutex); 4320 4321 nxt_assert(app->pending_processes != 0); 4322 4323 app->pending_processes--; 4324 4325 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4326 link = nxt_queue_first(&app->ack_waiting_req); 4327 4328 nxt_queue_remove(link); 4329 link->next = NULL; 4330 } 4331 4332 nxt_thread_mutex_unlock(&app->mutex); 4333 4334 while (link != NULL) { 4335 r = nxt_container_of(link, nxt_http_request_t, app_link); 4336 4337 nxt_event_engine_post(r->engine, &r->err_work); 4338 4339 link = NULL; 4340 4341 nxt_thread_mutex_lock(&app->mutex); 4342 4343 if (app->processes == 0 && app->pending_processes == 0 4344 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4345 { 4346 link = nxt_queue_first(&app->ack_waiting_req); 4347 4348 nxt_queue_remove(link); 4349 link->next = NULL; 4350 } 4351 4352 nxt_thread_mutex_unlock(&app->mutex); 4353 } 4354 } 4355 4356 4357 4358 nxt_inline nxt_port_t * 4359 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) 4360 { 4361 nxt_port_t *port; 4362 4363 port = NULL; 4364 4365 nxt_thread_mutex_lock(&app->mutex); 4366 4367 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 4368 4369 /* Caller is responsible to decrease port use count. */ 4370 nxt_queue_chk_remove(&port->app_link); 4371 4372 if (nxt_queue_chk_remove(&port->idle_link)) { 4373 app->idle_processes--; 4374 4375 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", 4376 &app->name, port->pid, port->id, 4377 (port->idle_start ? "idle_ports" : "spare_ports")); 4378 } 4379 4380 nxt_port_hash_remove(&app->port_hash, port); 4381 app->port_hash_count--; 4382 4383 port->app = NULL; 4384 app->processes--; 4385 4386 break; 4387 4388 } nxt_queue_loop; 4389 4390 nxt_thread_mutex_unlock(&app->mutex); 4391 4392 return port; 4393 } 4394 4395 4396 static void 4397 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4398 { 4399 int c; 4400 4401 c = nxt_atomic_fetch_add(&app->use_count, i); 4402 4403 if (i < 0 && c == -i) { 4404 4405 if (task->thread->engine != app->engine) { 4406 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 4407 4408 } else { 4409 nxt_router_free_app(task, app->joint, NULL); 4410 } 4411 } 4412 } 4413 4414 4415 static void 4416 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 4417 { 4418 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 4419 4420 nxt_queue_remove(&app->link); 4421 4422 nxt_router_app_use(task, app, -1); 4423 } 4424 4425 4426 static void 4427 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 4428 nxt_apr_action_t action) 4429 { 4430 int inc_use; 4431 uint32_t got_response, dec_requests; 4432 nxt_app_t *app; 4433 nxt_bool_t port_unchained, send_quit, adjust_idle_timer; 4434 nxt_port_t *main_app_port; 4435 4436 nxt_assert(port != NULL); 4437 nxt_assert(port->app != NULL); 4438 4439 app = port->app; 4440 4441 inc_use = 0; 4442 got_response = 0; 4443 dec_requests = 0; 4444 4445 switch (action) { 4446 case NXT_APR_NEW_PORT: 4447 break; 4448 case NXT_APR_REQUEST_FAILED: 4449 dec_requests = 1; 4450 inc_use = -1; 4451 break; 4452 case NXT_APR_GOT_RESPONSE: 4453 got_response = 1; 4454 inc_use = -1; 4455 break; 4456 case NXT_APR_UPGRADE: 4457 got_response = 1; 4458 break; 4459 case NXT_APR_CLOSE: 4460 inc_use = -1; 4461 break; 4462 } 4463 4464 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4465 port->pid, port->id, 4466 (int) inc_use, (int) got_response); 4467 4468 if (port == app->shared_port) { 4469 nxt_thread_mutex_lock(&app->mutex); 4470 4471 app->active_requests -= got_response + dec_requests; 4472 4473 nxt_thread_mutex_unlock(&app->mutex); 4474 4475 goto adjust_use; 4476 } 4477 4478 main_app_port = port->main_app_port; 4479 4480 nxt_thread_mutex_lock(&app->mutex); 4481 4482 main_app_port->app_responses += got_response; 4483 main_app_port->active_requests -= got_response + dec_requests; 4484 app->active_requests -= got_response + dec_requests; 4485 4486 if (main_app_port->pair[1] != -1 4487 && (app->max_requests == 0 4488 || main_app_port->app_responses < app->max_requests)) 4489 { 4490 if (main_app_port->app_link.next == NULL) { 4491 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4492 4493 nxt_port_inc_use(main_app_port); 4494 } 4495 } 4496 4497 send_quit = (app->max_requests > 0 4498 && main_app_port->app_responses >= app->max_requests); 4499 4500 if (send_quit) { 4501 port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); 4502 4503 nxt_port_hash_remove(&app->port_hash, main_app_port); 4504 app->port_hash_count--; 4505 4506 main_app_port->app = NULL; 4507 app->processes--; 4508 4509 } else { 4510 port_unchained = 0; 4511 } 4512 4513 adjust_idle_timer = 0; 4514 4515 if (main_app_port->pair[1] != -1 && !send_quit 4516 && main_app_port->active_requests == 0 4517 && main_app_port->active_websockets == 0 4518 && main_app_port->idle_link.next == NULL) 4519 { 4520 if (app->idle_processes == app->spare_processes 4521 && app->adjust_idle_work.data == NULL) 4522 { 4523 adjust_idle_timer = 1; 4524 app->adjust_idle_work.data = app; 4525 app->adjust_idle_work.next = NULL; 4526 } 4527 4528 if (app->idle_processes < app->spare_processes) { 4529 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4530 4531 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4532 &app->name, main_app_port->pid, main_app_port->id); 4533 } else { 4534 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4535 4536 main_app_port->idle_start = task->thread->engine->timers.now; 4537 4538 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4539 &app->name, main_app_port->pid, main_app_port->id); 4540 } 4541 4542 app->idle_processes++; 4543 } 4544 4545 nxt_thread_mutex_unlock(&app->mutex); 4546 4547 if (adjust_idle_timer) { 4548 nxt_router_app_use(task, app, 1); 4549 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4550 } 4551 4552 /* ? */ 4553 if (main_app_port->pair[1] == -1) { 4554 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4555 &app->name, app, main_app_port, main_app_port->pid); 4556 4557 goto adjust_use; 4558 } 4559 4560 if (send_quit) { 4561 nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); 4562 4563 nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, 4564 NULL); 4565 4566 if (port_unchained) { 4567 nxt_port_use(task, main_app_port, -1); 4568 } 4569 4570 goto adjust_use; 4571 } 4572 4573 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4574 &app->name, app); 4575 4576 adjust_use: 4577 4578 nxt_port_use(task, port, inc_use); 4579 } 4580 4581 4582 void 4583 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4584 { 4585 nxt_app_t *app; 4586 nxt_bool_t unchain, start_process; 4587 nxt_port_t *idle_port; 4588 nxt_queue_link_t *idle_lnk; 4589 4590 app = port->app; 4591 4592 nxt_assert(app != NULL); 4593 4594 nxt_thread_mutex_lock(&app->mutex); 4595 4596 nxt_port_hash_remove(&app->port_hash, port); 4597 app->port_hash_count--; 4598 4599 if (port->id != 0) { 4600 nxt_thread_mutex_unlock(&app->mutex); 4601 4602 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, 4603 port->pid, port->id); 4604 4605 return; 4606 } 4607 4608 unchain = nxt_queue_chk_remove(&port->app_link); 4609 4610 if (nxt_queue_chk_remove(&port->idle_link)) { 4611 app->idle_processes--; 4612 4613 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", 4614 &app->name, port->pid, port->id, 4615 (port->idle_start ? "idle_ports" : "spare_ports")); 4616 4617 if (port->idle_start == 0 4618 && app->idle_processes >= app->spare_processes) 4619 { 4620 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4621 4622 idle_lnk = nxt_queue_last(&app->idle_ports); 4623 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4624 nxt_queue_remove(idle_lnk); 4625 4626 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4627 4628 idle_port->idle_start = 0; 4629 4630 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4631 "to spare_ports", 4632 &app->name, idle_port->pid, idle_port->id); 4633 } 4634 } 4635 4636 app->processes--; 4637 4638 start_process = !task->thread->engine->shutdown 4639 && nxt_router_app_can_start(app) 4640 && nxt_router_app_need_start(app); 4641 4642 if (start_process) { 4643 app->pending_processes++; 4644 } 4645 4646 nxt_thread_mutex_unlock(&app->mutex); 4647 4648 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4649 4650 if (unchain) { 4651 nxt_port_use(task, port, -1); 4652 } 4653 4654 if (start_process) { 4655 nxt_router_start_app_process(task, app); 4656 } 4657 } 4658 4659 4660 static void 4661 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4662 { 4663 nxt_app_t *app; 4664 nxt_bool_t queued; 4665 nxt_port_t *port; 4666 nxt_msec_t timeout, threshold; 4667 nxt_queue_link_t *lnk; 4668 nxt_event_engine_t *engine; 4669 4670 app = obj; 4671 queued = (data == app); 4672 4673 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4674 &app->name, queued); 4675 4676 engine = task->thread->engine; 4677 4678 nxt_assert(app->engine == engine); 4679 4680 threshold = engine->timers.now + app->joint->idle_timer.bias; 4681 timeout = 0; 4682 4683 nxt_thread_mutex_lock(&app->mutex); 4684 4685 if (queued) { 4686 app->adjust_idle_work.data = NULL; 4687 } 4688 4689 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 4690 &app->name, 4691 (int) app->idle_processes, (int) app->spare_processes); 4692 4693 while (app->idle_processes > app->spare_processes) { 4694 4695 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4696 4697 lnk = nxt_queue_first(&app->idle_ports); 4698 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4699 4700 timeout = port->idle_start + app->idle_timeout; 4701 4702 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 4703 &app->name, port->pid, 4704 port->idle_start, timeout, threshold); 4705 4706 if (timeout > threshold) { 4707 break; 4708 } 4709 4710 nxt_queue_remove(lnk); 4711 lnk->next = NULL; 4712 4713 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 4714 &app->name, port->pid, port->id); 4715 4716 nxt_queue_chk_remove(&port->app_link); 4717 4718 nxt_port_hash_remove(&app->port_hash, port); 4719 app->port_hash_count--; 4720 4721 app->idle_processes--; 4722 app->processes--; 4723 port->app = NULL; 4724 4725 nxt_thread_mutex_unlock(&app->mutex); 4726 4727 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4728 &app->name, port->pid); 4729 4730 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4731 4732 nxt_port_use(task, port, -1); 4733 4734 nxt_thread_mutex_lock(&app->mutex); 4735 } 4736 4737 nxt_thread_mutex_unlock(&app->mutex); 4738 4739 if (timeout > threshold) { 4740 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4741 4742 } else { 4743 nxt_timer_disable(engine, &app->joint->idle_timer); 4744 } 4745 4746 if (queued) { 4747 nxt_router_app_use(task, app, -1); 4748 } 4749 } 4750 4751 4752 static void 4753 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4754 { 4755 nxt_timer_t *timer; 4756 nxt_app_joint_t *app_joint; 4757 4758 timer = obj; 4759 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4760 4761 if (nxt_fast_path(app_joint->app != NULL)) { 4762 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 4763 } 4764 } 4765 4766 4767 static void 4768 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) 4769 { 4770 nxt_timer_t *timer; 4771 nxt_app_joint_t *app_joint; 4772 4773 timer = obj; 4774 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4775 4776 nxt_router_app_joint_use(task, app_joint, -1); 4777 } 4778 4779 4780 static void 4781 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 4782 { 4783 nxt_app_t *app; 4784 nxt_port_t *port; 4785 nxt_app_joint_t *app_joint; 4786 4787 app_joint = obj; 4788 app = app_joint->app; 4789 4790 for ( ;; ) { 4791 port = nxt_router_app_get_port_for_quit(task, app); 4792 if (port == NULL) { 4793 break; 4794 } 4795 4796 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); 4797 4798 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4799 4800 nxt_port_use(task, port, -1); 4801 } 4802 4803 nxt_thread_mutex_lock(&app->mutex); 4804 4805 for ( ;; ) { 4806 port = nxt_port_hash_retrieve(&app->port_hash); 4807 if (port == NULL) { 4808 break; 4809 } 4810 4811 app->port_hash_count--; 4812 4813 port->app = NULL; 4814 4815 nxt_port_close(task, port); 4816 4817 nxt_port_use(task, port, -1); 4818 } 4819 4820 nxt_thread_mutex_unlock(&app->mutex); 4821 4822 nxt_assert(app->processes == 0); 4823 nxt_assert(app->active_requests == 0); 4824 nxt_assert(app->port_hash_count == 0); 4825 nxt_assert(app->idle_processes == 0); 4826 nxt_assert(nxt_queue_is_empty(&app->ports)); 4827 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 4828 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 4829 4830 nxt_port_mmaps_destroy(&app->outgoing, 1); 4831 4832 nxt_thread_mutex_destroy(&app->outgoing.mutex); 4833 4834 if (app->shared_port != NULL) { 4835 app->shared_port->app = NULL; 4836 nxt_port_close(task, app->shared_port); 4837 nxt_port_use(task, app->shared_port, -1); 4838 } 4839 4840 nxt_thread_mutex_destroy(&app->mutex); 4841 nxt_mp_destroy(app->mem_pool); 4842 4843 app_joint->app = NULL; 4844 4845 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 4846 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 4847 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 4848 4849 } else { 4850 nxt_router_app_joint_use(task, app_joint, -1); 4851 } 4852 } 4853 4854 4855 static void 4856 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 4857 nxt_request_rpc_data_t *req_rpc_data) 4858 { 4859 nxt_bool_t start_process; 4860 nxt_port_t *port; 4861 nxt_http_request_t *r; 4862 4863 start_process = 0; 4864 4865 nxt_thread_mutex_lock(&app->mutex); 4866 4867 port = app->shared_port; 4868 nxt_port_inc_use(port); 4869 4870 app->active_requests++; 4871 4872 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4873 app->pending_processes++; 4874 start_process = 1; 4875 } 4876 4877 r = req_rpc_data->request; 4878 4879 /* 4880 * Put request into application-wide list to be able to cancel request 4881 * if something goes wrong with application processes. 4882 */ 4883 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 4884 4885 nxt_thread_mutex_unlock(&app->mutex); 4886 4887 /* 4888 * Retain request memory pool while request is linked in ack_waiting_req 4889 * to guarantee request structure memory is accessble. 4890 */ 4891 nxt_mp_retain(r->mem_pool); 4892 4893 req_rpc_data->app_port = port; 4894 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4895 4896 if (start_process) { 4897 nxt_router_start_app_process(task, app); 4898 } 4899 } 4900 4901 4902 void 4903 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 4904 nxt_app_t *app) 4905 { 4906 nxt_event_engine_t *engine; 4907 nxt_request_rpc_data_t *req_rpc_data; 4908 4909 engine = task->thread->engine; 4910 4911 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 4912 nxt_router_response_ready_handler, 4913 nxt_router_response_error_handler, 4914 sizeof(nxt_request_rpc_data_t)); 4915 if (nxt_slow_path(req_rpc_data == NULL)) { 4916 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4917 return; 4918 } 4919 4920 /* 4921 * At this point we have request req_rpc_data allocated and registered 4922 * in port handlers. Need to fixup request memory pool. Counterpart 4923 * release will be called via following call chain: 4924 * nxt_request_rpc_data_unlink() -> 4925 * nxt_router_http_request_release_post() -> 4926 * nxt_router_http_request_release() 4927 */ 4928 nxt_mp_retain(r->mem_pool); 4929 4930 r->timer.task = &engine->task; 4931 r->timer.work_queue = &engine->fast_work_queue; 4932 r->timer.log = engine->task.log; 4933 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 4934 4935 r->engine = engine; 4936 r->err_work.handler = nxt_router_http_request_error; 4937 r->err_work.task = task; 4938 r->err_work.obj = r; 4939 4940 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 4941 req_rpc_data->app = app; 4942 req_rpc_data->msg_info.body_fd = -1; 4943 req_rpc_data->rpc_cancel = 1; 4944 4945 nxt_router_app_use(task, app, 1); 4946 4947 req_rpc_data->request = r; 4948 r->req_rpc_data = req_rpc_data; 4949 4950 if (r->last != NULL) { 4951 r->last->completion_handler = nxt_router_http_request_done; 4952 } 4953 4954 nxt_router_app_port_get(task, app, req_rpc_data); 4955 nxt_router_app_prepare_request(task, req_rpc_data); 4956 } 4957 4958 4959 static void 4960 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 4961 { 4962 nxt_http_request_t *r; 4963 4964 r = obj; 4965 4966 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 4967 4968 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4969 4970 if (r->req_rpc_data != NULL) { 4971 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4972 } 4973 4974 nxt_mp_release(r->mem_pool); 4975 } 4976 4977 4978 static void 4979 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 4980 { 4981 nxt_http_request_t *r; 4982 4983 r = data; 4984 4985 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 4986 4987 if (r->req_rpc_data != NULL) { 4988 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4989 } 4990 4991 nxt_http_request_close_handler(task, r, r->proto.any); 4992 } 4993 4994 4995 static void 4996 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 4997 { 4998 } 4999 5000 5001 static void 5002 nxt_router_app_prepare_request(nxt_task_t *task, 5003 nxt_request_rpc_data_t *req_rpc_data) 5004 { 5005 nxt_app_t *app; 5006 nxt_buf_t *buf, *body; 5007 nxt_int_t res; 5008 nxt_port_t *port, *reply_port; 5009 5010 int notify; 5011 struct { 5012 nxt_port_msg_t pm; 5013 nxt_port_mmap_msg_t mm; 5014 } msg; 5015 5016 5017 app = req_rpc_data->app; 5018 5019 nxt_assert(app != NULL); 5020 5021 port = req_rpc_data->app_port; 5022 5023 nxt_assert(port != NULL); 5024 nxt_assert(port->queue != NULL); 5025 5026 reply_port = task->thread->engine->port; 5027 5028 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5029 nxt_app_msg_prefix[app->type]); 5030 if (nxt_slow_path(buf == NULL)) { 5031 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5032 req_rpc_data->stream, &app->name); 5033 5034 nxt_http_request_error(task, req_rpc_data->request, 5035 NXT_HTTP_INTERNAL_SERVER_ERROR); 5036 5037 return; 5038 } 5039 5040 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5041 nxt_buf_used_size(buf), 5042 port->socket.fd); 5043 5044 req_rpc_data->msg_info.buf = buf; 5045 5046 body = req_rpc_data->request->body; 5047 5048 if (body != NULL && nxt_buf_is_file(body)) { 5049 req_rpc_data->msg_info.body_fd = body->file->fd; 5050 5051 body->file->fd = -1; 5052 5053 } else { 5054 req_rpc_data->msg_info.body_fd = -1; 5055 } 5056 5057 msg.pm.stream = req_rpc_data->stream; 5058 msg.pm.pid = reply_port->pid; 5059 msg.pm.reply_port = reply_port->id; 5060 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5061 msg.pm.last = 0; 5062 msg.pm.mmap = 1; 5063 msg.pm.nf = 0; 5064 msg.pm.mf = 0; 5065 msg.pm.tracking = 0; 5066 5067 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5068 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5069 5070 msg.mm.mmap_id = hdr->id; 5071 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5072 msg.mm.size = nxt_buf_used_size(buf); 5073 5074 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5075 req_rpc_data->stream, ¬ify, 5076 &req_rpc_data->msg_info.tracking_cookie); 5077 if (nxt_fast_path(res == NXT_OK)) { 5078 if (notify != 0) { 5079 (void) nxt_port_socket_write(task, port, 5080 NXT_PORT_MSG_READ_QUEUE, 5081 -1, req_rpc_data->stream, 5082 reply_port->id, NULL); 5083 5084 } else { 5085 nxt_debug(task, "queue is not empty"); 5086 } 5087 5088 buf->is_port_mmap_sent = 1; 5089 buf->mem.pos = buf->mem.free; 5090 5091 } else { 5092 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5093 req_rpc_data->stream, &app->name); 5094 5095 nxt_http_request_error(task, req_rpc_data->request, 5096 NXT_HTTP_INTERNAL_SERVER_ERROR); 5097 } 5098 } 5099 5100 5101 struct nxt_fields_iter_s { 5102 nxt_list_part_t *part; 5103 nxt_http_field_t *field; 5104 }; 5105 5106 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5107 5108 5109 static nxt_http_field_t * 5110 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5111 { 5112 if (part == NULL) { 5113 return NULL; 5114 } 5115 5116 while (part->nelts == 0) { 5117 part = part->next; 5118 if (part == NULL) { 5119 return NULL; 5120 } 5121 } 5122 5123 i->part = part; 5124 i->field = nxt_list_data(i->part); 5125 5126 return i->field; 5127 } 5128 5129 5130 static nxt_http_field_t * 5131 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5132 { 5133 return nxt_fields_part_first(nxt_list_part(fields), i); 5134 } 5135 5136 5137 static nxt_http_field_t * 5138 nxt_fields_next(nxt_fields_iter_t *i) 5139 { 5140 nxt_http_field_t *end = nxt_list_data(i->part); 5141 5142 end += i->part->nelts; 5143 i->field++; 5144 5145 if (i->field < end) { 5146 return i->field; 5147 } 5148 5149 return nxt_fields_part_first(i->part->next, i); 5150 } 5151 5152 5153 static nxt_buf_t * 5154 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5155 nxt_app_t *app, const nxt_str_t *prefix) 5156 { 5157 void *target_pos, *query_pos; 5158 u_char *pos, *end, *p, c; 5159 size_t fields_count, req_size, size, free_size; 5160 size_t copy_size; 5161 nxt_off_t content_length; 5162 nxt_buf_t *b, *buf, *out, **tail; 5163 nxt_http_field_t *field, *dup; 5164 nxt_unit_field_t *dst_field; 5165 nxt_fields_iter_t iter, dup_iter; 5166 nxt_unit_request_t *req; 5167 5168 req_size = sizeof(nxt_unit_request_t) 5169 + r->method->length + 1 5170 + r->version.length + 1 5171 + r->remote->length + 1 5172 + r->local->length + 1 5173 + r->server_name.length + 1 5174 + r->target.length + 1 5175 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5176 5177 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5178 fields_count = 0; 5179 5180 nxt_list_each(field, r->fields) { 5181 fields_count++; 5182 5183 req_size += field->name_length + prefix->length + 1 5184 + field->value_length + 1; 5185 } nxt_list_loop; 5186 5187 req_size += fields_count * sizeof(nxt_unit_field_t); 5188 5189 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5190 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5191 (int) req_size); 5192 5193 return NULL; 5194 } 5195 5196 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5197 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5198 if (nxt_slow_path(out == NULL)) { 5199 return NULL; 5200 } 5201 5202 req = (nxt_unit_request_t *) out->mem.free; 5203 out->mem.free += req_size; 5204 5205 req->app_target = r->app_target; 5206 5207 req->content_length = content_length; 5208 5209 p = (u_char *) (req->fields + fields_count); 5210 5211 nxt_debug(task, "fields_count=%d", (int) fields_count); 5212 5213 req->method_length = r->method->length; 5214 nxt_unit_sptr_set(&req->method, p); 5215 p = nxt_cpymem(p, r->method->start, r->method->length); 5216 *p++ = '\0'; 5217 5218 req->version_length = r->version.length; 5219 nxt_unit_sptr_set(&req->version, p); 5220 p = nxt_cpymem(p, r->version.start, r->version.length); 5221 *p++ = '\0'; 5222 5223 req->remote_length = r->remote->address_length; 5224 nxt_unit_sptr_set(&req->remote, p); 5225 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5226 r->remote->address_length); 5227 *p++ = '\0'; 5228 5229 req->local_length = r->local->address_length; 5230 nxt_unit_sptr_set(&req->local, p); 5231 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5232 *p++ = '\0'; 5233 5234 req->tls = (r->tls != NULL); 5235 req->websocket_handshake = r->websocket_handshake; 5236 5237 req->server_name_length = r->server_name.length; 5238 nxt_unit_sptr_set(&req->server_name, p); 5239 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5240 *p++ = '\0'; 5241 5242 target_pos = p; 5243 req->target_length = (uint32_t) r->target.length; 5244 nxt_unit_sptr_set(&req->target, p); 5245 p = nxt_cpymem(p, r->target.start, r->target.length); 5246 *p++ = '\0'; 5247 5248 req->path_length = (uint32_t) r->path->length; 5249 if (r->path->start == r->target.start) { 5250 nxt_unit_sptr_set(&req->path, target_pos); 5251 5252 } else { 5253 nxt_unit_sptr_set(&req->path, p); 5254 p = nxt_cpymem(p, r->path->start, r->path->length); 5255 *p++ = '\0'; 5256 } 5257 5258 req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; 5259 if (r->args != NULL && r->args->start != NULL) { 5260 query_pos = nxt_pointer_to(target_pos, 5261 r->args->start - r->target.start); 5262 5263 nxt_unit_sptr_set(&req->query, query_pos); 5264 5265 } else { 5266 req->query.offset = 0; 5267 } 5268 5269 req->content_length_field = NXT_UNIT_NONE_FIELD; 5270 req->content_type_field = NXT_UNIT_NONE_FIELD; 5271 req->cookie_field = NXT_UNIT_NONE_FIELD; 5272 req->authorization_field = NXT_UNIT_NONE_FIELD; 5273 5274 dst_field = req->fields; 5275 5276 for (field = nxt_fields_first(r->fields, &iter); 5277 field != NULL; 5278 field = nxt_fields_next(&iter)) 5279 { 5280 if (field->skip) { 5281 continue; 5282 } 5283 5284 dst_field->hash = field->hash; 5285 dst_field->skip = 0; 5286 dst_field->name_length = field->name_length + prefix->length; 5287 dst_field->value_length = field->value_length; 5288 5289 if (field == r->content_length) { 5290 req->content_length_field = dst_field - req->fields; 5291 5292 } else if (field == r->content_type) { 5293 req->content_type_field = dst_field - req->fields; 5294 5295 } else if (field == r->cookie) { 5296 req->cookie_field = dst_field - req->fields; 5297 5298 } else if (field == r->authorization) { 5299 req->authorization_field = dst_field - req->fields; 5300 } 5301 5302 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5303 (int) field->hash, (int) field->skip, 5304 (int) field->name_length, field->name, 5305 (int) field->value_length, field->value); 5306 5307 if (prefix->length != 0) { 5308 nxt_unit_sptr_set(&dst_field->name, p); 5309 p = nxt_cpymem(p, prefix->start, prefix->length); 5310 5311 end = field->name + field->name_length; 5312 for (pos = field->name; pos < end; pos++) { 5313 c = *pos; 5314 5315 if (c >= 'a' && c <= 'z') { 5316 *p++ = (c & ~0x20); 5317 continue; 5318 } 5319 5320 if (c == '-') { 5321 *p++ = '_'; 5322 continue; 5323 } 5324 5325 *p++ = c; 5326 } 5327 5328 } else { 5329 nxt_unit_sptr_set(&dst_field->name, p); 5330 p = nxt_cpymem(p, field->name, field->name_length); 5331 } 5332 5333 *p++ = '\0'; 5334 5335 nxt_unit_sptr_set(&dst_field->value, p); 5336 p = nxt_cpymem(p, field->value, field->value_length); 5337 5338 if (prefix->length != 0) { 5339 dup_iter = iter; 5340 5341 for (dup = nxt_fields_next(&dup_iter); 5342 dup != NULL; 5343 dup = nxt_fields_next(&dup_iter)) 5344 { 5345 if (dup->name_length != field->name_length 5346 || dup->skip 5347 || dup->hash != field->hash 5348 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5349 { 5350 continue; 5351 } 5352 5353 p = nxt_cpymem(p, ", ", 2); 5354 p = nxt_cpymem(p, dup->value, dup->value_length); 5355 5356 dst_field->value_length += 2 + dup->value_length; 5357 5358 dup->skip = 1; 5359 } 5360 } 5361 5362 *p++ = '\0'; 5363 5364 dst_field++; 5365 } 5366 5367 req->fields_count = (uint32_t) (dst_field - req->fields); 5368 5369 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5370 5371 buf = out; 5372 tail = &buf->next; 5373 5374 for (b = r->body; b != NULL; b = b->next) { 5375 size = nxt_buf_mem_used_size(&b->mem); 5376 pos = b->mem.pos; 5377 5378 while (size > 0) { 5379 if (buf == NULL) { 5380 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5381 5382 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5383 if (nxt_slow_path(buf == NULL)) { 5384 while (out != NULL) { 5385 buf = out->next; 5386 out->next = NULL; 5387 out->completion_handler(task, out, out->parent); 5388 out = buf; 5389 } 5390 return NULL; 5391 } 5392 5393 *tail = buf; 5394 tail = &buf->next; 5395 5396 } else { 5397 free_size = nxt_buf_mem_free_size(&buf->mem); 5398 if (free_size < size 5399 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5400 == NXT_OK) 5401 { 5402 free_size = nxt_buf_mem_free_size(&buf->mem); 5403 } 5404 } 5405 5406 if (free_size > 0) { 5407 copy_size = nxt_min(free_size, size); 5408 5409 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5410 5411 size -= copy_size; 5412 pos += copy_size; 5413 5414 if (size == 0) { 5415 break; 5416 } 5417 } 5418 5419 buf = NULL; 5420 } 5421 } 5422 5423 return out; 5424 } 5425 5426 5427 static void 5428 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5429 { 5430 nxt_timer_t *timer; 5431 nxt_http_request_t *r; 5432 nxt_request_rpc_data_t *req_rpc_data; 5433 5434 timer = obj; 5435 5436 nxt_debug(task, "router app timeout"); 5437 5438 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5439 req_rpc_data = r->timer_data; 5440 5441 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5442 5443 nxt_request_rpc_data_unlink(task, req_rpc_data); 5444 } 5445 5446 5447 static void 5448 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5449 { 5450 r->timer.handler = nxt_router_http_request_release; 5451 nxt_timer_add(task->thread->engine, &r->timer, 0); 5452 } 5453 5454 5455 static void 5456 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5457 { 5458 nxt_http_request_t *r; 5459 5460 nxt_debug(task, "http request pool release"); 5461 5462 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5463 5464 nxt_mp_release(r->mem_pool); 5465 } 5466 5467 5468 static void 5469 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5470 { 5471 size_t mi; 5472 uint32_t i; 5473 nxt_bool_t ack; 5474 nxt_process_t *process; 5475 nxt_free_map_t *m; 5476 nxt_port_mmap_handler_t *mmap_handler; 5477 5478 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5479 5480 process = nxt_runtime_process_find(task->thread->runtime, 5481 msg->port_msg.pid); 5482 if (nxt_slow_path(process == NULL)) { 5483 return; 5484 } 5485 5486 ack = 0; 5487 5488 /* 5489 * To mitigate possible racing condition (when OOSM message received 5490 * after some of the memory was already freed), need to try to find 5491 * first free segment in shared memory and send ACK if found. 5492 */ 5493 5494 nxt_thread_mutex_lock(&process->incoming.mutex); 5495 5496 for (i = 0; i < process->incoming.size; i++) { 5497 mmap_handler = process->incoming.elts[i].mmap_handler; 5498 5499 if (nxt_slow_path(mmap_handler == NULL)) { 5500 continue; 5501 } 5502 5503 m = mmap_handler->hdr->free_map; 5504 5505 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5506 if (m[mi] != 0) { 5507 ack = 1; 5508 5509 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5510 i, mi, m[mi]); 5511 5512 break; 5513 } 5514 } 5515 } 5516 5517 nxt_thread_mutex_unlock(&process->incoming.mutex); 5518 5519 if (ack) { 5520 nxt_process_broadcast_shm_ack(task, process); 5521 } 5522 } 5523 5524 5525 static void 5526 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5527 { 5528 nxt_fd_t fd; 5529 nxt_port_t *port; 5530 nxt_runtime_t *rt; 5531 nxt_port_mmaps_t *mmaps; 5532 nxt_port_msg_get_mmap_t *get_mmap_msg; 5533 nxt_port_mmap_handler_t *mmap_handler; 5534 5535 rt = task->thread->runtime; 5536 5537 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5538 msg->port_msg.reply_port); 5539 if (nxt_slow_path(port == NULL)) { 5540 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5541 msg->port_msg.pid, msg->port_msg.reply_port); 5542 5543 return; 5544 } 5545 5546 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5547 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5548 { 5549 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5550 (int) nxt_buf_used_size(msg->buf)); 5551 5552 return; 5553 } 5554 5555 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5556 5557 nxt_assert(port->type == NXT_PROCESS_APP); 5558 5559 if (nxt_slow_path(port->app == NULL)) { 5560 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5561 port->pid, port->id); 5562 5563 // FIXME 5564 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5565 -1, msg->port_msg.stream, 0, NULL); 5566 5567 return; 5568 } 5569 5570 mmaps = &port->app->outgoing; 5571 nxt_thread_mutex_lock(&mmaps->mutex); 5572 5573 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5574 nxt_thread_mutex_unlock(&mmaps->mutex); 5575 5576 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5577 (int) get_mmap_msg->id); 5578 5579 // FIXME 5580 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5581 -1, msg->port_msg.stream, 0, NULL); 5582 return; 5583 } 5584 5585 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5586 5587 fd = mmap_handler->fd; 5588 5589 nxt_thread_mutex_unlock(&mmaps->mutex); 5590 5591 nxt_debug(task, "get mmap %PI:%d found", 5592 msg->port_msg.pid, (int) get_mmap_msg->id); 5593 5594 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5595 } 5596 5597 5598 static void 5599 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5600 { 5601 nxt_port_t *port, *reply_port; 5602 nxt_runtime_t *rt; 5603 nxt_port_msg_get_port_t *get_port_msg; 5604 5605 rt = task->thread->runtime; 5606 5607 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5608 msg->port_msg.reply_port); 5609 if (nxt_slow_path(reply_port == NULL)) { 5610 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5611 msg->port_msg.pid, msg->port_msg.reply_port); 5612 5613 return; 5614 } 5615 5616 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5617 < (int) sizeof(nxt_port_msg_get_port_t))) 5618 { 5619 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5620 (int) nxt_buf_used_size(msg->buf)); 5621 5622 return; 5623 } 5624 5625 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5626 5627 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5628 if (nxt_slow_path(port == NULL)) { 5629 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5630 get_port_msg->pid, get_port_msg->id); 5631 5632 return; 5633 } 5634 5635 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5636 get_port_msg->id); 5637 5638 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5639 } 5640