1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 nxt_conf_value_t *limits_value; 31 nxt_conf_value_t *processes_value; 32 nxt_conf_value_t *targets_value; 33 } nxt_router_app_conf_t; 34 35 36 typedef struct { 37 nxt_str_t pass; 38 nxt_str_t application; 39 } nxt_router_listener_conf_t; 40 41 42 #if (NXT_TLS) 43 44 typedef struct { 45 nxt_str_t name; 46 nxt_socket_conf_t *socket_conf; 47 nxt_router_temp_conf_t *temp_conf; 48 nxt_tls_init_t *tls_init; 49 nxt_bool_t last; 50 51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 52 } nxt_router_tlssock_t; 53 54 #endif 55 56 57 typedef struct { 58 nxt_str_t *name; 59 nxt_socket_conf_t *socket_conf; 60 nxt_router_temp_conf_t *temp_conf; 61 nxt_bool_t last; 62 } nxt_socket_rpc_t; 63 64 65 typedef struct { 66 nxt_app_t *app; 67 nxt_router_temp_conf_t *temp_conf; 68 } nxt_app_rpc_t; 69 70 71 typedef struct { 72 nxt_app_joint_t *app_joint; 73 uint32_t generation; 74 } nxt_app_joint_rpc_t; 75 76 77 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 78 nxt_mp_t *mp); 79 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 80 static void nxt_router_greet_controller(nxt_task_t *task, 81 nxt_port_t *controller_port); 82 83 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 84 85 static void nxt_router_new_port_handler(nxt_task_t *task, 86 nxt_port_recv_msg_t *msg); 87 static void nxt_router_conf_data_handler(nxt_task_t *task, 88 nxt_port_recv_msg_t *msg); 89 static void nxt_router_app_restart_handler(nxt_task_t *task, 90 nxt_port_recv_msg_t *msg); 91 static void nxt_router_remove_pid_handler(nxt_task_t *task, 92 nxt_port_recv_msg_t *msg); 93 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 94 nxt_port_recv_msg_t *msg); 95 96 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 97 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 98 static void nxt_router_conf_ready(nxt_task_t *task, 99 nxt_router_temp_conf_t *tmcf); 100 static void nxt_router_conf_error(nxt_task_t *task, 101 nxt_router_temp_conf_t *tmcf); 102 static void nxt_router_conf_send(nxt_task_t *task, 103 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 104 105 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 106 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 107 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 108 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 109 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, 110 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, 111 nxt_conf_value_t *conf); 112 113 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 114 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 115 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 116 nxt_app_t *app); 117 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 118 nxt_str_t *name); 119 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 120 int i); 121 122 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 123 nxt_port_t *port); 124 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 125 nxt_port_t *port); 126 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 127 nxt_port_t *port, nxt_fd_t fd); 128 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 129 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 130 static void nxt_router_listen_socket_ready(nxt_task_t *task, 131 nxt_port_recv_msg_t *msg, void *data); 132 static void nxt_router_listen_socket_error(nxt_task_t *task, 133 nxt_port_recv_msg_t *msg, void *data); 134 #if (NXT_TLS) 135 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 136 nxt_port_recv_msg_t *msg, void *data); 137 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 138 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 139 nxt_bool_t last); 140 #endif 141 static void nxt_router_app_rpc_create(nxt_task_t *task, 142 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 143 static void nxt_router_app_prefork_ready(nxt_task_t *task, 144 nxt_port_recv_msg_t *msg, void *data); 145 static void nxt_router_app_prefork_error(nxt_task_t *task, 146 nxt_port_recv_msg_t *msg, void *data); 147 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 148 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 149 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 150 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 151 152 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 153 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 154 const nxt_event_interface_t *interface); 155 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 156 nxt_router_engine_conf_t *recf); 157 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 158 nxt_router_engine_conf_t *recf); 159 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 160 nxt_router_engine_conf_t *recf); 161 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 162 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 163 nxt_work_handler_t handler); 164 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 165 nxt_router_engine_conf_t *recf); 166 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 167 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 168 169 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 170 nxt_router_temp_conf_t *tmcf); 171 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 172 nxt_event_engine_t *engine); 173 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 174 nxt_router_temp_conf_t *tmcf); 175 176 static void nxt_router_engines_post(nxt_router_t *router, 177 nxt_router_temp_conf_t *tmcf); 178 static void nxt_router_engine_post(nxt_event_engine_t *engine, 179 nxt_work_t *jobs); 180 181 static void nxt_router_thread_start(void *data); 182 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 183 void *data); 184 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 185 void *data); 186 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 187 void *data); 188 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 189 void *data); 190 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 191 void *data); 192 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 193 void *data); 194 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 195 void *data); 196 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 197 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 198 static void nxt_router_listen_socket_release(nxt_task_t *task, 199 nxt_socket_conf_t *skcf); 200 201 static void nxt_router_access_log_writer(nxt_task_t *task, 202 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 203 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 204 struct tm *tm, size_t size, const char *format); 205 static void nxt_router_access_log_open(nxt_task_t *task, 206 nxt_router_temp_conf_t *tmcf); 207 static void nxt_router_access_log_ready(nxt_task_t *task, 208 nxt_port_recv_msg_t *msg, void *data); 209 static void nxt_router_access_log_error(nxt_task_t *task, 210 nxt_port_recv_msg_t *msg, void *data); 211 static void nxt_router_access_log_release(nxt_task_t *task, 212 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 213 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 214 void *data); 215 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 216 nxt_port_recv_msg_t *msg, void *data); 217 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 218 nxt_port_recv_msg_t *msg, void *data); 219 220 static void nxt_router_app_port_ready(nxt_task_t *task, 221 nxt_port_recv_msg_t *msg, void *data); 222 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 223 nxt_port_t *app_port); 224 static void nxt_router_app_port_error(nxt_task_t *task, 225 nxt_port_recv_msg_t *msg, void *data); 226 227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 229 230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, 231 nxt_port_t *port, nxt_apr_action_t action); 232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 233 nxt_request_rpc_data_t *req_rpc_data); 234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 235 void *data); 236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 237 void *data); 238 239 static void nxt_router_app_prepare_request(nxt_task_t *task, 240 nxt_request_rpc_data_t *req_rpc_data); 241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 242 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 243 244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 246 void *data); 247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 248 void *data); 249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 250 void *data); 251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 252 253 static const nxt_http_request_state_t nxt_http_request_send_state; 254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 255 256 static void nxt_router_app_joint_use(nxt_task_t *task, 257 nxt_app_joint_t *app_joint, int i); 258 259 static void nxt_router_http_request_release_post(nxt_task_t *task, 260 nxt_http_request_t *r); 261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 262 void *data); 263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 264 static void nxt_router_get_port_handler(nxt_task_t *task, 265 nxt_port_recv_msg_t *msg); 266 static void nxt_router_get_mmap_handler(nxt_task_t *task, 267 nxt_port_recv_msg_t *msg); 268 269 extern const nxt_http_request_state_t nxt_http_websocket; 270 271 static nxt_router_t *nxt_router; 272 273 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 274 static const nxt_str_t empty_prefix = nxt_string(""); 275 276 static const nxt_str_t *nxt_app_msg_prefix[] = { 277 &empty_prefix, 278 &empty_prefix, 279 &http_prefix, 280 &http_prefix, 281 &http_prefix, 282 &empty_prefix, 283 }; 284 285 286 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 287 .quit = nxt_signal_quit_handler, 288 .new_port = nxt_router_new_port_handler, 289 .get_port = nxt_router_get_port_handler, 290 .change_file = nxt_port_change_log_file_handler, 291 .mmap = nxt_port_mmap_handler, 292 .get_mmap = nxt_router_get_mmap_handler, 293 .data = nxt_router_conf_data_handler, 294 .app_restart = nxt_router_app_restart_handler, 295 .remove_pid = nxt_router_remove_pid_handler, 296 .access_log = nxt_router_access_log_reopen_handler, 297 .rpc_ready = nxt_port_rpc_handler, 298 .rpc_error = nxt_port_rpc_handler, 299 .oosm = nxt_router_oosm_handler, 300 }; 301 302 303 const nxt_process_init_t nxt_router_process = { 304 .name = "router", 305 .type = NXT_PROCESS_ROUTER, 306 .prefork = nxt_router_prefork, 307 .restart = 1, 308 .setup = nxt_process_core_setup, 309 .start = nxt_router_start, 310 .port_handlers = &nxt_router_process_port_handlers, 311 .signals = nxt_process_signals, 312 }; 313 314 315 /* Queues of nxt_socket_conf_t */ 316 nxt_queue_t creating_sockets; 317 nxt_queue_t pending_sockets; 318 nxt_queue_t updating_sockets; 319 nxt_queue_t keeping_sockets; 320 nxt_queue_t deleting_sockets; 321 322 323 static nxt_int_t 324 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 325 { 326 nxt_runtime_stop_app_processes(task, task->thread->runtime); 327 328 return NXT_OK; 329 } 330 331 332 static nxt_int_t 333 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 334 { 335 nxt_int_t ret; 336 nxt_port_t *controller_port; 337 nxt_router_t *router; 338 nxt_runtime_t *rt; 339 340 rt = task->thread->runtime; 341 342 nxt_log(task, NXT_LOG_INFO, "router started"); 343 344 #if (NXT_TLS) 345 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 346 if (nxt_slow_path(rt->tls == NULL)) { 347 return NXT_ERROR; 348 } 349 350 ret = rt->tls->library_init(task); 351 if (nxt_slow_path(ret != NXT_OK)) { 352 return ret; 353 } 354 #endif 355 356 ret = nxt_http_init(task); 357 if (nxt_slow_path(ret != NXT_OK)) { 358 return ret; 359 } 360 361 router = nxt_zalloc(sizeof(nxt_router_t)); 362 if (nxt_slow_path(router == NULL)) { 363 return NXT_ERROR; 364 } 365 366 nxt_queue_init(&router->engines); 367 nxt_queue_init(&router->sockets); 368 nxt_queue_init(&router->apps); 369 370 nxt_router = router; 371 372 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 373 if (controller_port != NULL) { 374 nxt_router_greet_controller(task, controller_port); 375 } 376 377 return NXT_OK; 378 } 379 380 381 static void 382 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 383 { 384 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 385 -1, 0, 0, NULL); 386 } 387 388 389 static void 390 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 391 void *data) 392 { 393 size_t size; 394 uint32_t stream; 395 nxt_mp_t *mp; 396 nxt_int_t ret; 397 nxt_app_t *app; 398 nxt_buf_t *b; 399 nxt_port_t *main_port; 400 nxt_runtime_t *rt; 401 nxt_app_joint_rpc_t *app_joint_rpc; 402 403 app = data; 404 405 rt = task->thread->runtime; 406 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 407 408 nxt_debug(task, "app '%V' %p start process", &app->name, app); 409 410 size = app->name.length + 1 + app->conf.length; 411 412 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 413 414 if (nxt_slow_path(b == NULL)) { 415 goto failed; 416 } 417 418 nxt_buf_cpystr(b, &app->name); 419 *b->mem.free++ = '\0'; 420 nxt_buf_cpystr(b, &app->conf); 421 422 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 423 nxt_router_app_port_ready, 424 nxt_router_app_port_error, 425 sizeof(nxt_app_joint_rpc_t)); 426 if (nxt_slow_path(app_joint_rpc == NULL)) { 427 goto failed; 428 } 429 430 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 431 432 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 433 -1, stream, port->id, b); 434 if (nxt_slow_path(ret != NXT_OK)) { 435 nxt_port_rpc_cancel(task, port, stream); 436 437 goto failed; 438 } 439 440 app_joint_rpc->app_joint = app->joint; 441 app_joint_rpc->generation = app->generation; 442 443 nxt_router_app_joint_use(task, app->joint, 1); 444 445 nxt_router_app_use(task, app, -1); 446 447 return; 448 449 failed: 450 451 if (b != NULL) { 452 mp = b->data; 453 nxt_mp_free(mp, b); 454 nxt_mp_release(mp); 455 } 456 457 nxt_thread_mutex_lock(&app->mutex); 458 459 app->pending_processes--; 460 461 nxt_thread_mutex_unlock(&app->mutex); 462 463 nxt_router_app_use(task, app, -1); 464 } 465 466 467 static void 468 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 469 { 470 app_joint->use_count += i; 471 472 if (app_joint->use_count == 0) { 473 nxt_assert(app_joint->app == NULL); 474 475 nxt_free(app_joint); 476 } 477 } 478 479 480 static nxt_int_t 481 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 482 { 483 nxt_int_t res; 484 nxt_port_t *router_port; 485 nxt_runtime_t *rt; 486 487 nxt_debug(task, "app '%V' start process", &app->name); 488 489 rt = task->thread->runtime; 490 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 491 492 nxt_router_app_use(task, app, 1); 493 494 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 495 app); 496 497 if (res == NXT_OK) { 498 return res; 499 } 500 501 nxt_thread_mutex_lock(&app->mutex); 502 503 app->pending_processes--; 504 505 nxt_thread_mutex_unlock(&app->mutex); 506 507 nxt_router_app_use(task, app, -1); 508 509 return NXT_ERROR; 510 } 511 512 513 nxt_inline nxt_bool_t 514 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 515 { 516 nxt_buf_t *b, *next; 517 nxt_bool_t cancelled; 518 nxt_port_t *app_port; 519 nxt_msg_info_t *msg_info; 520 521 msg_info = &req_rpc_data->msg_info; 522 523 if (msg_info->buf == NULL) { 524 return 0; 525 } 526 527 app_port = req_rpc_data->app_port; 528 529 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 530 cancelled = nxt_app_queue_cancel(app_port->queue, 531 msg_info->tracking_cookie, 532 req_rpc_data->stream); 533 534 if (cancelled) { 535 nxt_debug(task, "stream #%uD: cancelled by router", 536 req_rpc_data->stream); 537 } 538 539 } else { 540 cancelled = 0; 541 } 542 543 for (b = msg_info->buf; b != NULL; b = next) { 544 next = b->next; 545 b->next = NULL; 546 547 if (b->is_port_mmap_sent) { 548 b->is_port_mmap_sent = cancelled == 0; 549 } 550 551 b->completion_handler(task, b, b->parent); 552 } 553 554 msg_info->buf = NULL; 555 556 return cancelled; 557 } 558 559 560 nxt_inline nxt_bool_t 561 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 562 { 563 if (lnk->next != NULL) { 564 nxt_queue_remove(lnk); 565 566 lnk->next = NULL; 567 568 return 1; 569 } 570 571 return 0; 572 } 573 574 575 nxt_inline void 576 nxt_request_rpc_data_unlink(nxt_task_t *task, 577 nxt_request_rpc_data_t *req_rpc_data) 578 { 579 nxt_app_t *app; 580 nxt_bool_t unlinked; 581 nxt_http_request_t *r; 582 583 nxt_router_msg_cancel(task, req_rpc_data); 584 585 app = req_rpc_data->app; 586 587 if (req_rpc_data->app_port != NULL) { 588 nxt_router_app_port_release(task, app, req_rpc_data->app_port, 589 req_rpc_data->apr_action); 590 591 req_rpc_data->app_port = NULL; 592 } 593 594 r = req_rpc_data->request; 595 596 if (r != NULL) { 597 r->timer_data = NULL; 598 599 nxt_router_http_request_release_post(task, r); 600 601 r->req_rpc_data = NULL; 602 req_rpc_data->request = NULL; 603 604 if (app != NULL) { 605 unlinked = 0; 606 607 nxt_thread_mutex_lock(&app->mutex); 608 609 if (r->app_link.next != NULL) { 610 nxt_queue_remove(&r->app_link); 611 r->app_link.next = NULL; 612 613 unlinked = 1; 614 } 615 616 nxt_thread_mutex_unlock(&app->mutex); 617 618 if (unlinked) { 619 nxt_mp_release(r->mem_pool); 620 } 621 } 622 } 623 624 if (app != NULL) { 625 nxt_router_app_use(task, app, -1); 626 627 req_rpc_data->app = NULL; 628 } 629 630 if (req_rpc_data->msg_info.body_fd != -1) { 631 nxt_fd_close(req_rpc_data->msg_info.body_fd); 632 633 req_rpc_data->msg_info.body_fd = -1; 634 } 635 636 if (req_rpc_data->rpc_cancel) { 637 req_rpc_data->rpc_cancel = 0; 638 639 nxt_port_rpc_cancel(task, task->thread->engine->port, 640 req_rpc_data->stream); 641 } 642 } 643 644 645 static void 646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 647 { 648 nxt_int_t res; 649 nxt_app_t *app; 650 nxt_port_t *port, *main_app_port; 651 nxt_runtime_t *rt; 652 653 nxt_port_new_port_handler(task, msg); 654 655 port = msg->u.new_port; 656 657 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 658 nxt_router_greet_controller(task, msg->u.new_port); 659 } 660 661 if (port == NULL || port->type != NXT_PROCESS_APP) { 662 663 if (msg->port_msg.stream == 0) { 664 return; 665 } 666 667 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 668 669 } else { 670 if (msg->fd[1] != -1) { 671 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 672 if (nxt_slow_path(res != NXT_OK)) { 673 return; 674 } 675 676 nxt_fd_close(msg->fd[1]); 677 msg->fd[1] = -1; 678 } 679 } 680 681 if (msg->port_msg.stream != 0) { 682 nxt_port_rpc_handler(task, msg); 683 return; 684 } 685 686 /* 687 * Port with "id == 0" is application 'main' port and it always 688 * should come with non-zero stream. 689 */ 690 nxt_assert(port->id != 0); 691 692 /* Find 'main' app port and get app reference. */ 693 rt = task->thread->runtime; 694 695 /* 696 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 697 * sent to main port (with id == 0) and processed in main thread. 698 */ 699 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 700 nxt_assert(main_app_port != NULL); 701 702 app = main_app_port->app; 703 704 if (nxt_fast_path(app != NULL)) { 705 nxt_thread_mutex_lock(&app->mutex); 706 707 /* TODO here should be find-and-add code because there can be 708 port waiters in port_hash */ 709 nxt_port_hash_add(&app->port_hash, port); 710 app->port_hash_count++; 711 712 nxt_thread_mutex_unlock(&app->mutex); 713 714 port->app = app; 715 } 716 717 port->main_app_port = main_app_port; 718 719 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 720 } 721 722 723 static void 724 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 725 { 726 void *p; 727 size_t size; 728 nxt_int_t ret; 729 nxt_port_t *port; 730 nxt_router_temp_conf_t *tmcf; 731 732 port = nxt_runtime_port_find(task->thread->runtime, 733 msg->port_msg.pid, 734 msg->port_msg.reply_port); 735 if (nxt_slow_path(port == NULL)) { 736 nxt_alert(task, "conf_data_handler: reply port not found"); 737 return; 738 } 739 740 p = MAP_FAILED; 741 742 /* 743 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 744 * initialized in 'cleanup' section. 745 */ 746 size = 0; 747 748 tmcf = nxt_router_temp_conf(task); 749 if (nxt_slow_path(tmcf == NULL)) { 750 goto fail; 751 } 752 753 if (nxt_slow_path(msg->fd[0] == -1)) { 754 nxt_alert(task, "conf_data_handler: invalid shm fd"); 755 goto fail; 756 } 757 758 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 759 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 760 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 761 goto fail; 762 } 763 764 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 765 766 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 767 768 nxt_fd_close(msg->fd[0]); 769 msg->fd[0] = -1; 770 771 if (nxt_slow_path(p == MAP_FAILED)) { 772 goto fail; 773 } 774 775 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 776 777 tmcf->router_conf->router = nxt_router; 778 tmcf->stream = msg->port_msg.stream; 779 tmcf->port = port; 780 781 nxt_port_use(task, tmcf->port, 1); 782 783 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 784 785 if (nxt_fast_path(ret == NXT_OK)) { 786 nxt_router_conf_apply(task, tmcf, NULL); 787 788 } else { 789 nxt_router_conf_error(task, tmcf); 790 } 791 792 goto cleanup; 793 794 fail: 795 796 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 797 msg->port_msg.stream, 0, NULL); 798 799 if (tmcf != NULL) { 800 nxt_mp_release(tmcf->mem_pool); 801 } 802 803 cleanup: 804 805 if (p != MAP_FAILED) { 806 nxt_mem_munmap(p, size); 807 } 808 809 if (msg->fd[0] != -1) { 810 nxt_fd_close(msg->fd[0]); 811 msg->fd[0] = -1; 812 } 813 } 814 815 816 static void 817 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 818 { 819 nxt_app_t *app; 820 nxt_int_t ret; 821 nxt_str_t app_name; 822 nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; 823 nxt_port_msg_type_t reply; 824 825 reply_port = nxt_runtime_port_find(task->thread->runtime, 826 msg->port_msg.pid, 827 msg->port_msg.reply_port); 828 if (nxt_slow_path(reply_port == NULL)) { 829 nxt_alert(task, "app_restart_handler: reply port not found"); 830 return; 831 } 832 833 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 834 app_name.start = msg->buf->mem.pos; 835 836 nxt_debug(task, "app_restart_handler: %V", &app_name); 837 838 app = nxt_router_app_find(&nxt_router->apps, &app_name); 839 840 if (nxt_fast_path(app != NULL)) { 841 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 842 NXT_PROCESS_APP); 843 if (nxt_slow_path(shared_port == NULL)) { 844 goto fail; 845 } 846 847 ret = nxt_port_socket_init(task, shared_port, 0); 848 if (nxt_slow_path(ret != NXT_OK)) { 849 nxt_port_use(task, shared_port, -1); 850 goto fail; 851 } 852 853 ret = nxt_router_app_queue_init(task, shared_port); 854 if (nxt_slow_path(ret != NXT_OK)) { 855 nxt_port_write_close(shared_port); 856 nxt_port_read_close(shared_port); 857 nxt_port_use(task, shared_port, -1); 858 goto fail; 859 } 860 861 nxt_port_write_enable(task, shared_port); 862 863 nxt_thread_mutex_lock(&app->mutex); 864 865 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 866 867 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 868 0, 0, NULL); 869 870 } nxt_queue_loop; 871 872 app->generation++; 873 874 shared_port->app = app; 875 876 old_shared_port = app->shared_port; 877 old_shared_port->app = NULL; 878 879 app->shared_port = shared_port; 880 881 nxt_thread_mutex_unlock(&app->mutex); 882 883 nxt_port_close(task, old_shared_port); 884 nxt_port_use(task, old_shared_port, -1); 885 886 reply = NXT_PORT_MSG_RPC_READY_LAST; 887 888 } else { 889 890 fail: 891 892 reply = NXT_PORT_MSG_RPC_ERROR; 893 } 894 895 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 896 0, NULL); 897 } 898 899 900 static void 901 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 902 void *data) 903 { 904 union { 905 nxt_pid_t removed_pid; 906 void *data; 907 } u; 908 909 u.data = data; 910 911 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 912 } 913 914 915 static void 916 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 917 { 918 nxt_event_engine_t *engine; 919 920 nxt_port_remove_pid_handler(task, msg); 921 922 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 923 { 924 if (nxt_fast_path(engine->port != NULL)) { 925 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 926 msg->u.data); 927 } 928 } 929 nxt_queue_loop; 930 931 if (msg->port_msg.stream == 0) { 932 return; 933 } 934 935 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 936 937 nxt_port_rpc_handler(task, msg); 938 } 939 940 941 static nxt_router_temp_conf_t * 942 nxt_router_temp_conf(nxt_task_t *task) 943 { 944 nxt_mp_t *mp, *tmp; 945 nxt_router_conf_t *rtcf; 946 nxt_router_temp_conf_t *tmcf; 947 948 mp = nxt_mp_create(1024, 128, 256, 32); 949 if (nxt_slow_path(mp == NULL)) { 950 return NULL; 951 } 952 953 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 954 if (nxt_slow_path(rtcf == NULL)) { 955 goto fail; 956 } 957 958 rtcf->mem_pool = mp; 959 960 tmp = nxt_mp_create(1024, 128, 256, 32); 961 if (nxt_slow_path(tmp == NULL)) { 962 goto fail; 963 } 964 965 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 966 if (nxt_slow_path(tmcf == NULL)) { 967 goto temp_fail; 968 } 969 970 tmcf->mem_pool = tmp; 971 tmcf->router_conf = rtcf; 972 tmcf->count = 1; 973 tmcf->engine = task->thread->engine; 974 975 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 976 sizeof(nxt_router_engine_conf_t)); 977 if (nxt_slow_path(tmcf->engines == NULL)) { 978 goto temp_fail; 979 } 980 981 nxt_queue_init(&creating_sockets); 982 nxt_queue_init(&pending_sockets); 983 nxt_queue_init(&updating_sockets); 984 nxt_queue_init(&keeping_sockets); 985 nxt_queue_init(&deleting_sockets); 986 987 #if (NXT_TLS) 988 nxt_queue_init(&tmcf->tls); 989 #endif 990 991 nxt_queue_init(&tmcf->apps); 992 nxt_queue_init(&tmcf->previous); 993 994 return tmcf; 995 996 temp_fail: 997 998 nxt_mp_destroy(tmp); 999 1000 fail: 1001 1002 nxt_mp_destroy(mp); 1003 1004 return NULL; 1005 } 1006 1007 1008 nxt_inline nxt_bool_t 1009 nxt_router_app_can_start(nxt_app_t *app) 1010 { 1011 return app->processes + app->pending_processes < app->max_processes 1012 && app->pending_processes < app->max_pending_processes; 1013 } 1014 1015 1016 nxt_inline nxt_bool_t 1017 nxt_router_app_need_start(nxt_app_t *app) 1018 { 1019 return (app->active_requests 1020 > app->port_hash_count + app->pending_processes) 1021 || (app->spare_processes 1022 > app->idle_processes + app->pending_processes); 1023 } 1024 1025 1026 static void 1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1028 { 1029 nxt_int_t ret; 1030 nxt_app_t *app; 1031 nxt_router_t *router; 1032 nxt_runtime_t *rt; 1033 nxt_queue_link_t *qlk; 1034 nxt_socket_conf_t *skcf; 1035 nxt_router_conf_t *rtcf; 1036 nxt_router_temp_conf_t *tmcf; 1037 const nxt_event_interface_t *interface; 1038 #if (NXT_TLS) 1039 nxt_router_tlssock_t *tls; 1040 #endif 1041 1042 tmcf = obj; 1043 1044 qlk = nxt_queue_first(&pending_sockets); 1045 1046 if (qlk != nxt_queue_tail(&pending_sockets)) { 1047 nxt_queue_remove(qlk); 1048 nxt_queue_insert_tail(&creating_sockets, qlk); 1049 1050 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1051 1052 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1053 1054 return; 1055 } 1056 1057 #if (NXT_TLS) 1058 qlk = nxt_queue_last(&tmcf->tls); 1059 1060 if (qlk != nxt_queue_head(&tmcf->tls)) { 1061 nxt_queue_remove(qlk); 1062 1063 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1064 1065 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1066 nxt_router_tls_rpc_handler, tls); 1067 return; 1068 } 1069 #endif 1070 1071 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1072 1073 if (nxt_router_app_need_start(app)) { 1074 nxt_router_app_rpc_create(task, tmcf, app); 1075 return; 1076 } 1077 1078 } nxt_queue_loop; 1079 1080 rtcf = tmcf->router_conf; 1081 1082 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1083 nxt_router_access_log_open(task, tmcf); 1084 return; 1085 } 1086 1087 rt = task->thread->runtime; 1088 1089 interface = nxt_service_get(rt->services, "engine", NULL); 1090 1091 router = rtcf->router; 1092 1093 ret = nxt_router_engines_create(task, router, tmcf, interface); 1094 if (nxt_slow_path(ret != NXT_OK)) { 1095 goto fail; 1096 } 1097 1098 ret = nxt_router_threads_create(task, rt, tmcf); 1099 if (nxt_slow_path(ret != NXT_OK)) { 1100 goto fail; 1101 } 1102 1103 nxt_router_apps_sort(task, router, tmcf); 1104 1105 nxt_router_apps_hash_use(task, rtcf, 1); 1106 1107 nxt_router_engines_post(router, tmcf); 1108 1109 nxt_queue_add(&router->sockets, &updating_sockets); 1110 nxt_queue_add(&router->sockets, &creating_sockets); 1111 1112 router->access_log = rtcf->access_log; 1113 1114 nxt_router_conf_ready(task, tmcf); 1115 1116 return; 1117 1118 fail: 1119 1120 nxt_router_conf_error(task, tmcf); 1121 1122 return; 1123 } 1124 1125 1126 static void 1127 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1128 { 1129 nxt_joint_job_t *job; 1130 1131 job = obj; 1132 1133 nxt_router_conf_ready(task, job->tmcf); 1134 } 1135 1136 1137 static void 1138 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1139 { 1140 uint32_t count; 1141 nxt_router_conf_t *rtcf; 1142 nxt_thread_spinlock_t *lock; 1143 1144 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1145 1146 if (--tmcf->count > 0) { 1147 return; 1148 } 1149 1150 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1151 1152 rtcf = tmcf->router_conf; 1153 1154 lock = &rtcf->router->lock; 1155 1156 nxt_thread_spin_lock(lock); 1157 1158 count = rtcf->count; 1159 1160 nxt_thread_spin_unlock(lock); 1161 1162 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1163 1164 if (count == 0) { 1165 nxt_router_apps_hash_use(task, rtcf, -1); 1166 1167 nxt_router_access_log_release(task, lock, rtcf->access_log); 1168 1169 nxt_mp_destroy(rtcf->mem_pool); 1170 } 1171 1172 nxt_mp_release(tmcf->mem_pool); 1173 } 1174 1175 1176 static void 1177 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1178 { 1179 nxt_app_t *app; 1180 nxt_queue_t new_socket_confs; 1181 nxt_socket_t s; 1182 nxt_router_t *router; 1183 nxt_queue_link_t *qlk; 1184 nxt_socket_conf_t *skcf; 1185 nxt_router_conf_t *rtcf; 1186 1187 nxt_alert(task, "failed to apply new conf"); 1188 1189 for (qlk = nxt_queue_first(&creating_sockets); 1190 qlk != nxt_queue_tail(&creating_sockets); 1191 qlk = nxt_queue_next(qlk)) 1192 { 1193 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1194 s = skcf->listen->socket; 1195 1196 if (s != -1) { 1197 nxt_socket_close(task, s); 1198 } 1199 1200 nxt_free(skcf->listen); 1201 } 1202 1203 nxt_queue_init(&new_socket_confs); 1204 nxt_queue_add(&new_socket_confs, &updating_sockets); 1205 nxt_queue_add(&new_socket_confs, &pending_sockets); 1206 nxt_queue_add(&new_socket_confs, &creating_sockets); 1207 1208 rtcf = tmcf->router_conf; 1209 1210 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1211 1212 nxt_router_app_unlink(task, app); 1213 1214 } nxt_queue_loop; 1215 1216 router = rtcf->router; 1217 1218 nxt_queue_add(&router->sockets, &keeping_sockets); 1219 nxt_queue_add(&router->sockets, &deleting_sockets); 1220 1221 nxt_queue_add(&router->apps, &tmcf->previous); 1222 1223 // TODO: new engines and threads 1224 1225 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1226 1227 nxt_mp_destroy(rtcf->mem_pool); 1228 1229 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1230 1231 nxt_mp_release(tmcf->mem_pool); 1232 } 1233 1234 1235 static void 1236 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1237 nxt_port_msg_type_t type) 1238 { 1239 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1240 1241 nxt_port_use(task, tmcf->port, -1); 1242 1243 tmcf->port = NULL; 1244 } 1245 1246 1247 static nxt_conf_map_t nxt_router_conf[] = { 1248 { 1249 nxt_string("listeners_threads"), 1250 NXT_CONF_MAP_INT32, 1251 offsetof(nxt_router_conf_t, threads), 1252 }, 1253 }; 1254 1255 1256 static nxt_conf_map_t nxt_router_app_conf[] = { 1257 { 1258 nxt_string("type"), 1259 NXT_CONF_MAP_STR, 1260 offsetof(nxt_router_app_conf_t, type), 1261 }, 1262 1263 { 1264 nxt_string("limits"), 1265 NXT_CONF_MAP_PTR, 1266 offsetof(nxt_router_app_conf_t, limits_value), 1267 }, 1268 1269 { 1270 nxt_string("processes"), 1271 NXT_CONF_MAP_INT32, 1272 offsetof(nxt_router_app_conf_t, processes), 1273 }, 1274 1275 { 1276 nxt_string("processes"), 1277 NXT_CONF_MAP_PTR, 1278 offsetof(nxt_router_app_conf_t, processes_value), 1279 }, 1280 1281 { 1282 nxt_string("targets"), 1283 NXT_CONF_MAP_PTR, 1284 offsetof(nxt_router_app_conf_t, targets_value), 1285 }, 1286 }; 1287 1288 1289 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1290 { 1291 nxt_string("timeout"), 1292 NXT_CONF_MAP_MSEC, 1293 offsetof(nxt_router_app_conf_t, timeout), 1294 }, 1295 }; 1296 1297 1298 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1299 { 1300 nxt_string("spare"), 1301 NXT_CONF_MAP_INT32, 1302 offsetof(nxt_router_app_conf_t, spare_processes), 1303 }, 1304 1305 { 1306 nxt_string("max"), 1307 NXT_CONF_MAP_INT32, 1308 offsetof(nxt_router_app_conf_t, max_processes), 1309 }, 1310 1311 { 1312 nxt_string("idle_timeout"), 1313 NXT_CONF_MAP_MSEC, 1314 offsetof(nxt_router_app_conf_t, idle_timeout), 1315 }, 1316 }; 1317 1318 1319 static nxt_conf_map_t nxt_router_listener_conf[] = { 1320 { 1321 nxt_string("pass"), 1322 NXT_CONF_MAP_STR_COPY, 1323 offsetof(nxt_router_listener_conf_t, pass), 1324 }, 1325 1326 { 1327 nxt_string("application"), 1328 NXT_CONF_MAP_STR_COPY, 1329 offsetof(nxt_router_listener_conf_t, application), 1330 }, 1331 }; 1332 1333 1334 static nxt_conf_map_t nxt_router_http_conf[] = { 1335 { 1336 nxt_string("header_buffer_size"), 1337 NXT_CONF_MAP_SIZE, 1338 offsetof(nxt_socket_conf_t, header_buffer_size), 1339 }, 1340 1341 { 1342 nxt_string("large_header_buffer_size"), 1343 NXT_CONF_MAP_SIZE, 1344 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1345 }, 1346 1347 { 1348 nxt_string("large_header_buffers"), 1349 NXT_CONF_MAP_SIZE, 1350 offsetof(nxt_socket_conf_t, large_header_buffers), 1351 }, 1352 1353 { 1354 nxt_string("body_buffer_size"), 1355 NXT_CONF_MAP_SIZE, 1356 offsetof(nxt_socket_conf_t, body_buffer_size), 1357 }, 1358 1359 { 1360 nxt_string("max_body_size"), 1361 NXT_CONF_MAP_SIZE, 1362 offsetof(nxt_socket_conf_t, max_body_size), 1363 }, 1364 1365 { 1366 nxt_string("idle_timeout"), 1367 NXT_CONF_MAP_MSEC, 1368 offsetof(nxt_socket_conf_t, idle_timeout), 1369 }, 1370 1371 { 1372 nxt_string("header_read_timeout"), 1373 NXT_CONF_MAP_MSEC, 1374 offsetof(nxt_socket_conf_t, header_read_timeout), 1375 }, 1376 1377 { 1378 nxt_string("body_read_timeout"), 1379 NXT_CONF_MAP_MSEC, 1380 offsetof(nxt_socket_conf_t, body_read_timeout), 1381 }, 1382 1383 { 1384 nxt_string("send_timeout"), 1385 NXT_CONF_MAP_MSEC, 1386 offsetof(nxt_socket_conf_t, send_timeout), 1387 }, 1388 1389 { 1390 nxt_string("body_temp_path"), 1391 NXT_CONF_MAP_STR, 1392 offsetof(nxt_socket_conf_t, body_temp_path), 1393 }, 1394 1395 { 1396 nxt_string("discard_unsafe_fields"), 1397 NXT_CONF_MAP_INT8, 1398 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1399 }, 1400 }; 1401 1402 1403 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1404 { 1405 nxt_string("max_frame_size"), 1406 NXT_CONF_MAP_SIZE, 1407 offsetof(nxt_websocket_conf_t, max_frame_size), 1408 }, 1409 1410 { 1411 nxt_string("read_timeout"), 1412 NXT_CONF_MAP_MSEC, 1413 offsetof(nxt_websocket_conf_t, read_timeout), 1414 }, 1415 1416 { 1417 nxt_string("keepalive_interval"), 1418 NXT_CONF_MAP_MSEC, 1419 offsetof(nxt_websocket_conf_t, keepalive_interval), 1420 }, 1421 1422 }; 1423 1424 1425 static nxt_int_t 1426 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1427 u_char *start, u_char *end) 1428 { 1429 u_char *p; 1430 size_t size; 1431 nxt_mp_t *mp, *app_mp; 1432 uint32_t next, next_target; 1433 nxt_int_t ret; 1434 nxt_str_t name, path, target; 1435 nxt_app_t *app, *prev; 1436 nxt_str_t *t, *s, *targets; 1437 nxt_uint_t n, i; 1438 nxt_port_t *port; 1439 nxt_router_t *router; 1440 nxt_app_joint_t *app_joint; 1441 #if (NXT_TLS) 1442 nxt_tls_init_t *tls_init; 1443 nxt_conf_value_t *certificate; 1444 #endif 1445 nxt_conf_value_t *conf, *http, *value, *websocket; 1446 nxt_conf_value_t *applications, *application; 1447 nxt_conf_value_t *listeners, *listener; 1448 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf; 1449 nxt_socket_conf_t *skcf; 1450 nxt_http_routes_t *routes; 1451 nxt_event_engine_t *engine; 1452 nxt_app_lang_module_t *lang; 1453 nxt_router_app_conf_t apcf; 1454 nxt_router_access_log_t *access_log; 1455 nxt_router_listener_conf_t lscf; 1456 1457 static nxt_str_t http_path = nxt_string("/settings/http"); 1458 static nxt_str_t applications_path = nxt_string("/applications"); 1459 static nxt_str_t listeners_path = nxt_string("/listeners"); 1460 static nxt_str_t routes_path = nxt_string("/routes"); 1461 static nxt_str_t access_log_path = nxt_string("/access_log"); 1462 #if (NXT_TLS) 1463 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1464 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1465 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1466 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1467 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); 1468 #endif 1469 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1470 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1471 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1472 1473 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1474 if (conf == NULL) { 1475 nxt_alert(task, "configuration parsing error"); 1476 return NXT_ERROR; 1477 } 1478 1479 mp = tmcf->router_conf->mem_pool; 1480 1481 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1482 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1483 if (ret != NXT_OK) { 1484 nxt_alert(task, "root map error"); 1485 return NXT_ERROR; 1486 } 1487 1488 if (tmcf->router_conf->threads == 0) { 1489 tmcf->router_conf->threads = nxt_ncpu; 1490 } 1491 1492 static_conf = nxt_conf_get_path(conf, &static_path); 1493 1494 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1495 if (nxt_slow_path(ret != NXT_OK)) { 1496 return NXT_ERROR; 1497 } 1498 1499 router = tmcf->router_conf->router; 1500 1501 applications = nxt_conf_get_path(conf, &applications_path); 1502 1503 if (applications != NULL) { 1504 next = 0; 1505 1506 for ( ;; ) { 1507 application = nxt_conf_next_object_member(applications, 1508 &name, &next); 1509 if (application == NULL) { 1510 break; 1511 } 1512 1513 nxt_debug(task, "application \"%V\"", &name); 1514 1515 size = nxt_conf_json_length(application, NULL); 1516 1517 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1518 if (nxt_slow_path(app_mp == NULL)) { 1519 goto fail; 1520 } 1521 1522 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1523 if (app == NULL) { 1524 goto app_fail; 1525 } 1526 1527 nxt_memzero(app, sizeof(nxt_app_t)); 1528 1529 app->mem_pool = app_mp; 1530 1531 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1532 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1533 + name.length); 1534 1535 p = nxt_conf_json_print(app->conf.start, application, NULL); 1536 app->conf.length = p - app->conf.start; 1537 1538 nxt_assert(app->conf.length <= size); 1539 1540 nxt_debug(task, "application conf \"%V\"", &app->conf); 1541 1542 prev = nxt_router_app_find(&router->apps, &name); 1543 1544 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1545 nxt_mp_destroy(app_mp); 1546 1547 nxt_queue_remove(&prev->link); 1548 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1549 1550 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1551 if (nxt_slow_path(ret != NXT_OK)) { 1552 goto fail; 1553 } 1554 1555 continue; 1556 } 1557 1558 apcf.processes = 1; 1559 apcf.max_processes = 1; 1560 apcf.spare_processes = 0; 1561 apcf.timeout = 0; 1562 apcf.idle_timeout = 15000; 1563 apcf.limits_value = NULL; 1564 apcf.processes_value = NULL; 1565 apcf.targets_value = NULL; 1566 1567 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1568 if (nxt_slow_path(app_joint == NULL)) { 1569 goto app_fail; 1570 } 1571 1572 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1573 1574 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1575 nxt_nitems(nxt_router_app_conf), &apcf); 1576 if (ret != NXT_OK) { 1577 nxt_alert(task, "application map error"); 1578 goto app_fail; 1579 } 1580 1581 if (apcf.limits_value != NULL) { 1582 1583 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1584 nxt_alert(task, "application limits is not object"); 1585 goto app_fail; 1586 } 1587 1588 ret = nxt_conf_map_object(mp, apcf.limits_value, 1589 nxt_router_app_limits_conf, 1590 nxt_nitems(nxt_router_app_limits_conf), 1591 &apcf); 1592 if (ret != NXT_OK) { 1593 nxt_alert(task, "application limits map error"); 1594 goto app_fail; 1595 } 1596 } 1597 1598 if (apcf.processes_value != NULL 1599 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1600 { 1601 ret = nxt_conf_map_object(mp, apcf.processes_value, 1602 nxt_router_app_processes_conf, 1603 nxt_nitems(nxt_router_app_processes_conf), 1604 &apcf); 1605 if (ret != NXT_OK) { 1606 nxt_alert(task, "application processes map error"); 1607 goto app_fail; 1608 } 1609 1610 } else { 1611 apcf.max_processes = apcf.processes; 1612 apcf.spare_processes = apcf.processes; 1613 } 1614 1615 if (apcf.targets_value != NULL) { 1616 n = nxt_conf_object_members_count(apcf.targets_value); 1617 1618 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1619 if (nxt_slow_path(targets == NULL)) { 1620 goto app_fail; 1621 } 1622 1623 next_target = 0; 1624 1625 for (i = 0; i < n; i++) { 1626 (void) nxt_conf_next_object_member(apcf.targets_value, 1627 &target, &next_target); 1628 1629 s = nxt_str_dup(app_mp, &targets[i], &target); 1630 if (nxt_slow_path(s == NULL)) { 1631 goto app_fail; 1632 } 1633 } 1634 1635 } else { 1636 targets = NULL; 1637 } 1638 1639 nxt_debug(task, "application type: %V", &apcf.type); 1640 nxt_debug(task, "application processes: %D", apcf.processes); 1641 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1642 1643 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1644 1645 if (lang == NULL) { 1646 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1647 goto app_fail; 1648 } 1649 1650 nxt_debug(task, "application language module: \"%s\"", lang->file); 1651 1652 ret = nxt_thread_mutex_create(&app->mutex); 1653 if (ret != NXT_OK) { 1654 goto app_fail; 1655 } 1656 1657 nxt_queue_init(&app->ports); 1658 nxt_queue_init(&app->spare_ports); 1659 nxt_queue_init(&app->idle_ports); 1660 nxt_queue_init(&app->ack_waiting_req); 1661 1662 app->name.length = name.length; 1663 nxt_memcpy(app->name.start, name.start, name.length); 1664 1665 app->type = lang->type; 1666 app->max_processes = apcf.max_processes; 1667 app->spare_processes = apcf.spare_processes; 1668 app->max_pending_processes = apcf.spare_processes 1669 ? apcf.spare_processes : 1; 1670 app->timeout = apcf.timeout; 1671 app->idle_timeout = apcf.idle_timeout; 1672 1673 app->targets = targets; 1674 1675 engine = task->thread->engine; 1676 1677 app->engine = engine; 1678 1679 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1680 app->adjust_idle_work.task = &engine->task; 1681 app->adjust_idle_work.obj = app; 1682 1683 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1684 1685 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1686 if (nxt_slow_path(ret != NXT_OK)) { 1687 goto app_fail; 1688 } 1689 1690 nxt_router_app_use(task, app, 1); 1691 1692 app->joint = app_joint; 1693 1694 app_joint->use_count = 1; 1695 app_joint->app = app; 1696 1697 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1698 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1699 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1700 app_joint->idle_timer.task = &engine->task; 1701 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1702 1703 app_joint->free_app_work.handler = nxt_router_free_app; 1704 app_joint->free_app_work.task = &engine->task; 1705 app_joint->free_app_work.obj = app_joint; 1706 1707 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1708 NXT_PROCESS_APP); 1709 if (nxt_slow_path(port == NULL)) { 1710 return NXT_ERROR; 1711 } 1712 1713 ret = nxt_port_socket_init(task, port, 0); 1714 if (nxt_slow_path(ret != NXT_OK)) { 1715 nxt_port_use(task, port, -1); 1716 return NXT_ERROR; 1717 } 1718 1719 ret = nxt_router_app_queue_init(task, port); 1720 if (nxt_slow_path(ret != NXT_OK)) { 1721 nxt_port_write_close(port); 1722 nxt_port_read_close(port); 1723 nxt_port_use(task, port, -1); 1724 return NXT_ERROR; 1725 } 1726 1727 nxt_port_write_enable(task, port); 1728 port->app = app; 1729 1730 app->shared_port = port; 1731 1732 nxt_thread_mutex_create(&app->outgoing.mutex); 1733 } 1734 } 1735 1736 routes_conf = nxt_conf_get_path(conf, &routes_path); 1737 if (nxt_fast_path(routes_conf != NULL)) { 1738 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1739 if (nxt_slow_path(routes == NULL)) { 1740 return NXT_ERROR; 1741 } 1742 tmcf->router_conf->routes = routes; 1743 } 1744 1745 ret = nxt_upstreams_create(task, tmcf, conf); 1746 if (nxt_slow_path(ret != NXT_OK)) { 1747 return ret; 1748 } 1749 1750 http = nxt_conf_get_path(conf, &http_path); 1751 #if 0 1752 if (http == NULL) { 1753 nxt_alert(task, "no \"http\" block"); 1754 return NXT_ERROR; 1755 } 1756 #endif 1757 1758 websocket = nxt_conf_get_path(conf, &websocket_path); 1759 1760 listeners = nxt_conf_get_path(conf, &listeners_path); 1761 1762 if (listeners != NULL) { 1763 next = 0; 1764 1765 for ( ;; ) { 1766 listener = nxt_conf_next_object_member(listeners, &name, &next); 1767 if (listener == NULL) { 1768 break; 1769 } 1770 1771 skcf = nxt_router_socket_conf(task, tmcf, &name); 1772 if (skcf == NULL) { 1773 goto fail; 1774 } 1775 1776 nxt_memzero(&lscf, sizeof(lscf)); 1777 1778 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1779 nxt_nitems(nxt_router_listener_conf), 1780 &lscf); 1781 if (ret != NXT_OK) { 1782 nxt_alert(task, "listener map error"); 1783 goto fail; 1784 } 1785 1786 nxt_debug(task, "application: %V", &lscf.application); 1787 1788 // STUB, default values if http block is not defined. 1789 skcf->header_buffer_size = 2048; 1790 skcf->large_header_buffer_size = 8192; 1791 skcf->large_header_buffers = 4; 1792 skcf->discard_unsafe_fields = 1; 1793 skcf->body_buffer_size = 16 * 1024; 1794 skcf->max_body_size = 8 * 1024 * 1024; 1795 skcf->proxy_header_buffer_size = 64 * 1024; 1796 skcf->proxy_buffer_size = 4096; 1797 skcf->proxy_buffers = 256; 1798 skcf->idle_timeout = 180 * 1000; 1799 skcf->header_read_timeout = 30 * 1000; 1800 skcf->body_read_timeout = 30 * 1000; 1801 skcf->send_timeout = 30 * 1000; 1802 skcf->proxy_timeout = 60 * 1000; 1803 skcf->proxy_send_timeout = 30 * 1000; 1804 skcf->proxy_read_timeout = 30 * 1000; 1805 1806 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1807 skcf->websocket_conf.read_timeout = 60 * 1000; 1808 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1809 1810 nxt_str_null(&skcf->body_temp_path); 1811 1812 if (http != NULL) { 1813 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1814 nxt_nitems(nxt_router_http_conf), 1815 skcf); 1816 if (ret != NXT_OK) { 1817 nxt_alert(task, "http map error"); 1818 goto fail; 1819 } 1820 } 1821 1822 if (websocket != NULL) { 1823 ret = nxt_conf_map_object(mp, websocket, 1824 nxt_router_websocket_conf, 1825 nxt_nitems(nxt_router_websocket_conf), 1826 &skcf->websocket_conf); 1827 if (ret != NXT_OK) { 1828 nxt_alert(task, "websocket map error"); 1829 goto fail; 1830 } 1831 } 1832 1833 t = &skcf->body_temp_path; 1834 1835 if (t->length == 0) { 1836 t->start = (u_char *) task->thread->runtime->tmp; 1837 t->length = nxt_strlen(t->start); 1838 } 1839 1840 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path); 1841 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, 1842 client_ip_conf); 1843 if (nxt_slow_path(ret != NXT_OK)) { 1844 return NXT_ERROR; 1845 } 1846 1847 #if (NXT_TLS) 1848 certificate = nxt_conf_get_path(listener, &certificate_path); 1849 1850 if (certificate != NULL) { 1851 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1852 if (nxt_slow_path(tls_init == NULL)) { 1853 return NXT_ERROR; 1854 } 1855 1856 tls_init->cache_size = 0; 1857 tls_init->timeout = 300; 1858 1859 value = nxt_conf_get_path(listener, &conf_cache_path); 1860 if (value != NULL) { 1861 tls_init->cache_size = nxt_conf_get_number(value); 1862 } 1863 1864 value = nxt_conf_get_path(listener, &conf_timeout_path); 1865 if (value != NULL) { 1866 tls_init->timeout = nxt_conf_get_number(value); 1867 } 1868 1869 tls_init->conf_cmds = nxt_conf_get_path(listener, 1870 &conf_commands_path); 1871 1872 tls_init->tickets_conf = nxt_conf_get_path(listener, 1873 &conf_tickets); 1874 1875 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1876 n = nxt_conf_array_elements_count(certificate); 1877 1878 for (i = 0; i < n; i++) { 1879 value = nxt_conf_get_array_element(certificate, i); 1880 1881 nxt_assert(value != NULL); 1882 1883 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1884 tls_init, i == 0); 1885 if (nxt_slow_path(ret != NXT_OK)) { 1886 goto fail; 1887 } 1888 } 1889 1890 } else { 1891 /* NXT_CONF_STRING */ 1892 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, 1893 tls_init, 1); 1894 if (nxt_slow_path(ret != NXT_OK)) { 1895 goto fail; 1896 } 1897 } 1898 } 1899 #endif 1900 1901 skcf->listen->handler = nxt_http_conn_init; 1902 skcf->router_conf = tmcf->router_conf; 1903 skcf->router_conf->count++; 1904 1905 if (lscf.pass.length != 0) { 1906 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1907 1908 /* COMPATIBILITY: listener application. */ 1909 } else if (lscf.application.length > 0) { 1910 skcf->action = nxt_http_pass_application(task, 1911 tmcf->router_conf, 1912 &lscf.application); 1913 } 1914 1915 if (nxt_slow_path(skcf->action == NULL)) { 1916 goto fail; 1917 } 1918 } 1919 } 1920 1921 ret = nxt_http_routes_resolve(task, tmcf); 1922 if (nxt_slow_path(ret != NXT_OK)) { 1923 goto fail; 1924 } 1925 1926 value = nxt_conf_get_path(conf, &access_log_path); 1927 1928 if (value != NULL) { 1929 nxt_conf_get_string(value, &path); 1930 1931 access_log = router->access_log; 1932 1933 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1934 nxt_thread_spin_lock(&router->lock); 1935 access_log->count++; 1936 nxt_thread_spin_unlock(&router->lock); 1937 1938 } else { 1939 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1940 + path.length); 1941 if (access_log == NULL) { 1942 nxt_alert(task, "failed to allocate access log structure"); 1943 goto fail; 1944 } 1945 1946 access_log->fd = -1; 1947 access_log->handler = &nxt_router_access_log_writer; 1948 access_log->count = 1; 1949 1950 access_log->path.length = path.length; 1951 access_log->path.start = (u_char *) access_log 1952 + sizeof(nxt_router_access_log_t); 1953 1954 nxt_memcpy(access_log->path.start, path.start, path.length); 1955 } 1956 1957 tmcf->router_conf->access_log = access_log; 1958 } 1959 1960 nxt_queue_add(&deleting_sockets, &router->sockets); 1961 nxt_queue_init(&router->sockets); 1962 1963 return NXT_OK; 1964 1965 app_fail: 1966 1967 nxt_mp_destroy(app_mp); 1968 1969 fail: 1970 1971 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1972 1973 nxt_queue_remove(&app->link); 1974 nxt_thread_mutex_destroy(&app->mutex); 1975 nxt_mp_destroy(app->mem_pool); 1976 1977 } nxt_queue_loop; 1978 1979 return NXT_ERROR; 1980 } 1981 1982 1983 #if (NXT_TLS) 1984 1985 static nxt_int_t 1986 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 1987 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 1988 nxt_tls_init_t *tls_init, nxt_bool_t last) 1989 { 1990 nxt_router_tlssock_t *tls; 1991 1992 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 1993 if (nxt_slow_path(tls == NULL)) { 1994 return NXT_ERROR; 1995 } 1996 1997 tls->tls_init = tls_init; 1998 tls->socket_conf = skcf; 1999 tls->temp_conf = tmcf; 2000 tls->last = last; 2001 nxt_conf_get_string(value, &tls->name); 2002 2003 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2004 2005 return NXT_OK; 2006 } 2007 2008 #endif 2009 2010 2011 static nxt_int_t 2012 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2013 nxt_conf_value_t *conf) 2014 { 2015 uint32_t next, i; 2016 nxt_mp_t *mp; 2017 nxt_str_t *type, exten, str; 2018 nxt_int_t ret; 2019 nxt_uint_t exts; 2020 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2021 2022 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2023 2024 mp = rtcf->mem_pool; 2025 2026 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2027 if (nxt_slow_path(ret != NXT_OK)) { 2028 return NXT_ERROR; 2029 } 2030 2031 if (conf == NULL) { 2032 return NXT_OK; 2033 } 2034 2035 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2036 2037 if (mtypes_conf != NULL) { 2038 next = 0; 2039 2040 for ( ;; ) { 2041 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2042 2043 if (ext_conf == NULL) { 2044 break; 2045 } 2046 2047 type = nxt_str_dup(mp, NULL, &str); 2048 if (nxt_slow_path(type == NULL)) { 2049 return NXT_ERROR; 2050 } 2051 2052 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2053 nxt_conf_get_string(ext_conf, &str); 2054 2055 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2056 return NXT_ERROR; 2057 } 2058 2059 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2060 &exten, type); 2061 if (nxt_slow_path(ret != NXT_OK)) { 2062 return NXT_ERROR; 2063 } 2064 2065 continue; 2066 } 2067 2068 exts = nxt_conf_array_elements_count(ext_conf); 2069 2070 for (i = 0; i < exts; i++) { 2071 value = nxt_conf_get_array_element(ext_conf, i); 2072 2073 nxt_conf_get_string(value, &str); 2074 2075 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2076 return NXT_ERROR; 2077 } 2078 2079 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2080 &exten, type); 2081 if (nxt_slow_path(ret != NXT_OK)) { 2082 return NXT_ERROR; 2083 } 2084 } 2085 } 2086 } 2087 2088 return NXT_OK; 2089 } 2090 2091 2092 static nxt_int_t 2093 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2094 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) 2095 { 2096 char c; 2097 size_t i; 2098 nxt_mp_t *mp; 2099 uint32_t hash; 2100 nxt_str_t header; 2101 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; 2102 nxt_http_client_ip_t *client_ip; 2103 nxt_http_route_addr_rule_t *source; 2104 2105 static nxt_str_t header_path = nxt_string("/header"); 2106 static nxt_str_t source_path = nxt_string("/source"); 2107 static nxt_str_t recursive_path = nxt_string("/recursive"); 2108 2109 if (conf == NULL) { 2110 skcf->client_ip = NULL; 2111 2112 return NXT_OK; 2113 } 2114 2115 mp = tmcf->router_conf->mem_pool; 2116 2117 source_conf = nxt_conf_get_path(conf, &source_path); 2118 header_conf = nxt_conf_get_path(conf, &header_path); 2119 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2120 2121 if (source_conf == NULL || header_conf == NULL) { 2122 return NXT_ERROR; 2123 } 2124 2125 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); 2126 if (nxt_slow_path(client_ip == NULL)) { 2127 return NXT_ERROR; 2128 } 2129 2130 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2131 if (nxt_slow_path(source == NULL)) { 2132 return NXT_ERROR; 2133 } 2134 2135 client_ip->source = source; 2136 2137 nxt_conf_get_string(header_conf, &header); 2138 2139 if (recursive_conf != NULL) { 2140 client_ip->recursive = nxt_conf_get_boolean(recursive_conf); 2141 } 2142 2143 client_ip->header = nxt_str_dup(mp, NULL, &header); 2144 if (nxt_slow_path(client_ip->header == NULL)) { 2145 return NXT_ERROR; 2146 } 2147 2148 hash = NXT_HTTP_FIELD_HASH_INIT; 2149 2150 for (i = 0; i < client_ip->header->length; i++) { 2151 c = client_ip->header->start[i]; 2152 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2153 } 2154 2155 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2156 2157 client_ip->header_hash = hash; 2158 2159 skcf->client_ip = client_ip; 2160 2161 return NXT_OK; 2162 } 2163 2164 2165 static nxt_app_t * 2166 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2167 { 2168 nxt_app_t *app; 2169 2170 nxt_queue_each(app, queue, nxt_app_t, link) { 2171 2172 if (nxt_strstr_eq(name, &app->name)) { 2173 return app; 2174 } 2175 2176 } nxt_queue_loop; 2177 2178 return NULL; 2179 } 2180 2181 2182 static nxt_int_t 2183 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2184 { 2185 void *mem; 2186 nxt_int_t fd; 2187 2188 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2189 if (nxt_slow_path(fd == -1)) { 2190 return NXT_ERROR; 2191 } 2192 2193 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2194 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2195 if (nxt_slow_path(mem == MAP_FAILED)) { 2196 nxt_fd_close(fd); 2197 2198 return NXT_ERROR; 2199 } 2200 2201 nxt_app_queue_init(mem); 2202 2203 port->queue_fd = fd; 2204 port->queue = mem; 2205 2206 return NXT_OK; 2207 } 2208 2209 2210 static nxt_int_t 2211 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2212 { 2213 void *mem; 2214 nxt_int_t fd; 2215 2216 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2217 if (nxt_slow_path(fd == -1)) { 2218 return NXT_ERROR; 2219 } 2220 2221 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2222 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2223 if (nxt_slow_path(mem == MAP_FAILED)) { 2224 nxt_fd_close(fd); 2225 2226 return NXT_ERROR; 2227 } 2228 2229 nxt_port_queue_init(mem); 2230 2231 port->queue_fd = fd; 2232 port->queue = mem; 2233 2234 return NXT_OK; 2235 } 2236 2237 2238 static nxt_int_t 2239 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2240 { 2241 void *mem; 2242 2243 nxt_assert(fd != -1); 2244 2245 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2246 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2247 if (nxt_slow_path(mem == MAP_FAILED)) { 2248 2249 return NXT_ERROR; 2250 } 2251 2252 port->queue = mem; 2253 2254 return NXT_OK; 2255 } 2256 2257 2258 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2259 NXT_LVLHSH_DEFAULT, 2260 nxt_router_apps_hash_test, 2261 nxt_mp_lvlhsh_alloc, 2262 nxt_mp_lvlhsh_free, 2263 }; 2264 2265 2266 static nxt_int_t 2267 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2268 { 2269 nxt_app_t *app; 2270 2271 app = data; 2272 2273 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2274 } 2275 2276 2277 static nxt_int_t 2278 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2279 { 2280 nxt_lvlhsh_query_t lhq; 2281 2282 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2283 lhq.replace = 0; 2284 lhq.key = app->name; 2285 lhq.value = app; 2286 lhq.proto = &nxt_router_apps_hash_proto; 2287 lhq.pool = rtcf->mem_pool; 2288 2289 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2290 2291 case NXT_OK: 2292 return NXT_OK; 2293 2294 case NXT_DECLINED: 2295 nxt_thread_log_alert("router app hash adding failed: " 2296 "\"%V\" is already in hash", &lhq.key); 2297 /* Fall through. */ 2298 default: 2299 return NXT_ERROR; 2300 } 2301 } 2302 2303 2304 static nxt_app_t * 2305 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2306 { 2307 nxt_lvlhsh_query_t lhq; 2308 2309 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2310 lhq.key = *name; 2311 lhq.proto = &nxt_router_apps_hash_proto; 2312 2313 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2314 return NULL; 2315 } 2316 2317 return lhq.value; 2318 } 2319 2320 2321 static void 2322 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2323 { 2324 nxt_app_t *app; 2325 nxt_lvlhsh_each_t lhe; 2326 2327 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2328 2329 for ( ;; ) { 2330 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2331 2332 if (app == NULL) { 2333 break; 2334 } 2335 2336 nxt_router_app_use(task, app, i); 2337 } 2338 } 2339 2340 2341 typedef struct { 2342 nxt_app_t *app; 2343 nxt_int_t target; 2344 } nxt_http_app_conf_t; 2345 2346 2347 nxt_int_t 2348 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2349 nxt_str_t *target, nxt_http_action_t *action) 2350 { 2351 nxt_app_t *app; 2352 nxt_str_t *targets; 2353 nxt_uint_t i; 2354 nxt_http_app_conf_t *conf; 2355 2356 app = nxt_router_apps_hash_get(rtcf, name); 2357 if (app == NULL) { 2358 return NXT_DECLINED; 2359 } 2360 2361 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2362 if (nxt_slow_path(conf == NULL)) { 2363 return NXT_ERROR; 2364 } 2365 2366 action->handler = nxt_http_application_handler; 2367 action->u.conf = conf; 2368 2369 conf->app = app; 2370 2371 if (target != NULL && target->length != 0) { 2372 targets = app->targets; 2373 2374 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2375 2376 conf->target = i; 2377 2378 } else { 2379 conf->target = 0; 2380 } 2381 2382 return NXT_OK; 2383 } 2384 2385 2386 static nxt_socket_conf_t * 2387 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2388 nxt_str_t *name) 2389 { 2390 size_t size; 2391 nxt_int_t ret; 2392 nxt_bool_t wildcard; 2393 nxt_sockaddr_t *sa; 2394 nxt_socket_conf_t *skcf; 2395 nxt_listen_socket_t *ls; 2396 2397 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2398 if (nxt_slow_path(sa == NULL)) { 2399 nxt_alert(task, "invalid listener \"%V\"", name); 2400 return NULL; 2401 } 2402 2403 sa->type = SOCK_STREAM; 2404 2405 nxt_debug(task, "router listener: \"%*s\"", 2406 (size_t) sa->length, nxt_sockaddr_start(sa)); 2407 2408 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2409 if (nxt_slow_path(skcf == NULL)) { 2410 return NULL; 2411 } 2412 2413 size = nxt_sockaddr_size(sa); 2414 2415 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2416 2417 if (ret != NXT_OK) { 2418 2419 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2420 if (nxt_slow_path(ls == NULL)) { 2421 return NULL; 2422 } 2423 2424 skcf->listen = ls; 2425 2426 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2427 nxt_memcpy(ls->sockaddr, sa, size); 2428 2429 nxt_listen_socket_remote_size(ls); 2430 2431 ls->socket = -1; 2432 ls->backlog = NXT_LISTEN_BACKLOG; 2433 ls->flags = NXT_NONBLOCK; 2434 ls->read_after_accept = 1; 2435 } 2436 2437 switch (sa->u.sockaddr.sa_family) { 2438 #if (NXT_HAVE_UNIX_DOMAIN) 2439 case AF_UNIX: 2440 wildcard = 0; 2441 break; 2442 #endif 2443 #if (NXT_INET6) 2444 case AF_INET6: 2445 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2446 break; 2447 #endif 2448 case AF_INET: 2449 default: 2450 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2451 break; 2452 } 2453 2454 if (!wildcard) { 2455 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2456 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2457 return NULL; 2458 } 2459 2460 nxt_memcpy(skcf->sockaddr, sa, size); 2461 } 2462 2463 return skcf; 2464 } 2465 2466 2467 static nxt_int_t 2468 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2469 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2470 { 2471 nxt_router_t *router; 2472 nxt_queue_link_t *qlk; 2473 nxt_socket_conf_t *skcf; 2474 2475 router = tmcf->router_conf->router; 2476 2477 for (qlk = nxt_queue_first(&router->sockets); 2478 qlk != nxt_queue_tail(&router->sockets); 2479 qlk = nxt_queue_next(qlk)) 2480 { 2481 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2482 2483 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2484 nskcf->listen = skcf->listen; 2485 2486 nxt_queue_remove(qlk); 2487 nxt_queue_insert_tail(&keeping_sockets, qlk); 2488 2489 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2490 2491 return NXT_OK; 2492 } 2493 } 2494 2495 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2496 2497 return NXT_DECLINED; 2498 } 2499 2500 2501 static void 2502 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2503 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2504 { 2505 size_t size; 2506 uint32_t stream; 2507 nxt_int_t ret; 2508 nxt_buf_t *b; 2509 nxt_port_t *main_port, *router_port; 2510 nxt_runtime_t *rt; 2511 nxt_socket_rpc_t *rpc; 2512 2513 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2514 if (rpc == NULL) { 2515 goto fail; 2516 } 2517 2518 rpc->socket_conf = skcf; 2519 rpc->temp_conf = tmcf; 2520 2521 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2522 2523 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2524 if (b == NULL) { 2525 goto fail; 2526 } 2527 2528 b->completion_handler = nxt_buf_dummy_completion; 2529 2530 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2531 2532 rt = task->thread->runtime; 2533 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2534 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2535 2536 stream = nxt_port_rpc_register_handler(task, router_port, 2537 nxt_router_listen_socket_ready, 2538 nxt_router_listen_socket_error, 2539 main_port->pid, rpc); 2540 if (nxt_slow_path(stream == 0)) { 2541 goto fail; 2542 } 2543 2544 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2545 stream, router_port->id, b); 2546 2547 if (nxt_slow_path(ret != NXT_OK)) { 2548 nxt_port_rpc_cancel(task, router_port, stream); 2549 goto fail; 2550 } 2551 2552 return; 2553 2554 fail: 2555 2556 nxt_router_conf_error(task, tmcf); 2557 } 2558 2559 2560 static void 2561 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2562 void *data) 2563 { 2564 nxt_int_t ret; 2565 nxt_socket_t s; 2566 nxt_socket_rpc_t *rpc; 2567 2568 rpc = data; 2569 2570 s = msg->fd[0]; 2571 2572 ret = nxt_socket_nonblocking(task, s); 2573 if (nxt_slow_path(ret != NXT_OK)) { 2574 goto fail; 2575 } 2576 2577 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2578 2579 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2580 if (nxt_slow_path(ret != NXT_OK)) { 2581 goto fail; 2582 } 2583 2584 rpc->socket_conf->listen->socket = s; 2585 2586 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2587 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2588 2589 return; 2590 2591 fail: 2592 2593 nxt_socket_close(task, s); 2594 2595 nxt_router_conf_error(task, rpc->temp_conf); 2596 } 2597 2598 2599 static void 2600 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2601 void *data) 2602 { 2603 nxt_socket_rpc_t *rpc; 2604 nxt_router_temp_conf_t *tmcf; 2605 2606 rpc = data; 2607 tmcf = rpc->temp_conf; 2608 2609 #if 0 2610 u_char *p; 2611 size_t size; 2612 uint8_t error; 2613 nxt_buf_t *in, *out; 2614 nxt_sockaddr_t *sa; 2615 2616 static nxt_str_t socket_errors[] = { 2617 nxt_string("ListenerSystem"), 2618 nxt_string("ListenerNoIPv6"), 2619 nxt_string("ListenerPort"), 2620 nxt_string("ListenerInUse"), 2621 nxt_string("ListenerNoAddress"), 2622 nxt_string("ListenerNoAccess"), 2623 nxt_string("ListenerPath"), 2624 }; 2625 2626 sa = rpc->socket_conf->listen->sockaddr; 2627 2628 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2629 2630 if (nxt_slow_path(in == NULL)) { 2631 return; 2632 } 2633 2634 p = in->mem.pos; 2635 2636 error = *p++; 2637 2638 size = nxt_length("listen socket error: ") 2639 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2640 + sa->length + socket_errors[error].length + (in->mem.free - p); 2641 2642 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2643 if (nxt_slow_path(out == NULL)) { 2644 return; 2645 } 2646 2647 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2648 "listen socket error: " 2649 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2650 (size_t) sa->length, nxt_sockaddr_start(sa), 2651 &socket_errors[error], in->mem.free - p, p); 2652 2653 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2654 #endif 2655 2656 nxt_router_conf_error(task, tmcf); 2657 } 2658 2659 2660 #if (NXT_TLS) 2661 2662 static void 2663 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2664 void *data) 2665 { 2666 nxt_mp_t *mp; 2667 nxt_int_t ret; 2668 nxt_tls_conf_t *tlscf; 2669 nxt_router_tlssock_t *tls; 2670 nxt_tls_bundle_conf_t *bundle; 2671 nxt_router_temp_conf_t *tmcf; 2672 2673 nxt_debug(task, "tls rpc handler"); 2674 2675 tls = data; 2676 tmcf = tls->temp_conf; 2677 2678 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2679 goto fail; 2680 } 2681 2682 mp = tmcf->router_conf->mem_pool; 2683 2684 if (tls->socket_conf->tls == NULL){ 2685 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2686 if (nxt_slow_path(tlscf == NULL)) { 2687 goto fail; 2688 } 2689 2690 tlscf->no_wait_shutdown = 1; 2691 tls->socket_conf->tls = tlscf; 2692 2693 } else { 2694 tlscf = tls->socket_conf->tls; 2695 } 2696 2697 tls->tls_init->conf = tlscf; 2698 2699 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2700 if (nxt_slow_path(bundle == NULL)) { 2701 goto fail; 2702 } 2703 2704 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2705 goto fail; 2706 } 2707 2708 bundle->chain_file = msg->fd[0]; 2709 bundle->next = tlscf->bundle; 2710 tlscf->bundle = bundle; 2711 2712 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2713 tls->last); 2714 if (nxt_slow_path(ret != NXT_OK)) { 2715 goto fail; 2716 } 2717 2718 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2719 nxt_router_conf_apply, task, tmcf, NULL); 2720 return; 2721 2722 fail: 2723 2724 nxt_router_conf_error(task, tmcf); 2725 } 2726 2727 #endif 2728 2729 2730 static void 2731 nxt_router_app_rpc_create(nxt_task_t *task, 2732 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2733 { 2734 size_t size; 2735 uint32_t stream; 2736 nxt_int_t ret; 2737 nxt_buf_t *b; 2738 nxt_port_t *main_port, *router_port; 2739 nxt_runtime_t *rt; 2740 nxt_app_rpc_t *rpc; 2741 2742 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2743 if (rpc == NULL) { 2744 goto fail; 2745 } 2746 2747 rpc->app = app; 2748 rpc->temp_conf = tmcf; 2749 2750 nxt_debug(task, "app '%V' prefork", &app->name); 2751 2752 size = app->name.length + 1 + app->conf.length; 2753 2754 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2755 if (nxt_slow_path(b == NULL)) { 2756 goto fail; 2757 } 2758 2759 b->completion_handler = nxt_buf_dummy_completion; 2760 2761 nxt_buf_cpystr(b, &app->name); 2762 *b->mem.free++ = '\0'; 2763 nxt_buf_cpystr(b, &app->conf); 2764 2765 rt = task->thread->runtime; 2766 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2767 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2768 2769 stream = nxt_port_rpc_register_handler(task, router_port, 2770 nxt_router_app_prefork_ready, 2771 nxt_router_app_prefork_error, 2772 -1, rpc); 2773 if (nxt_slow_path(stream == 0)) { 2774 goto fail; 2775 } 2776 2777 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2778 -1, stream, router_port->id, b); 2779 2780 if (nxt_slow_path(ret != NXT_OK)) { 2781 nxt_port_rpc_cancel(task, router_port, stream); 2782 goto fail; 2783 } 2784 2785 app->pending_processes++; 2786 2787 return; 2788 2789 fail: 2790 2791 nxt_router_conf_error(task, tmcf); 2792 } 2793 2794 2795 static void 2796 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2797 void *data) 2798 { 2799 nxt_app_t *app; 2800 nxt_port_t *port; 2801 nxt_app_rpc_t *rpc; 2802 nxt_event_engine_t *engine; 2803 2804 rpc = data; 2805 app = rpc->app; 2806 2807 port = msg->u.new_port; 2808 2809 nxt_assert(port != NULL); 2810 nxt_assert(port->type == NXT_PROCESS_APP); 2811 nxt_assert(port->id == 0); 2812 2813 port->app = app; 2814 port->main_app_port = port; 2815 2816 app->pending_processes--; 2817 app->processes++; 2818 app->idle_processes++; 2819 2820 engine = task->thread->engine; 2821 2822 nxt_queue_insert_tail(&app->ports, &port->app_link); 2823 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2824 2825 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2826 &app->name, port->pid, port->id); 2827 2828 nxt_port_hash_add(&app->port_hash, port); 2829 app->port_hash_count++; 2830 2831 port->idle_start = 0; 2832 2833 nxt_port_inc_use(port); 2834 2835 nxt_router_app_shared_port_send(task, port); 2836 2837 nxt_work_queue_add(&engine->fast_work_queue, 2838 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2839 } 2840 2841 2842 static void 2843 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2844 void *data) 2845 { 2846 nxt_app_t *app; 2847 nxt_app_rpc_t *rpc; 2848 nxt_router_temp_conf_t *tmcf; 2849 2850 rpc = data; 2851 app = rpc->app; 2852 tmcf = rpc->temp_conf; 2853 2854 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2855 &app->name); 2856 2857 app->pending_processes--; 2858 2859 nxt_router_conf_error(task, tmcf); 2860 } 2861 2862 2863 static nxt_int_t 2864 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2865 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2866 { 2867 nxt_int_t ret; 2868 nxt_uint_t n, threads; 2869 nxt_queue_link_t *qlk; 2870 nxt_router_engine_conf_t *recf; 2871 2872 threads = tmcf->router_conf->threads; 2873 2874 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2875 sizeof(nxt_router_engine_conf_t)); 2876 if (nxt_slow_path(tmcf->engines == NULL)) { 2877 return NXT_ERROR; 2878 } 2879 2880 n = 0; 2881 2882 for (qlk = nxt_queue_first(&router->engines); 2883 qlk != nxt_queue_tail(&router->engines); 2884 qlk = nxt_queue_next(qlk)) 2885 { 2886 recf = nxt_array_zero_add(tmcf->engines); 2887 if (nxt_slow_path(recf == NULL)) { 2888 return NXT_ERROR; 2889 } 2890 2891 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2892 2893 if (n < threads) { 2894 recf->action = NXT_ROUTER_ENGINE_KEEP; 2895 ret = nxt_router_engine_conf_update(tmcf, recf); 2896 2897 } else { 2898 recf->action = NXT_ROUTER_ENGINE_DELETE; 2899 ret = nxt_router_engine_conf_delete(tmcf, recf); 2900 } 2901 2902 if (nxt_slow_path(ret != NXT_OK)) { 2903 return ret; 2904 } 2905 2906 n++; 2907 } 2908 2909 tmcf->new_threads = n; 2910 2911 while (n < threads) { 2912 recf = nxt_array_zero_add(tmcf->engines); 2913 if (nxt_slow_path(recf == NULL)) { 2914 return NXT_ERROR; 2915 } 2916 2917 recf->action = NXT_ROUTER_ENGINE_ADD; 2918 2919 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2920 if (nxt_slow_path(recf->engine == NULL)) { 2921 return NXT_ERROR; 2922 } 2923 2924 ret = nxt_router_engine_conf_create(tmcf, recf); 2925 if (nxt_slow_path(ret != NXT_OK)) { 2926 return ret; 2927 } 2928 2929 n++; 2930 } 2931 2932 return NXT_OK; 2933 } 2934 2935 2936 static nxt_int_t 2937 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2938 nxt_router_engine_conf_t *recf) 2939 { 2940 nxt_int_t ret; 2941 2942 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2943 nxt_router_listen_socket_create); 2944 if (nxt_slow_path(ret != NXT_OK)) { 2945 return ret; 2946 } 2947 2948 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2949 nxt_router_listen_socket_create); 2950 if (nxt_slow_path(ret != NXT_OK)) { 2951 return ret; 2952 } 2953 2954 return ret; 2955 } 2956 2957 2958 static nxt_int_t 2959 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2960 nxt_router_engine_conf_t *recf) 2961 { 2962 nxt_int_t ret; 2963 2964 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2965 nxt_router_listen_socket_create); 2966 if (nxt_slow_path(ret != NXT_OK)) { 2967 return ret; 2968 } 2969 2970 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2971 nxt_router_listen_socket_update); 2972 if (nxt_slow_path(ret != NXT_OK)) { 2973 return ret; 2974 } 2975 2976 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2977 if (nxt_slow_path(ret != NXT_OK)) { 2978 return ret; 2979 } 2980 2981 return ret; 2982 } 2983 2984 2985 static nxt_int_t 2986 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2987 nxt_router_engine_conf_t *recf) 2988 { 2989 nxt_int_t ret; 2990 2991 ret = nxt_router_engine_quit(tmcf, recf); 2992 if (nxt_slow_path(ret != NXT_OK)) { 2993 return ret; 2994 } 2995 2996 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 2997 if (nxt_slow_path(ret != NXT_OK)) { 2998 return ret; 2999 } 3000 3001 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3002 } 3003 3004 3005 static nxt_int_t 3006 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3007 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3008 nxt_work_handler_t handler) 3009 { 3010 nxt_int_t ret; 3011 nxt_joint_job_t *job; 3012 nxt_queue_link_t *qlk; 3013 nxt_socket_conf_t *skcf; 3014 nxt_socket_conf_joint_t *joint; 3015 3016 for (qlk = nxt_queue_first(sockets); 3017 qlk != nxt_queue_tail(sockets); 3018 qlk = nxt_queue_next(qlk)) 3019 { 3020 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3021 if (nxt_slow_path(job == NULL)) { 3022 return NXT_ERROR; 3023 } 3024 3025 job->work.next = recf->jobs; 3026 recf->jobs = &job->work; 3027 3028 job->task = tmcf->engine->task; 3029 job->work.handler = handler; 3030 job->work.task = &job->task; 3031 job->work.obj = job; 3032 job->tmcf = tmcf; 3033 3034 tmcf->count++; 3035 3036 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3037 sizeof(nxt_socket_conf_joint_t)); 3038 if (nxt_slow_path(joint == NULL)) { 3039 return NXT_ERROR; 3040 } 3041 3042 job->work.data = joint; 3043 3044 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3045 if (nxt_slow_path(ret != NXT_OK)) { 3046 return ret; 3047 } 3048 3049 joint->count = 1; 3050 3051 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3052 skcf->count++; 3053 joint->socket_conf = skcf; 3054 3055 joint->engine = recf->engine; 3056 } 3057 3058 return NXT_OK; 3059 } 3060 3061 3062 static nxt_int_t 3063 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3064 nxt_router_engine_conf_t *recf) 3065 { 3066 nxt_joint_job_t *job; 3067 3068 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3069 if (nxt_slow_path(job == NULL)) { 3070 return NXT_ERROR; 3071 } 3072 3073 job->work.next = recf->jobs; 3074 recf->jobs = &job->work; 3075 3076 job->task = tmcf->engine->task; 3077 job->work.handler = nxt_router_worker_thread_quit; 3078 job->work.task = &job->task; 3079 job->work.obj = NULL; 3080 job->work.data = NULL; 3081 job->tmcf = NULL; 3082 3083 return NXT_OK; 3084 } 3085 3086 3087 static nxt_int_t 3088 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 3089 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 3090 { 3091 nxt_joint_job_t *job; 3092 nxt_queue_link_t *qlk; 3093 3094 for (qlk = nxt_queue_first(sockets); 3095 qlk != nxt_queue_tail(sockets); 3096 qlk = nxt_queue_next(qlk)) 3097 { 3098 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3099 if (nxt_slow_path(job == NULL)) { 3100 return NXT_ERROR; 3101 } 3102 3103 job->work.next = recf->jobs; 3104 recf->jobs = &job->work; 3105 3106 job->task = tmcf->engine->task; 3107 job->work.handler = nxt_router_listen_socket_delete; 3108 job->work.task = &job->task; 3109 job->work.obj = job; 3110 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3111 job->tmcf = tmcf; 3112 3113 tmcf->count++; 3114 } 3115 3116 return NXT_OK; 3117 } 3118 3119 3120 static nxt_int_t 3121 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 3122 nxt_router_temp_conf_t *tmcf) 3123 { 3124 nxt_int_t ret; 3125 nxt_uint_t i, threads; 3126 nxt_router_engine_conf_t *recf; 3127 3128 recf = tmcf->engines->elts; 3129 threads = tmcf->router_conf->threads; 3130 3131 for (i = tmcf->new_threads; i < threads; i++) { 3132 ret = nxt_router_thread_create(task, rt, recf[i].engine); 3133 if (nxt_slow_path(ret != NXT_OK)) { 3134 return ret; 3135 } 3136 } 3137 3138 return NXT_OK; 3139 } 3140 3141 3142 static nxt_int_t 3143 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 3144 nxt_event_engine_t *engine) 3145 { 3146 nxt_int_t ret; 3147 nxt_thread_link_t *link; 3148 nxt_thread_handle_t handle; 3149 3150 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 3151 3152 if (nxt_slow_path(link == NULL)) { 3153 return NXT_ERROR; 3154 } 3155 3156 link->start = nxt_router_thread_start; 3157 link->engine = engine; 3158 link->work.handler = nxt_router_thread_exit_handler; 3159 link->work.task = task; 3160 link->work.data = link; 3161 3162 nxt_queue_insert_tail(&rt->engines, &engine->link); 3163 3164 ret = nxt_thread_create(&handle, link); 3165 3166 if (nxt_slow_path(ret != NXT_OK)) { 3167 nxt_queue_remove(&engine->link); 3168 } 3169 3170 return ret; 3171 } 3172 3173 3174 static void 3175 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 3176 nxt_router_temp_conf_t *tmcf) 3177 { 3178 nxt_app_t *app; 3179 3180 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 3181 3182 nxt_router_app_unlink(task, app); 3183 3184 } nxt_queue_loop; 3185 3186 nxt_queue_add(&router->apps, &tmcf->previous); 3187 nxt_queue_add(&router->apps, &tmcf->apps); 3188 } 3189 3190 3191 static void 3192 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 3193 { 3194 nxt_uint_t n; 3195 nxt_event_engine_t *engine; 3196 nxt_router_engine_conf_t *recf; 3197 3198 recf = tmcf->engines->elts; 3199 3200 for (n = tmcf->engines->nelts; n != 0; n--) { 3201 engine = recf->engine; 3202 3203 switch (recf->action) { 3204 3205 case NXT_ROUTER_ENGINE_KEEP: 3206 break; 3207 3208 case NXT_ROUTER_ENGINE_ADD: 3209 nxt_queue_insert_tail(&router->engines, &engine->link0); 3210 break; 3211 3212 case NXT_ROUTER_ENGINE_DELETE: 3213 nxt_queue_remove(&engine->link0); 3214 break; 3215 } 3216 3217 nxt_router_engine_post(engine, recf->jobs); 3218 3219 recf++; 3220 } 3221 } 3222 3223 3224 static void 3225 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3226 { 3227 nxt_work_t *work, *next; 3228 3229 for (work = jobs; work != NULL; work = next) { 3230 next = work->next; 3231 work->next = NULL; 3232 3233 nxt_event_engine_post(engine, work); 3234 } 3235 } 3236 3237 3238 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3239 .rpc_error = nxt_port_rpc_handler, 3240 .mmap = nxt_port_mmap_handler, 3241 .data = nxt_port_rpc_handler, 3242 .oosm = nxt_router_oosm_handler, 3243 .req_headers_ack = nxt_port_rpc_handler, 3244 }; 3245 3246 3247 static void 3248 nxt_router_thread_start(void *data) 3249 { 3250 nxt_int_t ret; 3251 nxt_port_t *port; 3252 nxt_task_t *task; 3253 nxt_work_t *work; 3254 nxt_thread_t *thread; 3255 nxt_thread_link_t *link; 3256 nxt_event_engine_t *engine; 3257 3258 link = data; 3259 engine = link->engine; 3260 task = &engine->task; 3261 3262 thread = nxt_thread(); 3263 3264 nxt_event_engine_thread_adopt(engine); 3265 3266 /* STUB */ 3267 thread->runtime = engine->task.thread->runtime; 3268 3269 engine->task.thread = thread; 3270 engine->task.log = thread->log; 3271 thread->engine = engine; 3272 thread->task = &engine->task; 3273 #if 0 3274 thread->fiber = &engine->fibers->fiber; 3275 #endif 3276 3277 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3278 if (nxt_slow_path(engine->mem_pool == NULL)) { 3279 return; 3280 } 3281 3282 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3283 NXT_PROCESS_ROUTER); 3284 if (nxt_slow_path(port == NULL)) { 3285 return; 3286 } 3287 3288 ret = nxt_port_socket_init(task, port, 0); 3289 if (nxt_slow_path(ret != NXT_OK)) { 3290 nxt_port_use(task, port, -1); 3291 return; 3292 } 3293 3294 ret = nxt_router_port_queue_init(task, port); 3295 if (nxt_slow_path(ret != NXT_OK)) { 3296 nxt_port_use(task, port, -1); 3297 return; 3298 } 3299 3300 engine->port = port; 3301 3302 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3303 3304 work = nxt_zalloc(sizeof(nxt_work_t)); 3305 if (nxt_slow_path(work == NULL)) { 3306 return; 3307 } 3308 3309 work->handler = nxt_router_rt_add_port; 3310 work->task = link->work.task; 3311 work->obj = work; 3312 work->data = port; 3313 3314 nxt_event_engine_post(link->work.task->thread->engine, work); 3315 3316 nxt_event_engine_start(engine); 3317 } 3318 3319 3320 static void 3321 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3322 { 3323 nxt_int_t res; 3324 nxt_port_t *port; 3325 nxt_runtime_t *rt; 3326 3327 rt = task->thread->runtime; 3328 port = data; 3329 3330 nxt_free(obj); 3331 3332 res = nxt_port_hash_add(&rt->ports, port); 3333 3334 if (nxt_fast_path(res == NXT_OK)) { 3335 nxt_port_use(task, port, 1); 3336 } 3337 } 3338 3339 3340 static void 3341 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3342 { 3343 nxt_joint_job_t *job; 3344 nxt_socket_conf_t *skcf; 3345 nxt_listen_event_t *lev; 3346 nxt_listen_socket_t *ls; 3347 nxt_thread_spinlock_t *lock; 3348 nxt_socket_conf_joint_t *joint; 3349 3350 job = obj; 3351 joint = data; 3352 3353 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3354 3355 skcf = joint->socket_conf; 3356 ls = skcf->listen; 3357 3358 lev = nxt_listen_event(task, ls); 3359 if (nxt_slow_path(lev == NULL)) { 3360 nxt_router_listen_socket_release(task, skcf); 3361 return; 3362 } 3363 3364 lev->socket.data = joint; 3365 3366 lock = &skcf->router_conf->router->lock; 3367 3368 nxt_thread_spin_lock(lock); 3369 ls->count++; 3370 nxt_thread_spin_unlock(lock); 3371 3372 job->work.next = NULL; 3373 job->work.handler = nxt_router_conf_wait; 3374 3375 nxt_event_engine_post(job->tmcf->engine, &job->work); 3376 } 3377 3378 3379 nxt_inline nxt_listen_event_t * 3380 nxt_router_listen_event(nxt_queue_t *listen_connections, 3381 nxt_socket_conf_t *skcf) 3382 { 3383 nxt_socket_t fd; 3384 nxt_queue_link_t *qlk; 3385 nxt_listen_event_t *lev; 3386 3387 fd = skcf->listen->socket; 3388 3389 for (qlk = nxt_queue_first(listen_connections); 3390 qlk != nxt_queue_tail(listen_connections); 3391 qlk = nxt_queue_next(qlk)) 3392 { 3393 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3394 3395 if (fd == lev->socket.fd) { 3396 return lev; 3397 } 3398 } 3399 3400 return NULL; 3401 } 3402 3403 3404 static void 3405 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3406 { 3407 nxt_joint_job_t *job; 3408 nxt_event_engine_t *engine; 3409 nxt_listen_event_t *lev; 3410 nxt_socket_conf_joint_t *joint, *old; 3411 3412 job = obj; 3413 joint = data; 3414 3415 engine = task->thread->engine; 3416 3417 nxt_queue_insert_tail(&engine->joints, &joint->link); 3418 3419 lev = nxt_router_listen_event(&engine->listen_connections, 3420 joint->socket_conf); 3421 3422 old = lev->socket.data; 3423 lev->socket.data = joint; 3424 lev->listen = joint->socket_conf->listen; 3425 3426 job->work.next = NULL; 3427 job->work.handler = nxt_router_conf_wait; 3428 3429 nxt_event_engine_post(job->tmcf->engine, &job->work); 3430 3431 /* 3432 * The task is allocated from configuration temporary 3433 * memory pool so it can be freed after engine post operation. 3434 */ 3435 3436 nxt_router_conf_release(&engine->task, old); 3437 } 3438 3439 3440 static void 3441 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3442 { 3443 nxt_socket_conf_t *skcf; 3444 nxt_listen_event_t *lev; 3445 nxt_event_engine_t *engine; 3446 nxt_socket_conf_joint_t *joint; 3447 3448 skcf = data; 3449 3450 engine = task->thread->engine; 3451 3452 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3453 3454 nxt_fd_event_delete(engine, &lev->socket); 3455 3456 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3457 lev->socket.fd); 3458 3459 joint = lev->socket.data; 3460 joint->close_job = obj; 3461 3462 lev->timer.handler = nxt_router_listen_socket_close; 3463 lev->timer.work_queue = &engine->fast_work_queue; 3464 3465 nxt_timer_add(engine, &lev->timer, 0); 3466 } 3467 3468 3469 static void 3470 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3471 { 3472 nxt_event_engine_t *engine; 3473 3474 nxt_debug(task, "router worker thread quit"); 3475 3476 engine = task->thread->engine; 3477 3478 engine->shutdown = 1; 3479 3480 if (nxt_queue_is_empty(&engine->joints)) { 3481 nxt_thread_exit(task->thread); 3482 } 3483 } 3484 3485 3486 static void 3487 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3488 { 3489 nxt_timer_t *timer; 3490 nxt_joint_job_t *job; 3491 nxt_listen_event_t *lev; 3492 nxt_socket_conf_joint_t *joint; 3493 3494 timer = obj; 3495 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3496 3497 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3498 lev->socket.fd); 3499 3500 nxt_queue_remove(&lev->link); 3501 3502 joint = lev->socket.data; 3503 lev->socket.data = NULL; 3504 3505 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3506 task = &task->thread->engine->task; 3507 3508 nxt_router_listen_socket_release(task, joint->socket_conf); 3509 3510 job = joint->close_job; 3511 job->work.next = NULL; 3512 job->work.handler = nxt_router_conf_wait; 3513 3514 nxt_event_engine_post(job->tmcf->engine, &job->work); 3515 3516 nxt_router_listen_event_release(task, lev, joint); 3517 } 3518 3519 3520 static void 3521 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3522 { 3523 nxt_listen_socket_t *ls; 3524 nxt_thread_spinlock_t *lock; 3525 3526 ls = skcf->listen; 3527 lock = &skcf->router_conf->router->lock; 3528 3529 nxt_thread_spin_lock(lock); 3530 3531 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3532 task->thread->engine, ls->count); 3533 3534 if (--ls->count != 0) { 3535 ls = NULL; 3536 } 3537 3538 nxt_thread_spin_unlock(lock); 3539 3540 if (ls != NULL) { 3541 nxt_socket_close(task, ls->socket); 3542 nxt_free(ls); 3543 } 3544 } 3545 3546 3547 void 3548 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3549 nxt_socket_conf_joint_t *joint) 3550 { 3551 nxt_event_engine_t *engine; 3552 3553 nxt_debug(task, "listen event count: %D", lev->count); 3554 3555 engine = task->thread->engine; 3556 3557 if (--lev->count == 0) { 3558 if (lev->next != NULL) { 3559 nxt_sockaddr_cache_free(engine, lev->next); 3560 3561 nxt_conn_free(task, lev->next); 3562 } 3563 3564 nxt_free(lev); 3565 } 3566 3567 if (joint != NULL) { 3568 nxt_router_conf_release(task, joint); 3569 } 3570 3571 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3572 nxt_thread_exit(task->thread); 3573 } 3574 } 3575 3576 3577 void 3578 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3579 { 3580 nxt_socket_conf_t *skcf; 3581 nxt_router_conf_t *rtcf; 3582 nxt_thread_spinlock_t *lock; 3583 3584 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3585 3586 if (--joint->count != 0) { 3587 return; 3588 } 3589 3590 nxt_queue_remove(&joint->link); 3591 3592 /* 3593 * The joint content can not be safely used after the critical 3594 * section protected by the spinlock because its memory pool may 3595 * be already destroyed by another thread. 3596 */ 3597 skcf = joint->socket_conf; 3598 rtcf = skcf->router_conf; 3599 lock = &rtcf->router->lock; 3600 3601 nxt_thread_spin_lock(lock); 3602 3603 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3604 rtcf, rtcf->count); 3605 3606 if (--skcf->count != 0) { 3607 skcf = NULL; 3608 rtcf = NULL; 3609 3610 } else { 3611 nxt_queue_remove(&skcf->link); 3612 3613 if (--rtcf->count != 0) { 3614 rtcf = NULL; 3615 } 3616 } 3617 3618 nxt_thread_spin_unlock(lock); 3619 3620 #if (NXT_TLS) 3621 if (skcf != NULL && skcf->tls != NULL) { 3622 task->thread->runtime->tls->server_free(task, skcf->tls); 3623 } 3624 #endif 3625 3626 /* TODO remove engine->port */ 3627 3628 if (rtcf != NULL) { 3629 nxt_debug(task, "old router conf is destroyed"); 3630 3631 nxt_router_apps_hash_use(task, rtcf, -1); 3632 3633 nxt_router_access_log_release(task, lock, rtcf->access_log); 3634 3635 nxt_mp_thread_adopt(rtcf->mem_pool); 3636 3637 nxt_mp_destroy(rtcf->mem_pool); 3638 } 3639 } 3640 3641 3642 static void 3643 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3644 nxt_router_access_log_t *access_log) 3645 { 3646 size_t size; 3647 u_char *buf, *p; 3648 nxt_off_t bytes; 3649 3650 static nxt_time_string_t date_cache = { 3651 (nxt_atomic_uint_t) -1, 3652 nxt_router_access_log_date, 3653 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3654 nxt_length("31/Dec/1986:19:40:00 +0300"), 3655 NXT_THREAD_TIME_LOCAL, 3656 NXT_THREAD_TIME_SEC, 3657 }; 3658 3659 size = r->remote->address_length 3660 + 6 /* ' - - [' */ 3661 + date_cache.size 3662 + 3 /* '] "' */ 3663 + r->method->length 3664 + 1 /* space */ 3665 + r->target.length 3666 + 1 /* space */ 3667 + r->version.length 3668 + 2 /* '" ' */ 3669 + 3 /* status */ 3670 + 1 /* space */ 3671 + NXT_OFF_T_LEN 3672 + 2 /* ' "' */ 3673 + (r->referer != NULL ? r->referer->value_length : 1) 3674 + 3 /* '" "' */ 3675 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3676 + 2 /* '"\n' */ 3677 ; 3678 3679 buf = nxt_mp_nget(r->mem_pool, size); 3680 if (nxt_slow_path(buf == NULL)) { 3681 return; 3682 } 3683 3684 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3685 r->remote->address_length); 3686 3687 p = nxt_cpymem(p, " - - [", 6); 3688 3689 p = nxt_thread_time_string(task->thread, &date_cache, p); 3690 3691 p = nxt_cpymem(p, "] \"", 3); 3692 3693 if (r->method->length != 0) { 3694 p = nxt_cpymem(p, r->method->start, r->method->length); 3695 3696 if (r->target.length != 0) { 3697 *p++ = ' '; 3698 p = nxt_cpymem(p, r->target.start, r->target.length); 3699 3700 if (r->version.length != 0) { 3701 *p++ = ' '; 3702 p = nxt_cpymem(p, r->version.start, r->version.length); 3703 } 3704 } 3705 3706 } else { 3707 *p++ = '-'; 3708 } 3709 3710 p = nxt_cpymem(p, "\" ", 2); 3711 3712 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3713 3714 *p++ = ' '; 3715 3716 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3717 3718 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3719 3720 p = nxt_cpymem(p, " \"", 2); 3721 3722 if (r->referer != NULL) { 3723 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3724 3725 } else { 3726 *p++ = '-'; 3727 } 3728 3729 p = nxt_cpymem(p, "\" \"", 3); 3730 3731 if (r->user_agent != NULL) { 3732 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3733 3734 } else { 3735 *p++ = '-'; 3736 } 3737 3738 p = nxt_cpymem(p, "\"\n", 2); 3739 3740 nxt_fd_write(access_log->fd, buf, p - buf); 3741 } 3742 3743 3744 static u_char * 3745 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3746 size_t size, const char *format) 3747 { 3748 u_char sign; 3749 time_t gmtoff; 3750 3751 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3752 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3753 3754 gmtoff = nxt_timezone(tm) / 60; 3755 3756 if (gmtoff < 0) { 3757 gmtoff = -gmtoff; 3758 sign = '-'; 3759 3760 } else { 3761 sign = '+'; 3762 } 3763 3764 return nxt_sprintf(buf, buf + size, format, 3765 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3766 tm->tm_hour, tm->tm_min, tm->tm_sec, 3767 sign, gmtoff / 60, gmtoff % 60); 3768 } 3769 3770 3771 static void 3772 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3773 { 3774 uint32_t stream; 3775 nxt_int_t ret; 3776 nxt_buf_t *b; 3777 nxt_port_t *main_port, *router_port; 3778 nxt_runtime_t *rt; 3779 nxt_router_access_log_t *access_log; 3780 3781 access_log = tmcf->router_conf->access_log; 3782 3783 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3784 if (nxt_slow_path(b == NULL)) { 3785 goto fail; 3786 } 3787 3788 b->completion_handler = nxt_buf_dummy_completion; 3789 3790 nxt_buf_cpystr(b, &access_log->path); 3791 *b->mem.free++ = '\0'; 3792 3793 rt = task->thread->runtime; 3794 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3795 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3796 3797 stream = nxt_port_rpc_register_handler(task, router_port, 3798 nxt_router_access_log_ready, 3799 nxt_router_access_log_error, 3800 -1, tmcf); 3801 if (nxt_slow_path(stream == 0)) { 3802 goto fail; 3803 } 3804 3805 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3806 stream, router_port->id, b); 3807 3808 if (nxt_slow_path(ret != NXT_OK)) { 3809 nxt_port_rpc_cancel(task, router_port, stream); 3810 goto fail; 3811 } 3812 3813 return; 3814 3815 fail: 3816 3817 nxt_router_conf_error(task, tmcf); 3818 } 3819 3820 3821 static void 3822 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3823 void *data) 3824 { 3825 nxt_router_temp_conf_t *tmcf; 3826 nxt_router_access_log_t *access_log; 3827 3828 tmcf = data; 3829 3830 access_log = tmcf->router_conf->access_log; 3831 3832 access_log->fd = msg->fd[0]; 3833 3834 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3835 nxt_router_conf_apply, task, tmcf, NULL); 3836 } 3837 3838 3839 static void 3840 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3841 void *data) 3842 { 3843 nxt_router_temp_conf_t *tmcf; 3844 3845 tmcf = data; 3846 3847 nxt_router_conf_error(task, tmcf); 3848 } 3849 3850 3851 static void 3852 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3853 nxt_router_access_log_t *access_log) 3854 { 3855 if (access_log == NULL) { 3856 return; 3857 } 3858 3859 nxt_thread_spin_lock(lock); 3860 3861 if (--access_log->count != 0) { 3862 access_log = NULL; 3863 } 3864 3865 nxt_thread_spin_unlock(lock); 3866 3867 if (access_log != NULL) { 3868 3869 if (access_log->fd != -1) { 3870 nxt_fd_close(access_log->fd); 3871 } 3872 3873 nxt_free(access_log); 3874 } 3875 } 3876 3877 3878 typedef struct { 3879 nxt_mp_t *mem_pool; 3880 nxt_router_access_log_t *access_log; 3881 } nxt_router_access_log_reopen_t; 3882 3883 3884 static void 3885 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3886 { 3887 nxt_mp_t *mp; 3888 uint32_t stream; 3889 nxt_int_t ret; 3890 nxt_buf_t *b; 3891 nxt_port_t *main_port, *router_port; 3892 nxt_runtime_t *rt; 3893 nxt_router_access_log_t *access_log; 3894 nxt_router_access_log_reopen_t *reopen; 3895 3896 access_log = nxt_router->access_log; 3897 3898 if (access_log == NULL) { 3899 return; 3900 } 3901 3902 mp = nxt_mp_create(1024, 128, 256, 32); 3903 if (nxt_slow_path(mp == NULL)) { 3904 return; 3905 } 3906 3907 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3908 if (nxt_slow_path(reopen == NULL)) { 3909 goto fail; 3910 } 3911 3912 reopen->mem_pool = mp; 3913 reopen->access_log = access_log; 3914 3915 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3916 if (nxt_slow_path(b == NULL)) { 3917 goto fail; 3918 } 3919 3920 b->completion_handler = nxt_router_access_log_reopen_completion; 3921 3922 nxt_buf_cpystr(b, &access_log->path); 3923 *b->mem.free++ = '\0'; 3924 3925 rt = task->thread->runtime; 3926 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3927 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3928 3929 stream = nxt_port_rpc_register_handler(task, router_port, 3930 nxt_router_access_log_reopen_ready, 3931 nxt_router_access_log_reopen_error, 3932 -1, reopen); 3933 if (nxt_slow_path(stream == 0)) { 3934 goto fail; 3935 } 3936 3937 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3938 stream, router_port->id, b); 3939 3940 if (nxt_slow_path(ret != NXT_OK)) { 3941 nxt_port_rpc_cancel(task, router_port, stream); 3942 goto fail; 3943 } 3944 3945 nxt_mp_retain(mp); 3946 3947 return; 3948 3949 fail: 3950 3951 nxt_mp_destroy(mp); 3952 } 3953 3954 3955 static void 3956 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3957 { 3958 nxt_mp_t *mp; 3959 nxt_buf_t *b; 3960 3961 b = obj; 3962 mp = b->data; 3963 3964 nxt_mp_release(mp); 3965 } 3966 3967 3968 static void 3969 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3970 void *data) 3971 { 3972 nxt_router_access_log_t *access_log; 3973 nxt_router_access_log_reopen_t *reopen; 3974 3975 reopen = data; 3976 3977 access_log = reopen->access_log; 3978 3979 if (access_log == nxt_router->access_log) { 3980 3981 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 3982 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3983 msg->fd[0], access_log->fd, nxt_errno); 3984 } 3985 } 3986 3987 nxt_fd_close(msg->fd[0]); 3988 nxt_mp_release(reopen->mem_pool); 3989 } 3990 3991 3992 static void 3993 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3994 void *data) 3995 { 3996 nxt_router_access_log_reopen_t *reopen; 3997 3998 reopen = data; 3999 4000 nxt_mp_release(reopen->mem_pool); 4001 } 4002 4003 4004 static void 4005 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 4006 { 4007 nxt_port_t *port; 4008 nxt_thread_link_t *link; 4009 nxt_event_engine_t *engine; 4010 nxt_thread_handle_t handle; 4011 4012 handle = (nxt_thread_handle_t) (uintptr_t) obj; 4013 link = data; 4014 4015 nxt_thread_wait(handle); 4016 4017 engine = link->engine; 4018 4019 nxt_queue_remove(&engine->link); 4020 4021 port = engine->port; 4022 4023 // TODO notify all apps 4024 4025 port->engine = task->thread->engine; 4026 nxt_mp_thread_adopt(port->mem_pool); 4027 nxt_port_use(task, port, -1); 4028 4029 nxt_mp_thread_adopt(engine->mem_pool); 4030 nxt_mp_destroy(engine->mem_pool); 4031 4032 nxt_event_engine_free(engine); 4033 4034 nxt_free(link); 4035 } 4036 4037 4038 static void 4039 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4040 void *data) 4041 { 4042 size_t b_size, count; 4043 nxt_int_t ret; 4044 nxt_app_t *app; 4045 nxt_buf_t *b, *next; 4046 nxt_port_t *app_port; 4047 nxt_unit_field_t *f; 4048 nxt_http_field_t *field; 4049 nxt_http_request_t *r; 4050 nxt_unit_response_t *resp; 4051 nxt_request_rpc_data_t *req_rpc_data; 4052 4053 req_rpc_data = data; 4054 4055 r = req_rpc_data->request; 4056 if (nxt_slow_path(r == NULL)) { 4057 return; 4058 } 4059 4060 if (r->error) { 4061 nxt_request_rpc_data_unlink(task, req_rpc_data); 4062 return; 4063 } 4064 4065 app = req_rpc_data->app; 4066 nxt_assert(app != NULL); 4067 4068 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { 4069 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); 4070 4071 return; 4072 } 4073 4074 b = (msg->size == 0) ? NULL : msg->buf; 4075 4076 if (msg->port_msg.last != 0) { 4077 nxt_debug(task, "router data create last buf"); 4078 4079 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 4080 4081 req_rpc_data->rpc_cancel = 0; 4082 4083 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { 4084 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; 4085 } 4086 4087 nxt_request_rpc_data_unlink(task, req_rpc_data); 4088 4089 } else { 4090 if (app->timeout != 0) { 4091 r->timer.handler = nxt_router_app_timeout; 4092 r->timer_data = req_rpc_data; 4093 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4094 } 4095 } 4096 4097 if (b == NULL) { 4098 return; 4099 } 4100 4101 if (msg->buf == b) { 4102 /* Disable instant buffer completion/re-using by port. */ 4103 msg->buf = NULL; 4104 } 4105 4106 if (r->header_sent) { 4107 nxt_buf_chain_add(&r->out, b); 4108 nxt_http_request_send_body(task, r, NULL); 4109 4110 } else { 4111 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0; 4112 4113 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) { 4114 nxt_alert(task, "response buffer too small: %z", b_size); 4115 goto fail; 4116 } 4117 4118 resp = (void *) b->mem.pos; 4119 count = (b_size - sizeof(nxt_unit_response_t)) 4120 / sizeof(nxt_unit_field_t); 4121 4122 if (nxt_slow_path(count < resp->fields_count)) { 4123 nxt_alert(task, "response buffer too small for fields count: %D", 4124 resp->fields_count); 4125 goto fail; 4126 } 4127 4128 field = NULL; 4129 4130 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 4131 if (f->skip) { 4132 continue; 4133 } 4134 4135 field = nxt_list_add(r->resp.fields); 4136 4137 if (nxt_slow_path(field == NULL)) { 4138 goto fail; 4139 } 4140 4141 field->hash = f->hash; 4142 field->skip = 0; 4143 field->hopbyhop = 0; 4144 4145 field->name_length = f->name_length; 4146 field->value_length = f->value_length; 4147 field->name = nxt_unit_sptr_get(&f->name); 4148 field->value = nxt_unit_sptr_get(&f->value); 4149 4150 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 4151 if (nxt_slow_path(ret != NXT_OK)) { 4152 goto fail; 4153 } 4154 4155 nxt_debug(task, "header%s: %*s: %*s", 4156 (field->skip ? " skipped" : ""), 4157 (size_t) field->name_length, field->name, 4158 (size_t) field->value_length, field->value); 4159 4160 if (field->skip) { 4161 r->resp.fields->last->nelts--; 4162 } 4163 } 4164 4165 r->status = resp->status; 4166 4167 if (resp->piggyback_content_length != 0) { 4168 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 4169 b->mem.free = b->mem.pos + resp->piggyback_content_length; 4170 4171 } else { 4172 b->mem.pos = b->mem.free; 4173 } 4174 4175 if (nxt_buf_mem_used_size(&b->mem) == 0) { 4176 next = b->next; 4177 b->next = NULL; 4178 4179 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 4180 b->completion_handler, task, b, b->parent); 4181 4182 b = next; 4183 } 4184 4185 if (b != NULL) { 4186 nxt_buf_chain_add(&r->out, b); 4187 } 4188 4189 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 4190 4191 if (r->websocket_handshake 4192 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 4193 { 4194 app_port = req_rpc_data->app_port; 4195 if (nxt_slow_path(app_port == NULL)) { 4196 goto fail; 4197 } 4198 4199 nxt_thread_mutex_lock(&app->mutex); 4200 4201 app_port->main_app_port->active_websockets++; 4202 4203 nxt_thread_mutex_unlock(&app->mutex); 4204 4205 nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE); 4206 req_rpc_data->apr_action = NXT_APR_CLOSE; 4207 4208 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); 4209 4210 r->state = &nxt_http_websocket; 4211 4212 } else { 4213 r->state = &nxt_http_request_send_state; 4214 } 4215 } 4216 4217 return; 4218 4219 fail: 4220 4221 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4222 4223 nxt_request_rpc_data_unlink(task, req_rpc_data); 4224 } 4225 4226 4227 static void 4228 nxt_router_req_headers_ack_handler(nxt_task_t *task, 4229 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 4230 { 4231 int res; 4232 nxt_app_t *app; 4233 nxt_buf_t *b; 4234 nxt_bool_t start_process, unlinked; 4235 nxt_port_t *app_port, *main_app_port, *idle_port; 4236 nxt_queue_link_t *idle_lnk; 4237 nxt_http_request_t *r; 4238 4239 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4240 req_rpc_data->stream, 4241 msg->port_msg.pid, msg->port_msg.reply_port); 4242 4243 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4244 msg->port_msg.pid); 4245 4246 app = req_rpc_data->app; 4247 r = req_rpc_data->request; 4248 4249 start_process = 0; 4250 unlinked = 0; 4251 4252 nxt_thread_mutex_lock(&app->mutex); 4253 4254 if (r->app_link.next != NULL) { 4255 nxt_queue_remove(&r->app_link); 4256 r->app_link.next = NULL; 4257 4258 unlinked = 1; 4259 } 4260 4261 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4262 msg->port_msg.reply_port); 4263 if (nxt_slow_path(app_port == NULL)) { 4264 nxt_thread_mutex_unlock(&app->mutex); 4265 4266 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4267 4268 if (unlinked) { 4269 nxt_mp_release(r->mem_pool); 4270 } 4271 4272 return; 4273 } 4274 4275 main_app_port = app_port->main_app_port; 4276 4277 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4278 app->idle_processes--; 4279 4280 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4281 &app->name, main_app_port->pid, main_app_port->id, 4282 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4283 4284 /* Check port was in 'spare_ports' using idle_start field. */ 4285 if (main_app_port->idle_start == 0 4286 && app->idle_processes >= app->spare_processes) 4287 { 4288 /* 4289 * If there is a vacant space in spare ports, 4290 * move the last idle to spare_ports. 4291 */ 4292 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4293 4294 idle_lnk = nxt_queue_last(&app->idle_ports); 4295 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4296 nxt_queue_remove(idle_lnk); 4297 4298 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4299 4300 idle_port->idle_start = 0; 4301 4302 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4303 "to spare_ports", 4304 &app->name, idle_port->pid, idle_port->id); 4305 } 4306 4307 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4308 app->pending_processes++; 4309 start_process = 1; 4310 } 4311 } 4312 4313 main_app_port->active_requests++; 4314 4315 nxt_port_inc_use(app_port); 4316 4317 nxt_thread_mutex_unlock(&app->mutex); 4318 4319 if (unlinked) { 4320 nxt_mp_release(r->mem_pool); 4321 } 4322 4323 if (start_process) { 4324 nxt_router_start_app_process(task, app); 4325 } 4326 4327 nxt_port_use(task, req_rpc_data->app_port, -1); 4328 4329 req_rpc_data->app_port = app_port; 4330 4331 b = req_rpc_data->msg_info.buf; 4332 4333 if (b != NULL) { 4334 /* First buffer is already sent. Start from second. */ 4335 b = b->next; 4336 4337 req_rpc_data->msg_info.buf->next = NULL; 4338 } 4339 4340 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4341 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4342 req_rpc_data->msg_info.body_fd); 4343 4344 if (req_rpc_data->msg_info.body_fd != -1) { 4345 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4346 } 4347 4348 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4349 req_rpc_data->msg_info.body_fd, 4350 req_rpc_data->stream, 4351 task->thread->engine->port->id, b); 4352 4353 if (nxt_slow_path(res != NXT_OK)) { 4354 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4355 } 4356 } 4357 4358 if (app->timeout != 0) { 4359 r->timer.handler = nxt_router_app_timeout; 4360 r->timer_data = req_rpc_data; 4361 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4362 } 4363 } 4364 4365 4366 static const nxt_http_request_state_t nxt_http_request_send_state 4367 nxt_aligned(64) = 4368 { 4369 .error_handler = nxt_http_request_error_handler, 4370 }; 4371 4372 4373 static void 4374 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4375 { 4376 nxt_buf_t *out; 4377 nxt_http_request_t *r; 4378 4379 r = obj; 4380 4381 out = r->out; 4382 4383 if (out != NULL) { 4384 r->out = NULL; 4385 nxt_http_request_send(task, r, out); 4386 } 4387 } 4388 4389 4390 static void 4391 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4392 void *data) 4393 { 4394 nxt_request_rpc_data_t *req_rpc_data; 4395 4396 req_rpc_data = data; 4397 4398 req_rpc_data->rpc_cancel = 0; 4399 4400 /* TODO cancel message and return if cancelled. */ 4401 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4402 4403 if (req_rpc_data->request != NULL) { 4404 nxt_http_request_error(task, req_rpc_data->request, 4405 NXT_HTTP_SERVICE_UNAVAILABLE); 4406 } 4407 4408 nxt_request_rpc_data_unlink(task, req_rpc_data); 4409 } 4410 4411 4412 static void 4413 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4414 void *data) 4415 { 4416 nxt_app_t *app; 4417 nxt_bool_t start_process; 4418 nxt_port_t *port; 4419 nxt_app_joint_t *app_joint; 4420 nxt_app_joint_rpc_t *app_joint_rpc; 4421 4422 nxt_assert(data != NULL); 4423 4424 app_joint_rpc = data; 4425 app_joint = app_joint_rpc->app_joint; 4426 port = msg->u.new_port; 4427 4428 nxt_assert(app_joint != NULL); 4429 nxt_assert(port != NULL); 4430 nxt_assert(port->type == NXT_PROCESS_APP); 4431 nxt_assert(port->id == 0); 4432 4433 app = app_joint->app; 4434 4435 nxt_router_app_joint_use(task, app_joint, -1); 4436 4437 if (nxt_slow_path(app == NULL)) { 4438 nxt_debug(task, "new port ready for released app, send QUIT"); 4439 4440 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4441 4442 return; 4443 } 4444 4445 nxt_thread_mutex_lock(&app->mutex); 4446 4447 nxt_assert(app->pending_processes != 0); 4448 4449 app->pending_processes--; 4450 4451 if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { 4452 nxt_debug(task, "new port ready for restarted app, send QUIT"); 4453 4454 start_process = !task->thread->engine->shutdown 4455 && nxt_router_app_can_start(app) 4456 && nxt_router_app_need_start(app); 4457 4458 if (start_process) { 4459 app->pending_processes++; 4460 } 4461 4462 nxt_thread_mutex_unlock(&app->mutex); 4463 4464 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4465 4466 if (start_process) { 4467 nxt_router_start_app_process(task, app); 4468 } 4469 4470 return; 4471 } 4472 4473 port->app = app; 4474 port->main_app_port = port; 4475 4476 app->processes++; 4477 nxt_port_hash_add(&app->port_hash, port); 4478 app->port_hash_count++; 4479 4480 nxt_thread_mutex_unlock(&app->mutex); 4481 4482 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4483 &app->name, port->pid, app->processes, app->pending_processes); 4484 4485 nxt_router_app_shared_port_send(task, port); 4486 4487 nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); 4488 } 4489 4490 4491 static nxt_int_t 4492 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) 4493 { 4494 nxt_buf_t *b; 4495 nxt_port_t *port; 4496 nxt_port_msg_new_port_t *msg; 4497 4498 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 4499 sizeof(nxt_port_data_t)); 4500 if (nxt_slow_path(b == NULL)) { 4501 return NXT_ERROR; 4502 } 4503 4504 port = app_port->app->shared_port; 4505 4506 nxt_debug(task, "send port %FD to process %PI", 4507 port->pair[0], app_port->pid); 4508 4509 b->mem.free += sizeof(nxt_port_msg_new_port_t); 4510 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 4511 4512 msg->id = port->id; 4513 msg->pid = port->pid; 4514 msg->max_size = port->max_size; 4515 msg->max_share = port->max_share; 4516 msg->type = port->type; 4517 4518 return nxt_port_socket_write2(task, app_port, 4519 NXT_PORT_MSG_NEW_PORT, 4520 port->pair[0], port->queue_fd, 4521 0, 0, b); 4522 } 4523 4524 4525 static void 4526 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4527 void *data) 4528 { 4529 nxt_app_t *app; 4530 nxt_app_joint_t *app_joint; 4531 nxt_queue_link_t *link; 4532 nxt_http_request_t *r; 4533 nxt_app_joint_rpc_t *app_joint_rpc; 4534 4535 nxt_assert(data != NULL); 4536 4537 app_joint_rpc = data; 4538 app_joint = app_joint_rpc->app_joint; 4539 4540 nxt_assert(app_joint != NULL); 4541 4542 app = app_joint->app; 4543 4544 nxt_router_app_joint_use(task, app_joint, -1); 4545 4546 if (nxt_slow_path(app == NULL)) { 4547 nxt_debug(task, "start error for released app"); 4548 4549 return; 4550 } 4551 4552 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4553 4554 link = NULL; 4555 4556 nxt_thread_mutex_lock(&app->mutex); 4557 4558 nxt_assert(app->pending_processes != 0); 4559 4560 app->pending_processes--; 4561 4562 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4563 link = nxt_queue_first(&app->ack_waiting_req); 4564 4565 nxt_queue_remove(link); 4566 link->next = NULL; 4567 } 4568 4569 nxt_thread_mutex_unlock(&app->mutex); 4570 4571 while (link != NULL) { 4572 r = nxt_container_of(link, nxt_http_request_t, app_link); 4573 4574 nxt_event_engine_post(r->engine, &r->err_work); 4575 4576 link = NULL; 4577 4578 nxt_thread_mutex_lock(&app->mutex); 4579 4580 if (app->processes == 0 && app->pending_processes == 0 4581 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4582 { 4583 link = nxt_queue_first(&app->ack_waiting_req); 4584 4585 nxt_queue_remove(link); 4586 link->next = NULL; 4587 } 4588 4589 nxt_thread_mutex_unlock(&app->mutex); 4590 } 4591 } 4592 4593 4594 4595 nxt_inline nxt_port_t * 4596 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) 4597 { 4598 nxt_port_t *port; 4599 4600 port = NULL; 4601 4602 nxt_thread_mutex_lock(&app->mutex); 4603 4604 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 4605 4606 /* Caller is responsible to decrease port use count. */ 4607 nxt_queue_chk_remove(&port->app_link); 4608 4609 if (nxt_queue_chk_remove(&port->idle_link)) { 4610 app->idle_processes--; 4611 4612 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", 4613 &app->name, port->pid, port->id, 4614 (port->idle_start ? "idle_ports" : "spare_ports")); 4615 } 4616 4617 nxt_port_hash_remove(&app->port_hash, port); 4618 app->port_hash_count--; 4619 4620 port->app = NULL; 4621 app->processes--; 4622 4623 break; 4624 4625 } nxt_queue_loop; 4626 4627 nxt_thread_mutex_unlock(&app->mutex); 4628 4629 return port; 4630 } 4631 4632 4633 static void 4634 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4635 { 4636 int c; 4637 4638 c = nxt_atomic_fetch_add(&app->use_count, i); 4639 4640 if (i < 0 && c == -i) { 4641 4642 if (task->thread->engine != app->engine) { 4643 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 4644 4645 } else { 4646 nxt_router_free_app(task, app->joint, NULL); 4647 } 4648 } 4649 } 4650 4651 4652 static void 4653 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 4654 { 4655 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 4656 4657 nxt_queue_remove(&app->link); 4658 4659 nxt_router_app_use(task, app, -1); 4660 } 4661 4662 4663 static void 4664 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, 4665 nxt_apr_action_t action) 4666 { 4667 int inc_use; 4668 uint32_t got_response, dec_requests; 4669 nxt_bool_t adjust_idle_timer; 4670 nxt_port_t *main_app_port; 4671 4672 nxt_assert(port != NULL); 4673 4674 inc_use = 0; 4675 got_response = 0; 4676 dec_requests = 0; 4677 4678 switch (action) { 4679 case NXT_APR_NEW_PORT: 4680 break; 4681 case NXT_APR_REQUEST_FAILED: 4682 dec_requests = 1; 4683 inc_use = -1; 4684 break; 4685 case NXT_APR_GOT_RESPONSE: 4686 got_response = 1; 4687 inc_use = -1; 4688 break; 4689 case NXT_APR_UPGRADE: 4690 got_response = 1; 4691 break; 4692 case NXT_APR_CLOSE: 4693 inc_use = -1; 4694 break; 4695 } 4696 4697 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4698 port->pid, port->id, 4699 (int) inc_use, (int) got_response); 4700 4701 if (port->id == NXT_SHARED_PORT_ID) { 4702 nxt_thread_mutex_lock(&app->mutex); 4703 4704 app->active_requests -= got_response + dec_requests; 4705 4706 nxt_thread_mutex_unlock(&app->mutex); 4707 4708 goto adjust_use; 4709 } 4710 4711 main_app_port = port->main_app_port; 4712 4713 nxt_thread_mutex_lock(&app->mutex); 4714 4715 main_app_port->active_requests -= got_response + dec_requests; 4716 app->active_requests -= got_response + dec_requests; 4717 4718 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { 4719 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4720 4721 nxt_port_inc_use(main_app_port); 4722 } 4723 4724 adjust_idle_timer = 0; 4725 4726 if (main_app_port->pair[1] != -1 4727 && main_app_port->active_requests == 0 4728 && main_app_port->active_websockets == 0 4729 && main_app_port->idle_link.next == NULL) 4730 { 4731 if (app->idle_processes == app->spare_processes 4732 && app->adjust_idle_work.data == NULL) 4733 { 4734 adjust_idle_timer = 1; 4735 app->adjust_idle_work.data = app; 4736 app->adjust_idle_work.next = NULL; 4737 } 4738 4739 if (app->idle_processes < app->spare_processes) { 4740 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4741 4742 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4743 &app->name, main_app_port->pid, main_app_port->id); 4744 } else { 4745 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4746 4747 main_app_port->idle_start = task->thread->engine->timers.now; 4748 4749 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4750 &app->name, main_app_port->pid, main_app_port->id); 4751 } 4752 4753 app->idle_processes++; 4754 } 4755 4756 nxt_thread_mutex_unlock(&app->mutex); 4757 4758 if (adjust_idle_timer) { 4759 nxt_router_app_use(task, app, 1); 4760 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4761 } 4762 4763 /* ? */ 4764 if (main_app_port->pair[1] == -1) { 4765 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4766 &app->name, app, main_app_port, main_app_port->pid); 4767 4768 goto adjust_use; 4769 } 4770 4771 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4772 &app->name, app); 4773 4774 adjust_use: 4775 4776 nxt_port_use(task, port, inc_use); 4777 } 4778 4779 4780 void 4781 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4782 { 4783 nxt_app_t *app; 4784 nxt_bool_t unchain, start_process; 4785 nxt_port_t *idle_port; 4786 nxt_queue_link_t *idle_lnk; 4787 4788 app = port->app; 4789 4790 nxt_assert(app != NULL); 4791 4792 nxt_thread_mutex_lock(&app->mutex); 4793 4794 nxt_port_hash_remove(&app->port_hash, port); 4795 app->port_hash_count--; 4796 4797 if (port->id != 0) { 4798 nxt_thread_mutex_unlock(&app->mutex); 4799 4800 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, 4801 port->pid, port->id); 4802 4803 return; 4804 } 4805 4806 unchain = nxt_queue_chk_remove(&port->app_link); 4807 4808 if (nxt_queue_chk_remove(&port->idle_link)) { 4809 app->idle_processes--; 4810 4811 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", 4812 &app->name, port->pid, port->id, 4813 (port->idle_start ? "idle_ports" : "spare_ports")); 4814 4815 if (port->idle_start == 0 4816 && app->idle_processes >= app->spare_processes) 4817 { 4818 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4819 4820 idle_lnk = nxt_queue_last(&app->idle_ports); 4821 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4822 nxt_queue_remove(idle_lnk); 4823 4824 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4825 4826 idle_port->idle_start = 0; 4827 4828 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4829 "to spare_ports", 4830 &app->name, idle_port->pid, idle_port->id); 4831 } 4832 } 4833 4834 app->processes--; 4835 4836 start_process = !task->thread->engine->shutdown 4837 && nxt_router_app_can_start(app) 4838 && nxt_router_app_need_start(app); 4839 4840 if (start_process) { 4841 app->pending_processes++; 4842 } 4843 4844 nxt_thread_mutex_unlock(&app->mutex); 4845 4846 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4847 4848 if (unchain) { 4849 nxt_port_use(task, port, -1); 4850 } 4851 4852 if (start_process) { 4853 nxt_router_start_app_process(task, app); 4854 } 4855 } 4856 4857 4858 static void 4859 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4860 { 4861 nxt_app_t *app; 4862 nxt_bool_t queued; 4863 nxt_port_t *port; 4864 nxt_msec_t timeout, threshold; 4865 nxt_queue_link_t *lnk; 4866 nxt_event_engine_t *engine; 4867 4868 app = obj; 4869 queued = (data == app); 4870 4871 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4872 &app->name, queued); 4873 4874 engine = task->thread->engine; 4875 4876 nxt_assert(app->engine == engine); 4877 4878 threshold = engine->timers.now + app->joint->idle_timer.bias; 4879 timeout = 0; 4880 4881 nxt_thread_mutex_lock(&app->mutex); 4882 4883 if (queued) { 4884 app->adjust_idle_work.data = NULL; 4885 } 4886 4887 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 4888 &app->name, 4889 (int) app->idle_processes, (int) app->spare_processes); 4890 4891 while (app->idle_processes > app->spare_processes) { 4892 4893 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4894 4895 lnk = nxt_queue_first(&app->idle_ports); 4896 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4897 4898 timeout = port->idle_start + app->idle_timeout; 4899 4900 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 4901 &app->name, port->pid, 4902 port->idle_start, timeout, threshold); 4903 4904 if (timeout > threshold) { 4905 break; 4906 } 4907 4908 nxt_queue_remove(lnk); 4909 lnk->next = NULL; 4910 4911 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 4912 &app->name, port->pid, port->id); 4913 4914 nxt_queue_chk_remove(&port->app_link); 4915 4916 nxt_port_hash_remove(&app->port_hash, port); 4917 app->port_hash_count--; 4918 4919 app->idle_processes--; 4920 app->processes--; 4921 port->app = NULL; 4922 4923 nxt_thread_mutex_unlock(&app->mutex); 4924 4925 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4926 &app->name, port->pid); 4927 4928 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4929 4930 nxt_port_use(task, port, -1); 4931 4932 nxt_thread_mutex_lock(&app->mutex); 4933 } 4934 4935 nxt_thread_mutex_unlock(&app->mutex); 4936 4937 if (timeout > threshold) { 4938 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4939 4940 } else { 4941 nxt_timer_disable(engine, &app->joint->idle_timer); 4942 } 4943 4944 if (queued) { 4945 nxt_router_app_use(task, app, -1); 4946 } 4947 } 4948 4949 4950 static void 4951 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4952 { 4953 nxt_timer_t *timer; 4954 nxt_app_joint_t *app_joint; 4955 4956 timer = obj; 4957 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4958 4959 if (nxt_fast_path(app_joint->app != NULL)) { 4960 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 4961 } 4962 } 4963 4964 4965 static void 4966 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) 4967 { 4968 nxt_timer_t *timer; 4969 nxt_app_joint_t *app_joint; 4970 4971 timer = obj; 4972 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4973 4974 nxt_router_app_joint_use(task, app_joint, -1); 4975 } 4976 4977 4978 static void 4979 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 4980 { 4981 nxt_app_t *app; 4982 nxt_port_t *port; 4983 nxt_app_joint_t *app_joint; 4984 4985 app_joint = obj; 4986 app = app_joint->app; 4987 4988 for ( ;; ) { 4989 port = nxt_router_app_get_port_for_quit(task, app); 4990 if (port == NULL) { 4991 break; 4992 } 4993 4994 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); 4995 4996 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4997 4998 nxt_port_use(task, port, -1); 4999 } 5000 5001 nxt_thread_mutex_lock(&app->mutex); 5002 5003 for ( ;; ) { 5004 port = nxt_port_hash_retrieve(&app->port_hash); 5005 if (port == NULL) { 5006 break; 5007 } 5008 5009 app->port_hash_count--; 5010 5011 port->app = NULL; 5012 5013 nxt_port_close(task, port); 5014 5015 nxt_port_use(task, port, -1); 5016 } 5017 5018 nxt_thread_mutex_unlock(&app->mutex); 5019 5020 nxt_assert(app->processes == 0); 5021 nxt_assert(app->active_requests == 0); 5022 nxt_assert(app->port_hash_count == 0); 5023 nxt_assert(app->idle_processes == 0); 5024 nxt_assert(nxt_queue_is_empty(&app->ports)); 5025 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 5026 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 5027 5028 nxt_port_mmaps_destroy(&app->outgoing, 1); 5029 5030 nxt_thread_mutex_destroy(&app->outgoing.mutex); 5031 5032 if (app->shared_port != NULL) { 5033 app->shared_port->app = NULL; 5034 nxt_port_close(task, app->shared_port); 5035 nxt_port_use(task, app->shared_port, -1); 5036 5037 app->shared_port = NULL; 5038 } 5039 5040 nxt_thread_mutex_destroy(&app->mutex); 5041 nxt_mp_destroy(app->mem_pool); 5042 5043 app_joint->app = NULL; 5044 5045 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 5046 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 5047 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 5048 5049 } else { 5050 nxt_router_app_joint_use(task, app_joint, -1); 5051 } 5052 } 5053 5054 5055 static void 5056 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 5057 nxt_request_rpc_data_t *req_rpc_data) 5058 { 5059 nxt_bool_t start_process; 5060 nxt_port_t *port; 5061 nxt_http_request_t *r; 5062 5063 start_process = 0; 5064 5065 nxt_thread_mutex_lock(&app->mutex); 5066 5067 port = app->shared_port; 5068 nxt_port_inc_use(port); 5069 5070 app->active_requests++; 5071 5072 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 5073 app->pending_processes++; 5074 start_process = 1; 5075 } 5076 5077 r = req_rpc_data->request; 5078 5079 /* 5080 * Put request into application-wide list to be able to cancel request 5081 * if something goes wrong with application processes. 5082 */ 5083 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 5084 5085 nxt_thread_mutex_unlock(&app->mutex); 5086 5087 /* 5088 * Retain request memory pool while request is linked in ack_waiting_req 5089 * to guarantee request structure memory is accessble. 5090 */ 5091 nxt_mp_retain(r->mem_pool); 5092 5093 req_rpc_data->app_port = port; 5094 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 5095 5096 if (start_process) { 5097 nxt_router_start_app_process(task, app); 5098 } 5099 } 5100 5101 5102 void 5103 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 5104 nxt_http_action_t *action) 5105 { 5106 nxt_event_engine_t *engine; 5107 nxt_http_app_conf_t *conf; 5108 nxt_request_rpc_data_t *req_rpc_data; 5109 5110 conf = action->u.conf; 5111 engine = task->thread->engine; 5112 5113 r->app_target = conf->target; 5114 5115 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 5116 nxt_router_response_ready_handler, 5117 nxt_router_response_error_handler, 5118 sizeof(nxt_request_rpc_data_t)); 5119 if (nxt_slow_path(req_rpc_data == NULL)) { 5120 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 5121 return; 5122 } 5123 5124 /* 5125 * At this point we have request req_rpc_data allocated and registered 5126 * in port handlers. Need to fixup request memory pool. Counterpart 5127 * release will be called via following call chain: 5128 * nxt_request_rpc_data_unlink() -> 5129 * nxt_router_http_request_release_post() -> 5130 * nxt_router_http_request_release() 5131 */ 5132 nxt_mp_retain(r->mem_pool); 5133 5134 r->timer.task = &engine->task; 5135 r->timer.work_queue = &engine->fast_work_queue; 5136 r->timer.log = engine->task.log; 5137 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 5138 5139 r->engine = engine; 5140 r->err_work.handler = nxt_router_http_request_error; 5141 r->err_work.task = task; 5142 r->err_work.obj = r; 5143 5144 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 5145 req_rpc_data->app = conf->app; 5146 req_rpc_data->msg_info.body_fd = -1; 5147 req_rpc_data->rpc_cancel = 1; 5148 5149 nxt_router_app_use(task, conf->app, 1); 5150 5151 req_rpc_data->request = r; 5152 r->req_rpc_data = req_rpc_data; 5153 5154 if (r->last != NULL) { 5155 r->last->completion_handler = nxt_router_http_request_done; 5156 } 5157 5158 nxt_router_app_port_get(task, conf->app, req_rpc_data); 5159 nxt_router_app_prepare_request(task, req_rpc_data); 5160 } 5161 5162 5163 static void 5164 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 5165 { 5166 nxt_http_request_t *r; 5167 5168 r = obj; 5169 5170 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 5171 5172 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5173 5174 if (r->req_rpc_data != NULL) { 5175 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5176 } 5177 5178 nxt_mp_release(r->mem_pool); 5179 } 5180 5181 5182 static void 5183 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 5184 { 5185 nxt_http_request_t *r; 5186 5187 r = data; 5188 5189 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 5190 5191 if (r->req_rpc_data != NULL) { 5192 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5193 } 5194 5195 nxt_http_request_close_handler(task, r, r->proto.any); 5196 } 5197 5198 5199 static void 5200 nxt_router_app_prepare_request(nxt_task_t *task, 5201 nxt_request_rpc_data_t *req_rpc_data) 5202 { 5203 nxt_app_t *app; 5204 nxt_buf_t *buf, *body; 5205 nxt_int_t res; 5206 nxt_port_t *port, *reply_port; 5207 5208 int notify; 5209 struct { 5210 nxt_port_msg_t pm; 5211 nxt_port_mmap_msg_t mm; 5212 } msg; 5213 5214 5215 app = req_rpc_data->app; 5216 5217 nxt_assert(app != NULL); 5218 5219 port = req_rpc_data->app_port; 5220 5221 nxt_assert(port != NULL); 5222 nxt_assert(port->queue != NULL); 5223 5224 reply_port = task->thread->engine->port; 5225 5226 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5227 nxt_app_msg_prefix[app->type]); 5228 if (nxt_slow_path(buf == NULL)) { 5229 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5230 req_rpc_data->stream, &app->name); 5231 5232 nxt_http_request_error(task, req_rpc_data->request, 5233 NXT_HTTP_INTERNAL_SERVER_ERROR); 5234 5235 return; 5236 } 5237 5238 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5239 nxt_buf_used_size(buf), 5240 port->socket.fd); 5241 5242 req_rpc_data->msg_info.buf = buf; 5243 5244 body = req_rpc_data->request->body; 5245 5246 if (body != NULL && nxt_buf_is_file(body)) { 5247 req_rpc_data->msg_info.body_fd = body->file->fd; 5248 5249 body->file->fd = -1; 5250 5251 } else { 5252 req_rpc_data->msg_info.body_fd = -1; 5253 } 5254 5255 msg.pm.stream = req_rpc_data->stream; 5256 msg.pm.pid = reply_port->pid; 5257 msg.pm.reply_port = reply_port->id; 5258 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5259 msg.pm.last = 0; 5260 msg.pm.mmap = 1; 5261 msg.pm.nf = 0; 5262 msg.pm.mf = 0; 5263 msg.pm.tracking = 0; 5264 5265 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5266 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5267 5268 msg.mm.mmap_id = hdr->id; 5269 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5270 msg.mm.size = nxt_buf_used_size(buf); 5271 5272 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5273 req_rpc_data->stream, ¬ify, 5274 &req_rpc_data->msg_info.tracking_cookie); 5275 if (nxt_fast_path(res == NXT_OK)) { 5276 if (notify != 0) { 5277 (void) nxt_port_socket_write(task, port, 5278 NXT_PORT_MSG_READ_QUEUE, 5279 -1, req_rpc_data->stream, 5280 reply_port->id, NULL); 5281 5282 } else { 5283 nxt_debug(task, "queue is not empty"); 5284 } 5285 5286 buf->is_port_mmap_sent = 1; 5287 buf->mem.pos = buf->mem.free; 5288 5289 } else { 5290 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5291 req_rpc_data->stream, &app->name); 5292 5293 nxt_http_request_error(task, req_rpc_data->request, 5294 NXT_HTTP_INTERNAL_SERVER_ERROR); 5295 } 5296 } 5297 5298 5299 struct nxt_fields_iter_s { 5300 nxt_list_part_t *part; 5301 nxt_http_field_t *field; 5302 }; 5303 5304 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5305 5306 5307 static nxt_http_field_t * 5308 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5309 { 5310 if (part == NULL) { 5311 return NULL; 5312 } 5313 5314 while (part->nelts == 0) { 5315 part = part->next; 5316 if (part == NULL) { 5317 return NULL; 5318 } 5319 } 5320 5321 i->part = part; 5322 i->field = nxt_list_data(i->part); 5323 5324 return i->field; 5325 } 5326 5327 5328 static nxt_http_field_t * 5329 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5330 { 5331 return nxt_fields_part_first(nxt_list_part(fields), i); 5332 } 5333 5334 5335 static nxt_http_field_t * 5336 nxt_fields_next(nxt_fields_iter_t *i) 5337 { 5338 nxt_http_field_t *end = nxt_list_data(i->part); 5339 5340 end += i->part->nelts; 5341 i->field++; 5342 5343 if (i->field < end) { 5344 return i->field; 5345 } 5346 5347 return nxt_fields_part_first(i->part->next, i); 5348 } 5349 5350 5351 static nxt_buf_t * 5352 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5353 nxt_app_t *app, const nxt_str_t *prefix) 5354 { 5355 void *target_pos, *query_pos; 5356 u_char *pos, *end, *p, c; 5357 size_t fields_count, req_size, size, free_size; 5358 size_t copy_size; 5359 nxt_off_t content_length; 5360 nxt_buf_t *b, *buf, *out, **tail; 5361 nxt_http_field_t *field, *dup; 5362 nxt_unit_field_t *dst_field; 5363 nxt_fields_iter_t iter, dup_iter; 5364 nxt_unit_request_t *req; 5365 5366 req_size = sizeof(nxt_unit_request_t) 5367 + r->method->length + 1 5368 + r->version.length + 1 5369 + r->remote->length + 1 5370 + r->local->length + 1 5371 + r->server_name.length + 1 5372 + r->target.length + 1 5373 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5374 5375 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5376 fields_count = 0; 5377 5378 nxt_list_each(field, r->fields) { 5379 fields_count++; 5380 5381 req_size += field->name_length + prefix->length + 1 5382 + field->value_length + 1; 5383 } nxt_list_loop; 5384 5385 req_size += fields_count * sizeof(nxt_unit_field_t); 5386 5387 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5388 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5389 (int) req_size); 5390 5391 return NULL; 5392 } 5393 5394 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5395 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5396 if (nxt_slow_path(out == NULL)) { 5397 return NULL; 5398 } 5399 5400 req = (nxt_unit_request_t *) out->mem.free; 5401 out->mem.free += req_size; 5402 5403 req->app_target = r->app_target; 5404 5405 req->content_length = content_length; 5406 5407 p = (u_char *) (req->fields + fields_count); 5408 5409 nxt_debug(task, "fields_count=%d", (int) fields_count); 5410 5411 req->method_length = r->method->length; 5412 nxt_unit_sptr_set(&req->method, p); 5413 p = nxt_cpymem(p, r->method->start, r->method->length); 5414 *p++ = '\0'; 5415 5416 req->version_length = r->version.length; 5417 nxt_unit_sptr_set(&req->version, p); 5418 p = nxt_cpymem(p, r->version.start, r->version.length); 5419 *p++ = '\0'; 5420 5421 req->remote_length = r->remote->address_length; 5422 nxt_unit_sptr_set(&req->remote, p); 5423 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5424 r->remote->address_length); 5425 *p++ = '\0'; 5426 5427 req->local_length = r->local->address_length; 5428 nxt_unit_sptr_set(&req->local, p); 5429 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5430 *p++ = '\0'; 5431 5432 req->tls = (r->tls != NULL); 5433 req->websocket_handshake = r->websocket_handshake; 5434 5435 req->server_name_length = r->server_name.length; 5436 nxt_unit_sptr_set(&req->server_name, p); 5437 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5438 *p++ = '\0'; 5439 5440 target_pos = p; 5441 req->target_length = (uint32_t) r->target.length; 5442 nxt_unit_sptr_set(&req->target, p); 5443 p = nxt_cpymem(p, r->target.start, r->target.length); 5444 *p++ = '\0'; 5445 5446 req->path_length = (uint32_t) r->path->length; 5447 if (r->path->start == r->target.start) { 5448 nxt_unit_sptr_set(&req->path, target_pos); 5449 5450 } else { 5451 nxt_unit_sptr_set(&req->path, p); 5452 p = nxt_cpymem(p, r->path->start, r->path->length); 5453 *p++ = '\0'; 5454 } 5455 5456 req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; 5457 if (r->args != NULL && r->args->start != NULL) { 5458 query_pos = nxt_pointer_to(target_pos, 5459 r->args->start - r->target.start); 5460 5461 nxt_unit_sptr_set(&req->query, query_pos); 5462 5463 } else { 5464 req->query.offset = 0; 5465 } 5466 5467 req->content_length_field = NXT_UNIT_NONE_FIELD; 5468 req->content_type_field = NXT_UNIT_NONE_FIELD; 5469 req->cookie_field = NXT_UNIT_NONE_FIELD; 5470 req->authorization_field = NXT_UNIT_NONE_FIELD; 5471 5472 dst_field = req->fields; 5473 5474 for (field = nxt_fields_first(r->fields, &iter); 5475 field != NULL; 5476 field = nxt_fields_next(&iter)) 5477 { 5478 if (field->skip) { 5479 continue; 5480 } 5481 5482 dst_field->hash = field->hash; 5483 dst_field->skip = 0; 5484 dst_field->name_length = field->name_length + prefix->length; 5485 dst_field->value_length = field->value_length; 5486 5487 if (field == r->content_length) { 5488 req->content_length_field = dst_field - req->fields; 5489 5490 } else if (field == r->content_type) { 5491 req->content_type_field = dst_field - req->fields; 5492 5493 } else if (field == r->cookie) { 5494 req->cookie_field = dst_field - req->fields; 5495 5496 } else if (field == r->authorization) { 5497 req->authorization_field = dst_field - req->fields; 5498 } 5499 5500 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5501 (int) field->hash, (int) field->skip, 5502 (int) field->name_length, field->name, 5503 (int) field->value_length, field->value); 5504 5505 if (prefix->length != 0) { 5506 nxt_unit_sptr_set(&dst_field->name, p); 5507 p = nxt_cpymem(p, prefix->start, prefix->length); 5508 5509 end = field->name + field->name_length; 5510 for (pos = field->name; pos < end; pos++) { 5511 c = *pos; 5512 5513 if (c >= 'a' && c <= 'z') { 5514 *p++ = (c & ~0x20); 5515 continue; 5516 } 5517 5518 if (c == '-') { 5519 *p++ = '_'; 5520 continue; 5521 } 5522 5523 *p++ = c; 5524 } 5525 5526 } else { 5527 nxt_unit_sptr_set(&dst_field->name, p); 5528 p = nxt_cpymem(p, field->name, field->name_length); 5529 } 5530 5531 *p++ = '\0'; 5532 5533 nxt_unit_sptr_set(&dst_field->value, p); 5534 p = nxt_cpymem(p, field->value, field->value_length); 5535 5536 if (prefix->length != 0) { 5537 dup_iter = iter; 5538 5539 for (dup = nxt_fields_next(&dup_iter); 5540 dup != NULL; 5541 dup = nxt_fields_next(&dup_iter)) 5542 { 5543 if (dup->name_length != field->name_length 5544 || dup->skip 5545 || dup->hash != field->hash 5546 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5547 { 5548 continue; 5549 } 5550 5551 p = nxt_cpymem(p, ", ", 2); 5552 p = nxt_cpymem(p, dup->value, dup->value_length); 5553 5554 dst_field->value_length += 2 + dup->value_length; 5555 5556 dup->skip = 1; 5557 } 5558 } 5559 5560 *p++ = '\0'; 5561 5562 dst_field++; 5563 } 5564 5565 req->fields_count = (uint32_t) (dst_field - req->fields); 5566 5567 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5568 5569 buf = out; 5570 tail = &buf->next; 5571 5572 for (b = r->body; b != NULL; b = b->next) { 5573 size = nxt_buf_mem_used_size(&b->mem); 5574 pos = b->mem.pos; 5575 5576 while (size > 0) { 5577 if (buf == NULL) { 5578 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5579 5580 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5581 if (nxt_slow_path(buf == NULL)) { 5582 while (out != NULL) { 5583 buf = out->next; 5584 out->next = NULL; 5585 out->completion_handler(task, out, out->parent); 5586 out = buf; 5587 } 5588 return NULL; 5589 } 5590 5591 *tail = buf; 5592 tail = &buf->next; 5593 5594 } else { 5595 free_size = nxt_buf_mem_free_size(&buf->mem); 5596 if (free_size < size 5597 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5598 == NXT_OK) 5599 { 5600 free_size = nxt_buf_mem_free_size(&buf->mem); 5601 } 5602 } 5603 5604 if (free_size > 0) { 5605 copy_size = nxt_min(free_size, size); 5606 5607 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5608 5609 size -= copy_size; 5610 pos += copy_size; 5611 5612 if (size == 0) { 5613 break; 5614 } 5615 } 5616 5617 buf = NULL; 5618 } 5619 } 5620 5621 return out; 5622 } 5623 5624 5625 static void 5626 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5627 { 5628 nxt_timer_t *timer; 5629 nxt_http_request_t *r; 5630 nxt_request_rpc_data_t *req_rpc_data; 5631 5632 timer = obj; 5633 5634 nxt_debug(task, "router app timeout"); 5635 5636 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5637 req_rpc_data = r->timer_data; 5638 5639 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5640 5641 nxt_request_rpc_data_unlink(task, req_rpc_data); 5642 } 5643 5644 5645 static void 5646 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5647 { 5648 r->timer.handler = nxt_router_http_request_release; 5649 nxt_timer_add(task->thread->engine, &r->timer, 0); 5650 } 5651 5652 5653 static void 5654 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5655 { 5656 nxt_http_request_t *r; 5657 5658 nxt_debug(task, "http request pool release"); 5659 5660 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5661 5662 nxt_mp_release(r->mem_pool); 5663 } 5664 5665 5666 static void 5667 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5668 { 5669 size_t mi; 5670 uint32_t i; 5671 nxt_bool_t ack; 5672 nxt_process_t *process; 5673 nxt_free_map_t *m; 5674 nxt_port_mmap_handler_t *mmap_handler; 5675 5676 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5677 5678 process = nxt_runtime_process_find(task->thread->runtime, 5679 msg->port_msg.pid); 5680 if (nxt_slow_path(process == NULL)) { 5681 return; 5682 } 5683 5684 ack = 0; 5685 5686 /* 5687 * To mitigate possible racing condition (when OOSM message received 5688 * after some of the memory was already freed), need to try to find 5689 * first free segment in shared memory and send ACK if found. 5690 */ 5691 5692 nxt_thread_mutex_lock(&process->incoming.mutex); 5693 5694 for (i = 0; i < process->incoming.size; i++) { 5695 mmap_handler = process->incoming.elts[i].mmap_handler; 5696 5697 if (nxt_slow_path(mmap_handler == NULL)) { 5698 continue; 5699 } 5700 5701 m = mmap_handler->hdr->free_map; 5702 5703 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5704 if (m[mi] != 0) { 5705 ack = 1; 5706 5707 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5708 i, mi, m[mi]); 5709 5710 break; 5711 } 5712 } 5713 } 5714 5715 nxt_thread_mutex_unlock(&process->incoming.mutex); 5716 5717 if (ack) { 5718 nxt_process_broadcast_shm_ack(task, process); 5719 } 5720 } 5721 5722 5723 static void 5724 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5725 { 5726 nxt_fd_t fd; 5727 nxt_port_t *port; 5728 nxt_runtime_t *rt; 5729 nxt_port_mmaps_t *mmaps; 5730 nxt_port_msg_get_mmap_t *get_mmap_msg; 5731 nxt_port_mmap_handler_t *mmap_handler; 5732 5733 rt = task->thread->runtime; 5734 5735 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5736 msg->port_msg.reply_port); 5737 if (nxt_slow_path(port == NULL)) { 5738 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5739 msg->port_msg.pid, msg->port_msg.reply_port); 5740 5741 return; 5742 } 5743 5744 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5745 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5746 { 5747 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5748 (int) nxt_buf_used_size(msg->buf)); 5749 5750 return; 5751 } 5752 5753 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5754 5755 nxt_assert(port->type == NXT_PROCESS_APP); 5756 5757 if (nxt_slow_path(port->app == NULL)) { 5758 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5759 port->pid, port->id); 5760 5761 // FIXME 5762 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5763 -1, msg->port_msg.stream, 0, NULL); 5764 5765 return; 5766 } 5767 5768 mmaps = &port->app->outgoing; 5769 nxt_thread_mutex_lock(&mmaps->mutex); 5770 5771 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5772 nxt_thread_mutex_unlock(&mmaps->mutex); 5773 5774 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5775 (int) get_mmap_msg->id); 5776 5777 // FIXME 5778 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5779 -1, msg->port_msg.stream, 0, NULL); 5780 return; 5781 } 5782 5783 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5784 5785 fd = mmap_handler->fd; 5786 5787 nxt_thread_mutex_unlock(&mmaps->mutex); 5788 5789 nxt_debug(task, "get mmap %PI:%d found", 5790 msg->port_msg.pid, (int) get_mmap_msg->id); 5791 5792 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5793 } 5794 5795 5796 static void 5797 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5798 { 5799 nxt_port_t *port, *reply_port; 5800 nxt_runtime_t *rt; 5801 nxt_port_msg_get_port_t *get_port_msg; 5802 5803 rt = task->thread->runtime; 5804 5805 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5806 msg->port_msg.reply_port); 5807 if (nxt_slow_path(reply_port == NULL)) { 5808 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5809 msg->port_msg.pid, msg->port_msg.reply_port); 5810 5811 return; 5812 } 5813 5814 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5815 < (int) sizeof(nxt_port_msg_get_port_t))) 5816 { 5817 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5818 (int) nxt_buf_used_size(msg->buf)); 5819 5820 return; 5821 } 5822 5823 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5824 5825 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5826 if (nxt_slow_path(port == NULL)) { 5827 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5828 get_port_msg->pid, get_port_msg->id); 5829 5830 return; 5831 } 5832 5833 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5834 get_port_msg->id); 5835 5836 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5837 } 5838