1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 uint32_t requests; 31 nxt_conf_value_t *limits_value; 32 nxt_conf_value_t *processes_value; 33 nxt_conf_value_t *targets_value; 34 } nxt_router_app_conf_t; 35 36 37 typedef struct { 38 nxt_str_t pass; 39 nxt_str_t application; 40 } nxt_router_listener_conf_t; 41 42 43 #if (NXT_TLS) 44 45 typedef struct { 46 nxt_str_t name; 47 nxt_socket_conf_t *socket_conf; 48 nxt_router_temp_conf_t *temp_conf; 49 nxt_tls_init_t *tls_init; 50 nxt_bool_t last; 51 52 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 53 } nxt_router_tlssock_t; 54 55 #endif 56 57 58 typedef struct { 59 nxt_str_t *name; 60 nxt_socket_conf_t *socket_conf; 61 nxt_router_temp_conf_t *temp_conf; 62 nxt_bool_t last; 63 } nxt_socket_rpc_t; 64 65 66 typedef struct { 67 nxt_app_t *app; 68 nxt_router_temp_conf_t *temp_conf; 69 } nxt_app_rpc_t; 70 71 72 typedef struct { 73 nxt_app_joint_t *app_joint; 74 uint32_t generation; 75 } nxt_app_joint_rpc_t; 76 77 78 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 79 nxt_mp_t *mp); 80 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 81 static void nxt_router_greet_controller(nxt_task_t *task, 82 nxt_port_t *controller_port); 83 84 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 85 86 static void nxt_router_new_port_handler(nxt_task_t 
*task, 87 nxt_port_recv_msg_t *msg); 88 static void nxt_router_conf_data_handler(nxt_task_t *task, 89 nxt_port_recv_msg_t *msg); 90 static void nxt_router_app_restart_handler(nxt_task_t *task, 91 nxt_port_recv_msg_t *msg); 92 static void nxt_router_remove_pid_handler(nxt_task_t *task, 93 nxt_port_recv_msg_t *msg); 94 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 95 nxt_port_recv_msg_t *msg); 96 97 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 98 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 99 static void nxt_router_conf_ready(nxt_task_t *task, 100 nxt_router_temp_conf_t *tmcf); 101 static void nxt_router_conf_error(nxt_task_t *task, 102 nxt_router_temp_conf_t *tmcf); 103 static void nxt_router_conf_send(nxt_task_t *task, 104 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 105 106 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 107 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 108 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 109 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 110 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, 111 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, 112 nxt_conf_value_t *conf); 113 114 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 115 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 116 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 117 nxt_app_t *app); 118 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 119 nxt_str_t *name); 120 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 121 int i); 122 123 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 124 nxt_port_t *port); 125 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 126 nxt_port_t *port); 127 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 128 nxt_port_t 
*port, nxt_fd_t fd); 129 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 130 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 131 static void nxt_router_listen_socket_ready(nxt_task_t *task, 132 nxt_port_recv_msg_t *msg, void *data); 133 static void nxt_router_listen_socket_error(nxt_task_t *task, 134 nxt_port_recv_msg_t *msg, void *data); 135 #if (NXT_TLS) 136 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 137 nxt_port_recv_msg_t *msg, void *data); 138 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 139 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 140 nxt_bool_t last); 141 #endif 142 static void nxt_router_app_rpc_create(nxt_task_t *task, 143 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 144 static void nxt_router_app_prefork_ready(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146 static void nxt_router_app_prefork_error(nxt_task_t *task, 147 nxt_port_recv_msg_t *msg, void *data); 148 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 149 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 150 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 151 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 152 153 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 154 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 155 const nxt_event_interface_t *interface); 156 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 157 nxt_router_engine_conf_t *recf); 158 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf); 160 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 161 nxt_router_engine_conf_t *recf); 162 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 163 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 164 nxt_work_handler_t handler); 165 static nxt_int_t 
nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 166 nxt_router_engine_conf_t *recf); 167 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 168 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 169 170 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 171 nxt_router_temp_conf_t *tmcf); 172 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 173 nxt_event_engine_t *engine); 174 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 175 nxt_router_temp_conf_t *tmcf); 176 177 static void nxt_router_engines_post(nxt_router_t *router, 178 nxt_router_temp_conf_t *tmcf); 179 static void nxt_router_engine_post(nxt_event_engine_t *engine, 180 nxt_work_t *jobs); 181 182 static void nxt_router_thread_start(void *data); 183 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 184 void *data); 185 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 186 void *data); 187 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 188 void *data); 189 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 190 void *data); 191 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 192 void *data); 193 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 194 void *data); 195 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 196 void *data); 197 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 198 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 199 static void nxt_router_listen_socket_release(nxt_task_t *task, 200 nxt_socket_conf_t *skcf); 201 202 static void nxt_router_access_log_writer(nxt_task_t *task, 203 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 204 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 205 struct tm *tm, size_t size, const char *format); 206 static void 
nxt_router_access_log_open(nxt_task_t *task, 207 nxt_router_temp_conf_t *tmcf); 208 static void nxt_router_access_log_ready(nxt_task_t *task, 209 nxt_port_recv_msg_t *msg, void *data); 210 static void nxt_router_access_log_error(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212 static void nxt_router_access_log_release(nxt_task_t *task, 213 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 214 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 215 void *data); 216 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 217 nxt_port_recv_msg_t *msg, void *data); 218 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 219 nxt_port_recv_msg_t *msg, void *data); 220 221 static void nxt_router_app_port_ready(nxt_task_t *task, 222 nxt_port_recv_msg_t *msg, void *data); 223 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 224 nxt_port_t *app_port); 225 static void nxt_router_app_port_error(nxt_task_t *task, 226 nxt_port_recv_msg_t *msg, void *data); 227 228 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 229 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 230 231 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 232 nxt_apr_action_t action); 233 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 234 nxt_request_rpc_data_t *req_rpc_data); 235 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 236 void *data); 237 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 238 void *data); 239 240 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, 241 void *data); 242 static void nxt_router_app_prepare_request(nxt_task_t *task, 243 nxt_request_rpc_data_t *req_rpc_data); 244 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 245 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 246 247 static void 
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 248 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 249 void *data); 250 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 251 void *data); 252 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 253 void *data); 254 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 255 256 static const nxt_http_request_state_t nxt_http_request_send_state; 257 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 258 259 static void nxt_router_app_joint_use(nxt_task_t *task, 260 nxt_app_joint_t *app_joint, int i); 261 262 static void nxt_router_http_request_release_post(nxt_task_t *task, 263 nxt_http_request_t *r); 264 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 265 void *data); 266 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 267 static void nxt_router_get_port_handler(nxt_task_t *task, 268 nxt_port_recv_msg_t *msg); 269 static void nxt_router_get_mmap_handler(nxt_task_t *task, 270 nxt_port_recv_msg_t *msg); 271 272 extern const nxt_http_request_state_t nxt_http_websocket; 273 274 static nxt_router_t *nxt_router; 275 276 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 277 static const nxt_str_t empty_prefix = nxt_string(""); 278 279 static const nxt_str_t *nxt_app_msg_prefix[] = { 280 &empty_prefix, 281 &empty_prefix, 282 &http_prefix, 283 &http_prefix, 284 &http_prefix, 285 &empty_prefix, 286 }; 287 288 289 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 290 .quit = nxt_signal_quit_handler, 291 .new_port = nxt_router_new_port_handler, 292 .get_port = nxt_router_get_port_handler, 293 .change_file = nxt_port_change_log_file_handler, 294 .mmap = nxt_port_mmap_handler, 295 .get_mmap = nxt_router_get_mmap_handler, 296 .data = nxt_router_conf_data_handler, 297 .app_restart = nxt_router_app_restart_handler, 298 
.remove_pid = nxt_router_remove_pid_handler, 299 .access_log = nxt_router_access_log_reopen_handler, 300 .rpc_ready = nxt_port_rpc_handler, 301 .rpc_error = nxt_port_rpc_handler, 302 .oosm = nxt_router_oosm_handler, 303 }; 304 305 306 const nxt_process_init_t nxt_router_process = { 307 .name = "router", 308 .type = NXT_PROCESS_ROUTER, 309 .prefork = nxt_router_prefork, 310 .restart = 1, 311 .setup = nxt_process_core_setup, 312 .start = nxt_router_start, 313 .port_handlers = &nxt_router_process_port_handlers, 314 .signals = nxt_process_signals, 315 }; 316 317 318 /* Queues of nxt_socket_conf_t */ 319 nxt_queue_t creating_sockets; 320 nxt_queue_t pending_sockets; 321 nxt_queue_t updating_sockets; 322 nxt_queue_t keeping_sockets; 323 nxt_queue_t deleting_sockets; 324 325 326 static nxt_int_t 327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 328 { 329 nxt_runtime_stop_app_processes(task, task->thread->runtime); 330 331 return NXT_OK; 332 } 333 334 335 static nxt_int_t 336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 337 { 338 nxt_int_t ret; 339 nxt_port_t *controller_port; 340 nxt_router_t *router; 341 nxt_runtime_t *rt; 342 343 rt = task->thread->runtime; 344 345 nxt_log(task, NXT_LOG_INFO, "router started"); 346 347 #if (NXT_TLS) 348 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 349 if (nxt_slow_path(rt->tls == NULL)) { 350 return NXT_ERROR; 351 } 352 353 ret = rt->tls->library_init(task); 354 if (nxt_slow_path(ret != NXT_OK)) { 355 return ret; 356 } 357 #endif 358 359 ret = nxt_http_init(task); 360 if (nxt_slow_path(ret != NXT_OK)) { 361 return ret; 362 } 363 364 router = nxt_zalloc(sizeof(nxt_router_t)); 365 if (nxt_slow_path(router == NULL)) { 366 return NXT_ERROR; 367 } 368 369 nxt_queue_init(&router->engines); 370 nxt_queue_init(&router->sockets); 371 nxt_queue_init(&router->apps); 372 373 nxt_router = router; 374 375 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 376 if (controller_port 
!= NULL) { 377 nxt_router_greet_controller(task, controller_port); 378 } 379 380 return NXT_OK; 381 } 382 383 384 static void 385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 386 { 387 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 388 -1, 0, 0, NULL); 389 } 390 391 392 static void 393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 394 void *data) 395 { 396 size_t size; 397 uint32_t stream; 398 nxt_mp_t *mp; 399 nxt_int_t ret; 400 nxt_app_t *app; 401 nxt_buf_t *b; 402 nxt_port_t *main_port; 403 nxt_runtime_t *rt; 404 nxt_app_joint_rpc_t *app_joint_rpc; 405 406 app = data; 407 408 rt = task->thread->runtime; 409 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 410 411 nxt_debug(task, "app '%V' %p start process", &app->name, app); 412 413 size = app->name.length + 1 + app->conf.length; 414 415 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 416 417 if (nxt_slow_path(b == NULL)) { 418 goto failed; 419 } 420 421 nxt_buf_cpystr(b, &app->name); 422 *b->mem.free++ = '\0'; 423 nxt_buf_cpystr(b, &app->conf); 424 425 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 426 nxt_router_app_port_ready, 427 nxt_router_app_port_error, 428 sizeof(nxt_app_joint_rpc_t)); 429 if (nxt_slow_path(app_joint_rpc == NULL)) { 430 goto failed; 431 } 432 433 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 434 435 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 436 -1, stream, port->id, b); 437 if (nxt_slow_path(ret != NXT_OK)) { 438 nxt_port_rpc_cancel(task, port, stream); 439 440 goto failed; 441 } 442 443 app_joint_rpc->app_joint = app->joint; 444 app_joint_rpc->generation = app->generation; 445 446 nxt_router_app_joint_use(task, app->joint, 1); 447 448 nxt_router_app_use(task, app, -1); 449 450 return; 451 452 failed: 453 454 if (b != NULL) { 455 mp = b->data; 456 nxt_mp_free(mp, b); 457 nxt_mp_release(mp); 458 } 459 460 nxt_thread_mutex_lock(&app->mutex); 
461 462 app->pending_processes--; 463 464 nxt_thread_mutex_unlock(&app->mutex); 465 466 nxt_router_app_use(task, app, -1); 467 } 468 469 470 static void 471 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 472 { 473 app_joint->use_count += i; 474 475 if (app_joint->use_count == 0) { 476 nxt_assert(app_joint->app == NULL); 477 478 nxt_free(app_joint); 479 } 480 } 481 482 483 static nxt_int_t 484 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 485 { 486 nxt_int_t res; 487 nxt_port_t *router_port; 488 nxt_runtime_t *rt; 489 490 nxt_debug(task, "app '%V' start process", &app->name); 491 492 rt = task->thread->runtime; 493 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 494 495 nxt_router_app_use(task, app, 1); 496 497 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 498 app); 499 500 if (res == NXT_OK) { 501 return res; 502 } 503 504 nxt_thread_mutex_lock(&app->mutex); 505 506 app->pending_processes--; 507 508 nxt_thread_mutex_unlock(&app->mutex); 509 510 nxt_router_app_use(task, app, -1); 511 512 return NXT_ERROR; 513 } 514 515 516 nxt_inline nxt_bool_t 517 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 518 { 519 nxt_buf_t *b, *next; 520 nxt_bool_t cancelled; 521 nxt_port_t *app_port; 522 nxt_msg_info_t *msg_info; 523 524 msg_info = &req_rpc_data->msg_info; 525 526 if (msg_info->buf == NULL) { 527 return 0; 528 } 529 530 app_port = req_rpc_data->app_port; 531 532 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 533 cancelled = nxt_app_queue_cancel(app_port->queue, 534 msg_info->tracking_cookie, 535 req_rpc_data->stream); 536 537 if (cancelled) { 538 nxt_debug(task, "stream #%uD: cancelled by router", 539 req_rpc_data->stream); 540 } 541 542 } else { 543 cancelled = 0; 544 } 545 546 for (b = msg_info->buf; b != NULL; b = next) { 547 next = b->next; 548 b->next = NULL; 549 550 if (b->is_port_mmap_sent) { 551 b->is_port_mmap_sent = cancelled == 0; 552 } 
553 554 b->completion_handler(task, b, b->parent); 555 } 556 557 msg_info->buf = NULL; 558 559 return cancelled; 560 } 561 562 563 nxt_inline nxt_bool_t 564 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 565 { 566 if (lnk->next != NULL) { 567 nxt_queue_remove(lnk); 568 569 lnk->next = NULL; 570 571 return 1; 572 } 573 574 return 0; 575 } 576 577 578 nxt_inline void 579 nxt_request_rpc_data_unlink(nxt_task_t *task, 580 nxt_request_rpc_data_t *req_rpc_data) 581 { 582 nxt_app_t *app; 583 nxt_bool_t unlinked; 584 nxt_http_request_t *r; 585 586 nxt_router_msg_cancel(task, req_rpc_data); 587 588 if (req_rpc_data->app_port != NULL) { 589 nxt_router_app_port_release(task, req_rpc_data->app_port, 590 req_rpc_data->apr_action); 591 592 req_rpc_data->app_port = NULL; 593 } 594 595 app = req_rpc_data->app; 596 r = req_rpc_data->request; 597 598 if (r != NULL) { 599 r->timer_data = NULL; 600 601 nxt_router_http_request_release_post(task, r); 602 603 r->req_rpc_data = NULL; 604 req_rpc_data->request = NULL; 605 606 if (app != NULL) { 607 unlinked = 0; 608 609 nxt_thread_mutex_lock(&app->mutex); 610 611 if (r->app_link.next != NULL) { 612 nxt_queue_remove(&r->app_link); 613 r->app_link.next = NULL; 614 615 unlinked = 1; 616 } 617 618 nxt_thread_mutex_unlock(&app->mutex); 619 620 if (unlinked) { 621 nxt_mp_release(r->mem_pool); 622 } 623 } 624 } 625 626 if (app != NULL) { 627 nxt_router_app_use(task, app, -1); 628 629 req_rpc_data->app = NULL; 630 } 631 632 if (req_rpc_data->msg_info.body_fd != -1) { 633 nxt_fd_close(req_rpc_data->msg_info.body_fd); 634 635 req_rpc_data->msg_info.body_fd = -1; 636 } 637 638 if (req_rpc_data->rpc_cancel) { 639 req_rpc_data->rpc_cancel = 0; 640 641 nxt_port_rpc_cancel(task, task->thread->engine->port, 642 req_rpc_data->stream); 643 } 644 } 645 646 647 static void 648 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 649 { 650 nxt_int_t res; 651 nxt_app_t *app; 652 nxt_port_t *port, *main_app_port; 653 nxt_runtime_t *rt; 654 655 
nxt_port_new_port_handler(task, msg); 656 657 port = msg->u.new_port; 658 659 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 660 nxt_router_greet_controller(task, msg->u.new_port); 661 } 662 663 if (port == NULL || port->type != NXT_PROCESS_APP) { 664 665 if (msg->port_msg.stream == 0) { 666 return; 667 } 668 669 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 670 671 } else { 672 if (msg->fd[1] != -1) { 673 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 674 if (nxt_slow_path(res != NXT_OK)) { 675 return; 676 } 677 678 nxt_fd_close(msg->fd[1]); 679 msg->fd[1] = -1; 680 } 681 } 682 683 if (msg->port_msg.stream != 0) { 684 nxt_port_rpc_handler(task, msg); 685 return; 686 } 687 688 /* 689 * Port with "id == 0" is application 'main' port and it always 690 * should come with non-zero stream. 691 */ 692 nxt_assert(port->id != 0); 693 694 /* Find 'main' app port and get app reference. */ 695 rt = task->thread->runtime; 696 697 /* 698 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 699 * sent to main port (with id == 0) and processed in main thread. 
700 */ 701 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 702 nxt_assert(main_app_port != NULL); 703 704 app = main_app_port->app; 705 706 if (nxt_fast_path(app != NULL)) { 707 nxt_thread_mutex_lock(&app->mutex); 708 709 /* TODO here should be find-and-add code because there can be 710 port waiters in port_hash */ 711 nxt_port_hash_add(&app->port_hash, port); 712 app->port_hash_count++; 713 714 nxt_thread_mutex_unlock(&app->mutex); 715 716 port->app = app; 717 } 718 719 port->main_app_port = main_app_port; 720 721 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 722 } 723 724 725 static void 726 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 727 { 728 void *p; 729 size_t size; 730 nxt_int_t ret; 731 nxt_port_t *port; 732 nxt_router_temp_conf_t *tmcf; 733 734 port = nxt_runtime_port_find(task->thread->runtime, 735 msg->port_msg.pid, 736 msg->port_msg.reply_port); 737 if (nxt_slow_path(port == NULL)) { 738 nxt_alert(task, "conf_data_handler: reply port not found"); 739 return; 740 } 741 742 p = MAP_FAILED; 743 744 /* 745 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 746 * initialized in 'cleanup' section. 
747 */ 748 size = 0; 749 750 tmcf = nxt_router_temp_conf(task); 751 if (nxt_slow_path(tmcf == NULL)) { 752 goto fail; 753 } 754 755 if (nxt_slow_path(msg->fd[0] == -1)) { 756 nxt_alert(task, "conf_data_handler: invalid shm fd"); 757 goto fail; 758 } 759 760 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 761 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 762 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 763 goto fail; 764 } 765 766 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 767 768 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 769 770 nxt_fd_close(msg->fd[0]); 771 msg->fd[0] = -1; 772 773 if (nxt_slow_path(p == MAP_FAILED)) { 774 goto fail; 775 } 776 777 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 778 779 tmcf->router_conf->router = nxt_router; 780 tmcf->stream = msg->port_msg.stream; 781 tmcf->port = port; 782 783 nxt_port_use(task, tmcf->port, 1); 784 785 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 786 787 if (nxt_fast_path(ret == NXT_OK)) { 788 nxt_router_conf_apply(task, tmcf, NULL); 789 790 } else { 791 nxt_router_conf_error(task, tmcf); 792 } 793 794 goto cleanup; 795 796 fail: 797 798 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 799 msg->port_msg.stream, 0, NULL); 800 801 if (tmcf != NULL) { 802 nxt_mp_release(tmcf->mem_pool); 803 } 804 805 cleanup: 806 807 if (p != MAP_FAILED) { 808 nxt_mem_munmap(p, size); 809 } 810 811 if (msg->fd[0] != -1) { 812 nxt_fd_close(msg->fd[0]); 813 msg->fd[0] = -1; 814 } 815 } 816 817 818 static void 819 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 820 { 821 nxt_app_t *app; 822 nxt_int_t ret; 823 nxt_str_t app_name; 824 nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; 825 nxt_port_msg_type_t reply; 826 827 reply_port = nxt_runtime_port_find(task->thread->runtime, 828 msg->port_msg.pid, 829 msg->port_msg.reply_port); 830 if (nxt_slow_path(reply_port == NULL)) 
{ 831 nxt_alert(task, "app_restart_handler: reply port not found"); 832 return; 833 } 834 835 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 836 app_name.start = msg->buf->mem.pos; 837 838 nxt_debug(task, "app_restart_handler: %V", &app_name); 839 840 app = nxt_router_app_find(&nxt_router->apps, &app_name); 841 842 if (nxt_fast_path(app != NULL)) { 843 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 844 NXT_PROCESS_APP); 845 if (nxt_slow_path(shared_port == NULL)) { 846 goto fail; 847 } 848 849 ret = nxt_port_socket_init(task, shared_port, 0); 850 if (nxt_slow_path(ret != NXT_OK)) { 851 nxt_port_use(task, shared_port, -1); 852 goto fail; 853 } 854 855 ret = nxt_router_app_queue_init(task, shared_port); 856 if (nxt_slow_path(ret != NXT_OK)) { 857 nxt_port_write_close(shared_port); 858 nxt_port_read_close(shared_port); 859 nxt_port_use(task, shared_port, -1); 860 goto fail; 861 } 862 863 nxt_port_write_enable(task, shared_port); 864 865 nxt_thread_mutex_lock(&app->mutex); 866 867 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 868 869 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 870 0, 0, NULL); 871 872 } nxt_queue_loop; 873 874 app->generation++; 875 876 shared_port->app = app; 877 878 old_shared_port = app->shared_port; 879 old_shared_port->app = NULL; 880 881 app->shared_port = shared_port; 882 883 nxt_thread_mutex_unlock(&app->mutex); 884 885 nxt_port_close(task, old_shared_port); 886 nxt_port_use(task, old_shared_port, -1); 887 888 reply = NXT_PORT_MSG_RPC_READY_LAST; 889 890 } else { 891 892 fail: 893 894 reply = NXT_PORT_MSG_RPC_ERROR; 895 } 896 897 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 898 0, NULL); 899 } 900 901 902 static void 903 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 904 void *data) 905 { 906 union { 907 nxt_pid_t removed_pid; 908 void *data; 909 } u; 910 911 u.data = data; 912 913 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 914 
} 915 916 917 static void 918 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 919 { 920 nxt_event_engine_t *engine; 921 922 nxt_port_remove_pid_handler(task, msg); 923 924 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 925 { 926 if (nxt_fast_path(engine->port != NULL)) { 927 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 928 msg->u.data); 929 } 930 } 931 nxt_queue_loop; 932 933 if (msg->port_msg.stream == 0) { 934 return; 935 } 936 937 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 938 939 nxt_port_rpc_handler(task, msg); 940 } 941 942 943 static nxt_router_temp_conf_t * 944 nxt_router_temp_conf(nxt_task_t *task) 945 { 946 nxt_mp_t *mp, *tmp; 947 nxt_router_conf_t *rtcf; 948 nxt_router_temp_conf_t *tmcf; 949 950 mp = nxt_mp_create(1024, 128, 256, 32); 951 if (nxt_slow_path(mp == NULL)) { 952 return NULL; 953 } 954 955 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 956 if (nxt_slow_path(rtcf == NULL)) { 957 goto fail; 958 } 959 960 rtcf->mem_pool = mp; 961 962 tmp = nxt_mp_create(1024, 128, 256, 32); 963 if (nxt_slow_path(tmp == NULL)) { 964 goto fail; 965 } 966 967 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 968 if (nxt_slow_path(tmcf == NULL)) { 969 goto temp_fail; 970 } 971 972 tmcf->mem_pool = tmp; 973 tmcf->router_conf = rtcf; 974 tmcf->count = 1; 975 tmcf->engine = task->thread->engine; 976 977 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 978 sizeof(nxt_router_engine_conf_t)); 979 if (nxt_slow_path(tmcf->engines == NULL)) { 980 goto temp_fail; 981 } 982 983 nxt_queue_init(&creating_sockets); 984 nxt_queue_init(&pending_sockets); 985 nxt_queue_init(&updating_sockets); 986 nxt_queue_init(&keeping_sockets); 987 nxt_queue_init(&deleting_sockets); 988 989 #if (NXT_TLS) 990 nxt_queue_init(&tmcf->tls); 991 #endif 992 993 nxt_queue_init(&tmcf->apps); 994 nxt_queue_init(&tmcf->previous); 995 996 return tmcf; 997 998 temp_fail: 999 1000 nxt_mp_destroy(tmp); 1001 1002 fail: 
1003 1004 nxt_mp_destroy(mp); 1005 1006 return NULL; 1007 } 1008 1009 1010 nxt_inline nxt_bool_t 1011 nxt_router_app_can_start(nxt_app_t *app) 1012 { 1013 return app->processes + app->pending_processes < app->max_processes 1014 && app->pending_processes < app->max_pending_processes; 1015 } 1016 1017 1018 nxt_inline nxt_bool_t 1019 nxt_router_app_need_start(nxt_app_t *app) 1020 { 1021 return (app->active_requests 1022 > app->port_hash_count + app->pending_processes) 1023 || (app->spare_processes 1024 > app->idle_processes + app->pending_processes); 1025 } 1026 1027 1028 static void 1029 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1030 { 1031 nxt_int_t ret; 1032 nxt_app_t *app; 1033 nxt_router_t *router; 1034 nxt_runtime_t *rt; 1035 nxt_queue_link_t *qlk; 1036 nxt_socket_conf_t *skcf; 1037 nxt_router_conf_t *rtcf; 1038 nxt_router_temp_conf_t *tmcf; 1039 const nxt_event_interface_t *interface; 1040 #if (NXT_TLS) 1041 nxt_router_tlssock_t *tls; 1042 #endif 1043 1044 tmcf = obj; 1045 1046 qlk = nxt_queue_first(&pending_sockets); 1047 1048 if (qlk != nxt_queue_tail(&pending_sockets)) { 1049 nxt_queue_remove(qlk); 1050 nxt_queue_insert_tail(&creating_sockets, qlk); 1051 1052 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1053 1054 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1055 1056 return; 1057 } 1058 1059 #if (NXT_TLS) 1060 qlk = nxt_queue_last(&tmcf->tls); 1061 1062 if (qlk != nxt_queue_head(&tmcf->tls)) { 1063 nxt_queue_remove(qlk); 1064 1065 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1066 1067 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1068 nxt_router_tls_rpc_handler, tls); 1069 return; 1070 } 1071 #endif 1072 1073 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1074 1075 if (nxt_router_app_need_start(app)) { 1076 nxt_router_app_rpc_create(task, tmcf, app); 1077 return; 1078 } 1079 1080 } nxt_queue_loop; 1081 1082 rtcf = tmcf->router_conf; 1083 1084 if (rtcf->access_log != NULL && 
rtcf->access_log->fd == -1) { 1085 nxt_router_access_log_open(task, tmcf); 1086 return; 1087 } 1088 1089 rt = task->thread->runtime; 1090 1091 interface = nxt_service_get(rt->services, "engine", NULL); 1092 1093 router = rtcf->router; 1094 1095 ret = nxt_router_engines_create(task, router, tmcf, interface); 1096 if (nxt_slow_path(ret != NXT_OK)) { 1097 goto fail; 1098 } 1099 1100 ret = nxt_router_threads_create(task, rt, tmcf); 1101 if (nxt_slow_path(ret != NXT_OK)) { 1102 goto fail; 1103 } 1104 1105 nxt_router_apps_sort(task, router, tmcf); 1106 1107 nxt_router_apps_hash_use(task, rtcf, 1); 1108 1109 nxt_router_engines_post(router, tmcf); 1110 1111 nxt_queue_add(&router->sockets, &updating_sockets); 1112 nxt_queue_add(&router->sockets, &creating_sockets); 1113 1114 router->access_log = rtcf->access_log; 1115 1116 nxt_router_conf_ready(task, tmcf); 1117 1118 return; 1119 1120 fail: 1121 1122 nxt_router_conf_error(task, tmcf); 1123 1124 return; 1125 } 1126 1127 1128 static void 1129 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1130 { 1131 nxt_joint_job_t *job; 1132 1133 job = obj; 1134 1135 nxt_router_conf_ready(task, job->tmcf); 1136 } 1137 1138 1139 static void 1140 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1141 { 1142 uint32_t count; 1143 nxt_router_conf_t *rtcf; 1144 nxt_thread_spinlock_t *lock; 1145 1146 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1147 1148 if (--tmcf->count > 0) { 1149 return; 1150 } 1151 1152 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1153 1154 rtcf = tmcf->router_conf; 1155 1156 lock = &rtcf->router->lock; 1157 1158 nxt_thread_spin_lock(lock); 1159 1160 count = rtcf->count; 1161 1162 nxt_thread_spin_unlock(lock); 1163 1164 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1165 1166 if (count == 0) { 1167 nxt_router_apps_hash_use(task, rtcf, -1); 1168 1169 nxt_router_access_log_release(task, lock, rtcf->access_log); 1170 1171 nxt_mp_destroy(rtcf->mem_pool); 1172 } 
/*
 * Abandons a configuration that could not be applied and reports the
 * failure to the requester.
 *
 * The function closes and frees the listen sockets found in
 * creating_sockets, unlinks the applications created for the new
 * configuration, splices the kept/deleted sockets and the previous
 * applications back into the router, destroys the router configuration
 * pool, sends NXT_PORT_MSG_RPC_ERROR and releases the temporary pool.
 */
static void
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
    nxt_app_t          *app;
    nxt_queue_t        new_socket_confs;
    nxt_socket_t       s;
    nxt_router_t       *router;
    nxt_queue_link_t   *qlk;
    nxt_socket_conf_t  *skcf;
    nxt_router_conf_t  *rtcf;

    nxt_alert(task, "failed to apply new conf");

    /*
     * Only sockets in creating_sockets are closed and have their listen
     * structure freed here.
     * NOTE(review): listen structures of entries still in pending_sockets
     * are not freed by this loop -- confirm whether that is intended.
     */
    for (qlk = nxt_queue_first(&creating_sockets);
         qlk != nxt_queue_tail(&creating_sockets);
         qlk = nxt_queue_next(qlk))
    {
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
        s = skcf->listen->socket;

        if (s != -1) {
            nxt_socket_close(task, s);
        }

        nxt_free(skcf->listen);
    }

    /*
     * The three queues holding socket confs of the new configuration are
     * gathered into a local queue that is not used further in this
     * function.
     */
    nxt_queue_init(&new_socket_confs);
    nxt_queue_add(&new_socket_confs, &updating_sockets);
    nxt_queue_add(&new_socket_confs, &pending_sockets);
    nxt_queue_add(&new_socket_confs, &creating_sockets);

    rtcf = tmcf->router_conf;

    /* Applications created for the failed configuration. */
    nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {

        nxt_router_app_unlink(task, app);

    } nxt_queue_loop;

    router = rtcf->router;

    /* Give the sockets and applications of the old configuration back. */
    nxt_queue_add(&router->sockets, &keeping_sockets);
    nxt_queue_add(&router->sockets, &deleting_sockets);

    nxt_queue_add(&router->apps, &tmcf->previous);

    // TODO: new engines and threads

    nxt_router_access_log_release(task, &router->lock, rtcf->access_log);

    nxt_mp_destroy(rtcf->mem_pool);

    nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);

    nxt_mp_release(tmcf->mem_pool);
}


/*
 * Sends a reply of the given type on tmcf->stream through tmcf->port, then
 * drops the port reference and clears tmcf->port.  The result of the write
 * is not checked.
 */
static void
nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
    nxt_port_msg_type_t type)
{
    nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);

    nxt_port_use(task, tmcf->port, -1);

    tmcf->port = NULL;
}


/*
 * Configuration maps: each entry gives a member name, a value type and the
 * offset of the destination field.  They are passed to
 * nxt_conf_map_object() by nxt_router_conf_create().
 */

/* Root object -> nxt_router_conf_t. */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};


/*
 * Application object -> nxt_router_app_conf_t.  "processes" is listed
 * twice on purpose: once as an integer and once as a raw value pointer;
 * nxt_router_conf_create() checks whether the pointer refers to an object.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};


/* Application "limits" object -> nxt_router_app_conf_t. */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },

    {
        nxt_string("requests"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, requests),
    },
};


/* Application "processes" object -> nxt_router_app_conf_t. */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};


/* Listener object -> nxt_router_listener_conf_t. */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};


/* "/settings/http" object -> nxt_socket_conf_t. */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },
};
1345 NXT_CONF_MAP_SIZE, 1346 offsetof(nxt_socket_conf_t, header_buffer_size), 1347 }, 1348 1349 { 1350 nxt_string("large_header_buffer_size"), 1351 NXT_CONF_MAP_SIZE, 1352 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1353 }, 1354 1355 { 1356 nxt_string("large_header_buffers"), 1357 NXT_CONF_MAP_SIZE, 1358 offsetof(nxt_socket_conf_t, large_header_buffers), 1359 }, 1360 1361 { 1362 nxt_string("body_buffer_size"), 1363 NXT_CONF_MAP_SIZE, 1364 offsetof(nxt_socket_conf_t, body_buffer_size), 1365 }, 1366 1367 { 1368 nxt_string("max_body_size"), 1369 NXT_CONF_MAP_SIZE, 1370 offsetof(nxt_socket_conf_t, max_body_size), 1371 }, 1372 1373 { 1374 nxt_string("idle_timeout"), 1375 NXT_CONF_MAP_MSEC, 1376 offsetof(nxt_socket_conf_t, idle_timeout), 1377 }, 1378 1379 { 1380 nxt_string("header_read_timeout"), 1381 NXT_CONF_MAP_MSEC, 1382 offsetof(nxt_socket_conf_t, header_read_timeout), 1383 }, 1384 1385 { 1386 nxt_string("body_read_timeout"), 1387 NXT_CONF_MAP_MSEC, 1388 offsetof(nxt_socket_conf_t, body_read_timeout), 1389 }, 1390 1391 { 1392 nxt_string("send_timeout"), 1393 NXT_CONF_MAP_MSEC, 1394 offsetof(nxt_socket_conf_t, send_timeout), 1395 }, 1396 1397 { 1398 nxt_string("body_temp_path"), 1399 NXT_CONF_MAP_STR, 1400 offsetof(nxt_socket_conf_t, body_temp_path), 1401 }, 1402 1403 { 1404 nxt_string("discard_unsafe_fields"), 1405 NXT_CONF_MAP_INT8, 1406 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1407 }, 1408 }; 1409 1410 1411 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1412 { 1413 nxt_string("max_frame_size"), 1414 NXT_CONF_MAP_SIZE, 1415 offsetof(nxt_websocket_conf_t, max_frame_size), 1416 }, 1417 1418 { 1419 nxt_string("read_timeout"), 1420 NXT_CONF_MAP_MSEC, 1421 offsetof(nxt_websocket_conf_t, read_timeout), 1422 }, 1423 1424 { 1425 nxt_string("keepalive_interval"), 1426 NXT_CONF_MAP_MSEC, 1427 offsetof(nxt_websocket_conf_t, keepalive_interval), 1428 }, 1429 1430 }; 1431 1432 1433 static nxt_int_t 1434 nxt_router_conf_create(nxt_task_t *task, 
nxt_router_temp_conf_t *tmcf, 1435 u_char *start, u_char *end) 1436 { 1437 u_char *p; 1438 size_t size; 1439 nxt_mp_t *mp, *app_mp; 1440 uint32_t next, next_target; 1441 nxt_int_t ret; 1442 nxt_str_t name, path, target; 1443 nxt_app_t *app, *prev; 1444 nxt_str_t *t, *s, *targets; 1445 nxt_uint_t n, i; 1446 nxt_port_t *port; 1447 nxt_router_t *router; 1448 nxt_app_joint_t *app_joint; 1449 #if (NXT_TLS) 1450 nxt_tls_init_t *tls_init; 1451 nxt_conf_value_t *certificate; 1452 #endif 1453 nxt_conf_value_t *conf, *http, *value, *websocket; 1454 nxt_conf_value_t *applications, *application; 1455 nxt_conf_value_t *listeners, *listener; 1456 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf; 1457 nxt_socket_conf_t *skcf; 1458 nxt_http_routes_t *routes; 1459 nxt_event_engine_t *engine; 1460 nxt_app_lang_module_t *lang; 1461 nxt_router_app_conf_t apcf; 1462 nxt_router_access_log_t *access_log; 1463 nxt_router_listener_conf_t lscf; 1464 1465 static nxt_str_t http_path = nxt_string("/settings/http"); 1466 static nxt_str_t applications_path = nxt_string("/applications"); 1467 static nxt_str_t listeners_path = nxt_string("/listeners"); 1468 static nxt_str_t routes_path = nxt_string("/routes"); 1469 static nxt_str_t access_log_path = nxt_string("/access_log"); 1470 #if (NXT_TLS) 1471 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1472 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1473 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1474 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1475 #endif 1476 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1477 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1478 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1479 1480 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1481 if (conf == NULL) { 1482 nxt_alert(task, "configuration parsing error"); 1483 return 
NXT_ERROR; 1484 } 1485 1486 mp = tmcf->router_conf->mem_pool; 1487 1488 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1489 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1490 if (ret != NXT_OK) { 1491 nxt_alert(task, "root map error"); 1492 return NXT_ERROR; 1493 } 1494 1495 if (tmcf->router_conf->threads == 0) { 1496 tmcf->router_conf->threads = nxt_ncpu; 1497 } 1498 1499 static_conf = nxt_conf_get_path(conf, &static_path); 1500 1501 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1502 if (nxt_slow_path(ret != NXT_OK)) { 1503 return NXT_ERROR; 1504 } 1505 1506 router = tmcf->router_conf->router; 1507 1508 applications = nxt_conf_get_path(conf, &applications_path); 1509 1510 if (applications != NULL) { 1511 next = 0; 1512 1513 for ( ;; ) { 1514 application = nxt_conf_next_object_member(applications, 1515 &name, &next); 1516 if (application == NULL) { 1517 break; 1518 } 1519 1520 nxt_debug(task, "application \"%V\"", &name); 1521 1522 size = nxt_conf_json_length(application, NULL); 1523 1524 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1525 if (nxt_slow_path(app_mp == NULL)) { 1526 goto fail; 1527 } 1528 1529 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1530 if (app == NULL) { 1531 goto app_fail; 1532 } 1533 1534 nxt_memzero(app, sizeof(nxt_app_t)); 1535 1536 app->mem_pool = app_mp; 1537 1538 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1539 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1540 + name.length); 1541 1542 p = nxt_conf_json_print(app->conf.start, application, NULL); 1543 app->conf.length = p - app->conf.start; 1544 1545 nxt_assert(app->conf.length <= size); 1546 1547 nxt_debug(task, "application conf \"%V\"", &app->conf); 1548 1549 prev = nxt_router_app_find(&router->apps, &name); 1550 1551 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1552 nxt_mp_destroy(app_mp); 1553 1554 nxt_queue_remove(&prev->link); 1555 nxt_queue_insert_tail(&tmcf->previous, 
&prev->link); 1556 1557 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1558 if (nxt_slow_path(ret != NXT_OK)) { 1559 goto fail; 1560 } 1561 1562 continue; 1563 } 1564 1565 apcf.processes = 1; 1566 apcf.max_processes = 1; 1567 apcf.spare_processes = 0; 1568 apcf.timeout = 0; 1569 apcf.idle_timeout = 15000; 1570 apcf.requests = 0; 1571 apcf.limits_value = NULL; 1572 apcf.processes_value = NULL; 1573 apcf.targets_value = NULL; 1574 1575 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1576 if (nxt_slow_path(app_joint == NULL)) { 1577 goto app_fail; 1578 } 1579 1580 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1581 1582 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1583 nxt_nitems(nxt_router_app_conf), &apcf); 1584 if (ret != NXT_OK) { 1585 nxt_alert(task, "application map error"); 1586 goto app_fail; 1587 } 1588 1589 if (apcf.limits_value != NULL) { 1590 1591 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1592 nxt_alert(task, "application limits is not object"); 1593 goto app_fail; 1594 } 1595 1596 ret = nxt_conf_map_object(mp, apcf.limits_value, 1597 nxt_router_app_limits_conf, 1598 nxt_nitems(nxt_router_app_limits_conf), 1599 &apcf); 1600 if (ret != NXT_OK) { 1601 nxt_alert(task, "application limits map error"); 1602 goto app_fail; 1603 } 1604 } 1605 1606 if (apcf.processes_value != NULL 1607 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1608 { 1609 ret = nxt_conf_map_object(mp, apcf.processes_value, 1610 nxt_router_app_processes_conf, 1611 nxt_nitems(nxt_router_app_processes_conf), 1612 &apcf); 1613 if (ret != NXT_OK) { 1614 nxt_alert(task, "application processes map error"); 1615 goto app_fail; 1616 } 1617 1618 } else { 1619 apcf.max_processes = apcf.processes; 1620 apcf.spare_processes = apcf.processes; 1621 } 1622 1623 if (apcf.targets_value != NULL) { 1624 n = nxt_conf_object_members_count(apcf.targets_value); 1625 1626 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1627 if (nxt_slow_path(targets == 
NULL)) { 1628 goto app_fail; 1629 } 1630 1631 next_target = 0; 1632 1633 for (i = 0; i < n; i++) { 1634 (void) nxt_conf_next_object_member(apcf.targets_value, 1635 &target, &next_target); 1636 1637 s = nxt_str_dup(app_mp, &targets[i], &target); 1638 if (nxt_slow_path(s == NULL)) { 1639 goto app_fail; 1640 } 1641 } 1642 1643 } else { 1644 targets = NULL; 1645 } 1646 1647 nxt_debug(task, "application type: %V", &apcf.type); 1648 nxt_debug(task, "application processes: %D", apcf.processes); 1649 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1650 nxt_debug(task, "application requests: %D", apcf.requests); 1651 1652 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1653 1654 if (lang == NULL) { 1655 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1656 goto app_fail; 1657 } 1658 1659 nxt_debug(task, "application language module: \"%s\"", lang->file); 1660 1661 ret = nxt_thread_mutex_create(&app->mutex); 1662 if (ret != NXT_OK) { 1663 goto app_fail; 1664 } 1665 1666 nxt_queue_init(&app->ports); 1667 nxt_queue_init(&app->spare_ports); 1668 nxt_queue_init(&app->idle_ports); 1669 nxt_queue_init(&app->ack_waiting_req); 1670 1671 app->name.length = name.length; 1672 nxt_memcpy(app->name.start, name.start, name.length); 1673 1674 app->type = lang->type; 1675 app->max_processes = apcf.max_processes; 1676 app->spare_processes = apcf.spare_processes; 1677 app->max_pending_processes = apcf.spare_processes 1678 ? 
apcf.spare_processes : 1; 1679 app->timeout = apcf.timeout; 1680 app->idle_timeout = apcf.idle_timeout; 1681 app->max_requests = apcf.requests; 1682 1683 app->targets = targets; 1684 1685 engine = task->thread->engine; 1686 1687 app->engine = engine; 1688 1689 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1690 app->adjust_idle_work.task = &engine->task; 1691 app->adjust_idle_work.obj = app; 1692 1693 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1694 1695 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1696 if (nxt_slow_path(ret != NXT_OK)) { 1697 goto app_fail; 1698 } 1699 1700 nxt_router_app_use(task, app, 1); 1701 1702 app->joint = app_joint; 1703 1704 app_joint->use_count = 1; 1705 app_joint->app = app; 1706 1707 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1708 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1709 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1710 app_joint->idle_timer.task = &engine->task; 1711 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1712 1713 app_joint->free_app_work.handler = nxt_router_free_app; 1714 app_joint->free_app_work.task = &engine->task; 1715 app_joint->free_app_work.obj = app_joint; 1716 1717 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1718 NXT_PROCESS_APP); 1719 if (nxt_slow_path(port == NULL)) { 1720 return NXT_ERROR; 1721 } 1722 1723 ret = nxt_port_socket_init(task, port, 0); 1724 if (nxt_slow_path(ret != NXT_OK)) { 1725 nxt_port_use(task, port, -1); 1726 return NXT_ERROR; 1727 } 1728 1729 ret = nxt_router_app_queue_init(task, port); 1730 if (nxt_slow_path(ret != NXT_OK)) { 1731 nxt_port_write_close(port); 1732 nxt_port_read_close(port); 1733 nxt_port_use(task, port, -1); 1734 return NXT_ERROR; 1735 } 1736 1737 nxt_port_write_enable(task, port); 1738 port->app = app; 1739 1740 app->shared_port = port; 1741 1742 nxt_thread_mutex_create(&app->outgoing.mutex); 1743 } 1744 } 1745 1746 routes_conf = nxt_conf_get_path(conf, &routes_path); 
1747 if (nxt_fast_path(routes_conf != NULL)) { 1748 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1749 if (nxt_slow_path(routes == NULL)) { 1750 return NXT_ERROR; 1751 } 1752 tmcf->router_conf->routes = routes; 1753 } 1754 1755 ret = nxt_upstreams_create(task, tmcf, conf); 1756 if (nxt_slow_path(ret != NXT_OK)) { 1757 return ret; 1758 } 1759 1760 http = nxt_conf_get_path(conf, &http_path); 1761 #if 0 1762 if (http == NULL) { 1763 nxt_alert(task, "no \"http\" block"); 1764 return NXT_ERROR; 1765 } 1766 #endif 1767 1768 websocket = nxt_conf_get_path(conf, &websocket_path); 1769 1770 listeners = nxt_conf_get_path(conf, &listeners_path); 1771 1772 if (listeners != NULL) { 1773 next = 0; 1774 1775 for ( ;; ) { 1776 listener = nxt_conf_next_object_member(listeners, &name, &next); 1777 if (listener == NULL) { 1778 break; 1779 } 1780 1781 skcf = nxt_router_socket_conf(task, tmcf, &name); 1782 if (skcf == NULL) { 1783 goto fail; 1784 } 1785 1786 nxt_memzero(&lscf, sizeof(lscf)); 1787 1788 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1789 nxt_nitems(nxt_router_listener_conf), 1790 &lscf); 1791 if (ret != NXT_OK) { 1792 nxt_alert(task, "listener map error"); 1793 goto fail; 1794 } 1795 1796 nxt_debug(task, "application: %V", &lscf.application); 1797 1798 // STUB, default values if http block is not defined. 
1799 skcf->header_buffer_size = 2048; 1800 skcf->large_header_buffer_size = 8192; 1801 skcf->large_header_buffers = 4; 1802 skcf->discard_unsafe_fields = 1; 1803 skcf->body_buffer_size = 16 * 1024; 1804 skcf->max_body_size = 8 * 1024 * 1024; 1805 skcf->proxy_header_buffer_size = 64 * 1024; 1806 skcf->proxy_buffer_size = 4096; 1807 skcf->proxy_buffers = 256; 1808 skcf->idle_timeout = 180 * 1000; 1809 skcf->header_read_timeout = 30 * 1000; 1810 skcf->body_read_timeout = 30 * 1000; 1811 skcf->send_timeout = 30 * 1000; 1812 skcf->proxy_timeout = 60 * 1000; 1813 skcf->proxy_send_timeout = 30 * 1000; 1814 skcf->proxy_read_timeout = 30 * 1000; 1815 1816 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1817 skcf->websocket_conf.read_timeout = 60 * 1000; 1818 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1819 1820 nxt_str_null(&skcf->body_temp_path); 1821 1822 if (http != NULL) { 1823 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1824 nxt_nitems(nxt_router_http_conf), 1825 skcf); 1826 if (ret != NXT_OK) { 1827 nxt_alert(task, "http map error"); 1828 goto fail; 1829 } 1830 } 1831 1832 if (websocket != NULL) { 1833 ret = nxt_conf_map_object(mp, websocket, 1834 nxt_router_websocket_conf, 1835 nxt_nitems(nxt_router_websocket_conf), 1836 &skcf->websocket_conf); 1837 if (ret != NXT_OK) { 1838 nxt_alert(task, "websocket map error"); 1839 goto fail; 1840 } 1841 } 1842 1843 t = &skcf->body_temp_path; 1844 1845 if (t->length == 0) { 1846 t->start = (u_char *) task->thread->runtime->tmp; 1847 t->length = nxt_strlen(t->start); 1848 } 1849 1850 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path); 1851 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, 1852 client_ip_conf); 1853 if (nxt_slow_path(ret != NXT_OK)) { 1854 return NXT_ERROR; 1855 } 1856 1857 #if (NXT_TLS) 1858 certificate = nxt_conf_get_path(listener, &certificate_path); 1859 1860 if (certificate != NULL) { 1861 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1862 if 
(nxt_slow_path(tls_init == NULL)) { 1863 return NXT_ERROR; 1864 } 1865 1866 tls_init->cache_size = 0; 1867 tls_init->timeout = 300; 1868 1869 value = nxt_conf_get_path(listener, &conf_cache_path); 1870 if (value != NULL) { 1871 tls_init->cache_size = nxt_conf_get_number(value); 1872 } 1873 1874 value = nxt_conf_get_path(listener, &conf_timeout_path); 1875 if (value != NULL) { 1876 tls_init->timeout = nxt_conf_get_number(value); 1877 } 1878 1879 tls_init->conf_cmds = nxt_conf_get_path(listener, 1880 &conf_commands_path); 1881 1882 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1883 n = nxt_conf_array_elements_count(certificate); 1884 1885 for (i = 0; i < n; i++) { 1886 value = nxt_conf_get_array_element(certificate, i); 1887 1888 nxt_assert(value != NULL); 1889 1890 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1891 tls_init, i == 0); 1892 if (nxt_slow_path(ret != NXT_OK)) { 1893 goto fail; 1894 } 1895 } 1896 1897 } else { 1898 /* NXT_CONF_STRING */ 1899 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, 1900 tls_init, 1); 1901 if (nxt_slow_path(ret != NXT_OK)) { 1902 goto fail; 1903 } 1904 } 1905 } 1906 #endif 1907 1908 skcf->listen->handler = nxt_http_conn_init; 1909 skcf->router_conf = tmcf->router_conf; 1910 skcf->router_conf->count++; 1911 1912 if (lscf.pass.length != 0) { 1913 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1914 1915 /* COMPATIBILITY: listener application. 
*/ 1916 } else if (lscf.application.length > 0) { 1917 skcf->action = nxt_http_pass_application(task, 1918 tmcf->router_conf, 1919 &lscf.application); 1920 } 1921 1922 if (nxt_slow_path(skcf->action == NULL)) { 1923 goto fail; 1924 } 1925 } 1926 } 1927 1928 ret = nxt_http_routes_resolve(task, tmcf); 1929 if (nxt_slow_path(ret != NXT_OK)) { 1930 goto fail; 1931 } 1932 1933 value = nxt_conf_get_path(conf, &access_log_path); 1934 1935 if (value != NULL) { 1936 nxt_conf_get_string(value, &path); 1937 1938 access_log = router->access_log; 1939 1940 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1941 nxt_thread_spin_lock(&router->lock); 1942 access_log->count++; 1943 nxt_thread_spin_unlock(&router->lock); 1944 1945 } else { 1946 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1947 + path.length); 1948 if (access_log == NULL) { 1949 nxt_alert(task, "failed to allocate access log structure"); 1950 goto fail; 1951 } 1952 1953 access_log->fd = -1; 1954 access_log->handler = &nxt_router_access_log_writer; 1955 access_log->count = 1; 1956 1957 access_log->path.length = path.length; 1958 access_log->path.start = (u_char *) access_log 1959 + sizeof(nxt_router_access_log_t); 1960 1961 nxt_memcpy(access_log->path.start, path.start, path.length); 1962 } 1963 1964 tmcf->router_conf->access_log = access_log; 1965 } 1966 1967 nxt_queue_add(&deleting_sockets, &router->sockets); 1968 nxt_queue_init(&router->sockets); 1969 1970 return NXT_OK; 1971 1972 app_fail: 1973 1974 nxt_mp_destroy(app_mp); 1975 1976 fail: 1977 1978 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1979 1980 nxt_queue_remove(&app->link); 1981 nxt_thread_mutex_destroy(&app->mutex); 1982 nxt_mp_destroy(app->mem_pool); 1983 1984 } nxt_queue_loop; 1985 1986 return NXT_ERROR; 1987 } 1988 1989 1990 #if (NXT_TLS) 1991 1992 static nxt_int_t 1993 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 1994 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 1995 nxt_tls_init_t *tls_init, 
nxt_bool_t last) 1996 { 1997 nxt_router_tlssock_t *tls; 1998 1999 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 2000 if (nxt_slow_path(tls == NULL)) { 2001 return NXT_ERROR; 2002 } 2003 2004 tls->tls_init = tls_init; 2005 tls->socket_conf = skcf; 2006 tls->temp_conf = tmcf; 2007 tls->last = last; 2008 nxt_conf_get_string(value, &tls->name); 2009 2010 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2011 2012 return NXT_OK; 2013 } 2014 2015 #endif 2016 2017 2018 static nxt_int_t 2019 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2020 nxt_conf_value_t *conf) 2021 { 2022 uint32_t next, i; 2023 nxt_mp_t *mp; 2024 nxt_str_t *type, exten, str; 2025 nxt_int_t ret; 2026 nxt_uint_t exts; 2027 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2028 2029 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2030 2031 mp = rtcf->mem_pool; 2032 2033 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2034 if (nxt_slow_path(ret != NXT_OK)) { 2035 return NXT_ERROR; 2036 } 2037 2038 if (conf == NULL) { 2039 return NXT_OK; 2040 } 2041 2042 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2043 2044 if (mtypes_conf != NULL) { 2045 next = 0; 2046 2047 for ( ;; ) { 2048 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2049 2050 if (ext_conf == NULL) { 2051 break; 2052 } 2053 2054 type = nxt_str_dup(mp, NULL, &str); 2055 if (nxt_slow_path(type == NULL)) { 2056 return NXT_ERROR; 2057 } 2058 2059 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2060 nxt_conf_get_string(ext_conf, &str); 2061 2062 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2063 return NXT_ERROR; 2064 } 2065 2066 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2067 &exten, type); 2068 if (nxt_slow_path(ret != NXT_OK)) { 2069 return NXT_ERROR; 2070 } 2071 2072 continue; 2073 } 2074 2075 exts = nxt_conf_array_elements_count(ext_conf); 2076 2077 for (i = 0; i < exts; i++) { 2078 value = nxt_conf_get_array_element(ext_conf, 
i); 2079 2080 nxt_conf_get_string(value, &str); 2081 2082 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2083 return NXT_ERROR; 2084 } 2085 2086 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2087 &exten, type); 2088 if (nxt_slow_path(ret != NXT_OK)) { 2089 return NXT_ERROR; 2090 } 2091 } 2092 } 2093 } 2094 2095 return NXT_OK; 2096 } 2097 2098 2099 static nxt_int_t 2100 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2101 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) 2102 { 2103 char c; 2104 size_t i; 2105 nxt_mp_t *mp; 2106 uint32_t hash; 2107 nxt_str_t header; 2108 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; 2109 nxt_http_client_ip_t *client_ip; 2110 nxt_http_route_addr_rule_t *source; 2111 2112 static nxt_str_t header_path = nxt_string("/header"); 2113 static nxt_str_t source_path = nxt_string("/source"); 2114 static nxt_str_t recursive_path = nxt_string("/recursive"); 2115 2116 if (conf == NULL) { 2117 skcf->client_ip = NULL; 2118 2119 return NXT_OK; 2120 } 2121 2122 mp = tmcf->router_conf->mem_pool; 2123 2124 source_conf = nxt_conf_get_path(conf, &source_path); 2125 header_conf = nxt_conf_get_path(conf, &header_path); 2126 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2127 2128 if (source_conf == NULL || header_conf == NULL) { 2129 return NXT_ERROR; 2130 } 2131 2132 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); 2133 if (nxt_slow_path(client_ip == NULL)) { 2134 return NXT_ERROR; 2135 } 2136 2137 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2138 if (nxt_slow_path(source == NULL)) { 2139 return NXT_ERROR; 2140 } 2141 2142 client_ip->source = source; 2143 2144 nxt_conf_get_string(header_conf, &header); 2145 2146 if (recursive_conf != NULL) { 2147 client_ip->recursive = nxt_conf_get_boolean(recursive_conf); 2148 } 2149 2150 client_ip->header = nxt_str_dup(mp, NULL, &header); 2151 if (nxt_slow_path(client_ip->header == NULL)) { 2152 return 
NXT_ERROR; 2153 } 2154 2155 hash = NXT_HTTP_FIELD_HASH_INIT; 2156 2157 for (i = 0; i < client_ip->header->length; i++) { 2158 c = client_ip->header->start[i]; 2159 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2160 } 2161 2162 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2163 2164 client_ip->header_hash = hash; 2165 2166 skcf->client_ip = client_ip; 2167 2168 return NXT_OK; 2169 } 2170 2171 2172 static nxt_app_t * 2173 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2174 { 2175 nxt_app_t *app; 2176 2177 nxt_queue_each(app, queue, nxt_app_t, link) { 2178 2179 if (nxt_strstr_eq(name, &app->name)) { 2180 return app; 2181 } 2182 2183 } nxt_queue_loop; 2184 2185 return NULL; 2186 } 2187 2188 2189 static nxt_int_t 2190 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2191 { 2192 void *mem; 2193 nxt_int_t fd; 2194 2195 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2196 if (nxt_slow_path(fd == -1)) { 2197 return NXT_ERROR; 2198 } 2199 2200 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2201 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2202 if (nxt_slow_path(mem == MAP_FAILED)) { 2203 nxt_fd_close(fd); 2204 2205 return NXT_ERROR; 2206 } 2207 2208 nxt_app_queue_init(mem); 2209 2210 port->queue_fd = fd; 2211 port->queue = mem; 2212 2213 return NXT_OK; 2214 } 2215 2216 2217 static nxt_int_t 2218 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2219 { 2220 void *mem; 2221 nxt_int_t fd; 2222 2223 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2224 if (nxt_slow_path(fd == -1)) { 2225 return NXT_ERROR; 2226 } 2227 2228 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2229 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2230 if (nxt_slow_path(mem == MAP_FAILED)) { 2231 nxt_fd_close(fd); 2232 2233 return NXT_ERROR; 2234 } 2235 2236 nxt_port_queue_init(mem); 2237 2238 port->queue_fd = fd; 2239 port->queue = mem; 2240 2241 return NXT_OK; 2242 } 2243 2244 2245 static nxt_int_t 2246 nxt_router_port_queue_map(nxt_task_t *task, 
nxt_port_t *port, nxt_fd_t fd) 2247 { 2248 void *mem; 2249 2250 nxt_assert(fd != -1); 2251 2252 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2253 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2254 if (nxt_slow_path(mem == MAP_FAILED)) { 2255 2256 return NXT_ERROR; 2257 } 2258 2259 port->queue = mem; 2260 2261 return NXT_OK; 2262 } 2263 2264 2265 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2266 NXT_LVLHSH_DEFAULT, 2267 nxt_router_apps_hash_test, 2268 nxt_mp_lvlhsh_alloc, 2269 nxt_mp_lvlhsh_free, 2270 }; 2271 2272 2273 static nxt_int_t 2274 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2275 { 2276 nxt_app_t *app; 2277 2278 app = data; 2279 2280 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2281 } 2282 2283 2284 static nxt_int_t 2285 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2286 { 2287 nxt_lvlhsh_query_t lhq; 2288 2289 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2290 lhq.replace = 0; 2291 lhq.key = app->name; 2292 lhq.value = app; 2293 lhq.proto = &nxt_router_apps_hash_proto; 2294 lhq.pool = rtcf->mem_pool; 2295 2296 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2297 2298 case NXT_OK: 2299 return NXT_OK; 2300 2301 case NXT_DECLINED: 2302 nxt_thread_log_alert("router app hash adding failed: " 2303 "\"%V\" is already in hash", &lhq.key); 2304 /* Fall through. 
*/ 2305 default: 2306 return NXT_ERROR; 2307 } 2308 } 2309 2310 2311 static nxt_app_t * 2312 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2313 { 2314 nxt_lvlhsh_query_t lhq; 2315 2316 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2317 lhq.key = *name; 2318 lhq.proto = &nxt_router_apps_hash_proto; 2319 2320 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2321 return NULL; 2322 } 2323 2324 return lhq.value; 2325 } 2326 2327 2328 static void 2329 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2330 { 2331 nxt_app_t *app; 2332 nxt_lvlhsh_each_t lhe; 2333 2334 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2335 2336 for ( ;; ) { 2337 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2338 2339 if (app == NULL) { 2340 break; 2341 } 2342 2343 nxt_router_app_use(task, app, i); 2344 } 2345 } 2346 2347 2348 typedef struct { 2349 nxt_app_t *app; 2350 nxt_int_t target; 2351 } nxt_http_app_conf_t; 2352 2353 2354 nxt_int_t 2355 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2356 nxt_str_t *target, nxt_http_action_t *action) 2357 { 2358 nxt_app_t *app; 2359 nxt_str_t *targets; 2360 nxt_uint_t i; 2361 nxt_http_app_conf_t *conf; 2362 2363 app = nxt_router_apps_hash_get(rtcf, name); 2364 if (app == NULL) { 2365 return NXT_DECLINED; 2366 } 2367 2368 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2369 if (nxt_slow_path(conf == NULL)) { 2370 return NXT_ERROR; 2371 } 2372 2373 action->handler = nxt_http_application_handler; 2374 action->u.conf = conf; 2375 2376 conf->app = app; 2377 2378 if (target != NULL && target->length != 0) { 2379 targets = app->targets; 2380 2381 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2382 2383 conf->target = i; 2384 2385 } else { 2386 conf->target = 0; 2387 } 2388 2389 return NXT_OK; 2390 } 2391 2392 2393 static nxt_socket_conf_t * 2394 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2395 nxt_str_t *name) 2396 { 
2397 size_t size; 2398 nxt_int_t ret; 2399 nxt_bool_t wildcard; 2400 nxt_sockaddr_t *sa; 2401 nxt_socket_conf_t *skcf; 2402 nxt_listen_socket_t *ls; 2403 2404 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2405 if (nxt_slow_path(sa == NULL)) { 2406 nxt_alert(task, "invalid listener \"%V\"", name); 2407 return NULL; 2408 } 2409 2410 sa->type = SOCK_STREAM; 2411 2412 nxt_debug(task, "router listener: \"%*s\"", 2413 (size_t) sa->length, nxt_sockaddr_start(sa)); 2414 2415 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2416 if (nxt_slow_path(skcf == NULL)) { 2417 return NULL; 2418 } 2419 2420 size = nxt_sockaddr_size(sa); 2421 2422 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2423 2424 if (ret != NXT_OK) { 2425 2426 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2427 if (nxt_slow_path(ls == NULL)) { 2428 return NULL; 2429 } 2430 2431 skcf->listen = ls; 2432 2433 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2434 nxt_memcpy(ls->sockaddr, sa, size); 2435 2436 nxt_listen_socket_remote_size(ls); 2437 2438 ls->socket = -1; 2439 ls->backlog = NXT_LISTEN_BACKLOG; 2440 ls->flags = NXT_NONBLOCK; 2441 ls->read_after_accept = 1; 2442 } 2443 2444 switch (sa->u.sockaddr.sa_family) { 2445 #if (NXT_HAVE_UNIX_DOMAIN) 2446 case AF_UNIX: 2447 wildcard = 0; 2448 break; 2449 #endif 2450 #if (NXT_INET6) 2451 case AF_INET6: 2452 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2453 break; 2454 #endif 2455 case AF_INET: 2456 default: 2457 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2458 break; 2459 } 2460 2461 if (!wildcard) { 2462 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2463 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2464 return NULL; 2465 } 2466 2467 nxt_memcpy(skcf->sockaddr, sa, size); 2468 } 2469 2470 return skcf; 2471 } 2472 2473 2474 static nxt_int_t 2475 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2476 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 
2477 { 2478 nxt_router_t *router; 2479 nxt_queue_link_t *qlk; 2480 nxt_socket_conf_t *skcf; 2481 2482 router = tmcf->router_conf->router; 2483 2484 for (qlk = nxt_queue_first(&router->sockets); 2485 qlk != nxt_queue_tail(&router->sockets); 2486 qlk = nxt_queue_next(qlk)) 2487 { 2488 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2489 2490 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2491 nskcf->listen = skcf->listen; 2492 2493 nxt_queue_remove(qlk); 2494 nxt_queue_insert_tail(&keeping_sockets, qlk); 2495 2496 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2497 2498 return NXT_OK; 2499 } 2500 } 2501 2502 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2503 2504 return NXT_DECLINED; 2505 } 2506 2507 2508 static void 2509 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2510 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2511 { 2512 size_t size; 2513 uint32_t stream; 2514 nxt_int_t ret; 2515 nxt_buf_t *b; 2516 nxt_port_t *main_port, *router_port; 2517 nxt_runtime_t *rt; 2518 nxt_socket_rpc_t *rpc; 2519 2520 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2521 if (rpc == NULL) { 2522 goto fail; 2523 } 2524 2525 rpc->socket_conf = skcf; 2526 rpc->temp_conf = tmcf; 2527 2528 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2529 2530 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2531 if (b == NULL) { 2532 goto fail; 2533 } 2534 2535 b->completion_handler = nxt_router_dummy_buf_completion; 2536 2537 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2538 2539 rt = task->thread->runtime; 2540 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2541 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2542 2543 stream = nxt_port_rpc_register_handler(task, router_port, 2544 nxt_router_listen_socket_ready, 2545 nxt_router_listen_socket_error, 2546 main_port->pid, rpc); 2547 if (nxt_slow_path(stream == 0)) { 2548 goto fail; 2549 } 2550 2551 ret = nxt_port_socket_write(task, main_port, 
NXT_PORT_MSG_SOCKET, -1, 2552 stream, router_port->id, b); 2553 2554 if (nxt_slow_path(ret != NXT_OK)) { 2555 nxt_port_rpc_cancel(task, router_port, stream); 2556 goto fail; 2557 } 2558 2559 return; 2560 2561 fail: 2562 2563 nxt_router_conf_error(task, tmcf); 2564 } 2565 2566 2567 static void 2568 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2569 void *data) 2570 { 2571 nxt_int_t ret; 2572 nxt_socket_t s; 2573 nxt_socket_rpc_t *rpc; 2574 2575 rpc = data; 2576 2577 s = msg->fd[0]; 2578 2579 ret = nxt_socket_nonblocking(task, s); 2580 if (nxt_slow_path(ret != NXT_OK)) { 2581 goto fail; 2582 } 2583 2584 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2585 2586 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2587 if (nxt_slow_path(ret != NXT_OK)) { 2588 goto fail; 2589 } 2590 2591 rpc->socket_conf->listen->socket = s; 2592 2593 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2594 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2595 2596 return; 2597 2598 fail: 2599 2600 nxt_socket_close(task, s); 2601 2602 nxt_router_conf_error(task, rpc->temp_conf); 2603 } 2604 2605 2606 static void 2607 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2608 void *data) 2609 { 2610 nxt_socket_rpc_t *rpc; 2611 nxt_router_temp_conf_t *tmcf; 2612 2613 rpc = data; 2614 tmcf = rpc->temp_conf; 2615 2616 #if 0 2617 u_char *p; 2618 size_t size; 2619 uint8_t error; 2620 nxt_buf_t *in, *out; 2621 nxt_sockaddr_t *sa; 2622 2623 static nxt_str_t socket_errors[] = { 2624 nxt_string("ListenerSystem"), 2625 nxt_string("ListenerNoIPv6"), 2626 nxt_string("ListenerPort"), 2627 nxt_string("ListenerInUse"), 2628 nxt_string("ListenerNoAddress"), 2629 nxt_string("ListenerNoAccess"), 2630 nxt_string("ListenerPath"), 2631 }; 2632 2633 sa = rpc->socket_conf->listen->sockaddr; 2634 2635 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2636 2637 if (nxt_slow_path(in == NULL)) { 2638 return; 2639 } 
2640 2641 p = in->mem.pos; 2642 2643 error = *p++; 2644 2645 size = nxt_length("listen socket error: ") 2646 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2647 + sa->length + socket_errors[error].length + (in->mem.free - p); 2648 2649 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2650 if (nxt_slow_path(out == NULL)) { 2651 return; 2652 } 2653 2654 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2655 "listen socket error: " 2656 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2657 (size_t) sa->length, nxt_sockaddr_start(sa), 2658 &socket_errors[error], in->mem.free - p, p); 2659 2660 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2661 #endif 2662 2663 nxt_router_conf_error(task, tmcf); 2664 } 2665 2666 2667 #if (NXT_TLS) 2668 2669 static void 2670 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2671 void *data) 2672 { 2673 nxt_mp_t *mp; 2674 nxt_int_t ret; 2675 nxt_tls_conf_t *tlscf; 2676 nxt_router_tlssock_t *tls; 2677 nxt_tls_bundle_conf_t *bundle; 2678 nxt_router_temp_conf_t *tmcf; 2679 2680 nxt_debug(task, "tls rpc handler"); 2681 2682 tls = data; 2683 tmcf = tls->temp_conf; 2684 2685 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2686 goto fail; 2687 } 2688 2689 mp = tmcf->router_conf->mem_pool; 2690 2691 if (tls->socket_conf->tls == NULL){ 2692 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2693 if (nxt_slow_path(tlscf == NULL)) { 2694 goto fail; 2695 } 2696 2697 tlscf->no_wait_shutdown = 1; 2698 tls->socket_conf->tls = tlscf; 2699 2700 } else { 2701 tlscf = tls->socket_conf->tls; 2702 } 2703 2704 tls->tls_init->conf = tlscf; 2705 2706 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2707 if (nxt_slow_path(bundle == NULL)) { 2708 goto fail; 2709 } 2710 2711 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2712 goto fail; 2713 } 2714 2715 bundle->chain_file = msg->fd[0]; 2716 bundle->next = tlscf->bundle; 2717 tlscf->bundle = bundle; 
2718 2719 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2720 tls->last); 2721 if (nxt_slow_path(ret != NXT_OK)) { 2722 goto fail; 2723 } 2724 2725 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2726 nxt_router_conf_apply, task, tmcf, NULL); 2727 return; 2728 2729 fail: 2730 2731 nxt_router_conf_error(task, tmcf); 2732 } 2733 2734 #endif 2735 2736 2737 static void 2738 nxt_router_app_rpc_create(nxt_task_t *task, 2739 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2740 { 2741 size_t size; 2742 uint32_t stream; 2743 nxt_int_t ret; 2744 nxt_buf_t *b; 2745 nxt_port_t *main_port, *router_port; 2746 nxt_runtime_t *rt; 2747 nxt_app_rpc_t *rpc; 2748 2749 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2750 if (rpc == NULL) { 2751 goto fail; 2752 } 2753 2754 rpc->app = app; 2755 rpc->temp_conf = tmcf; 2756 2757 nxt_debug(task, "app '%V' prefork", &app->name); 2758 2759 size = app->name.length + 1 + app->conf.length; 2760 2761 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2762 if (nxt_slow_path(b == NULL)) { 2763 goto fail; 2764 } 2765 2766 b->completion_handler = nxt_router_dummy_buf_completion; 2767 2768 nxt_buf_cpystr(b, &app->name); 2769 *b->mem.free++ = '\0'; 2770 nxt_buf_cpystr(b, &app->conf); 2771 2772 rt = task->thread->runtime; 2773 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2774 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2775 2776 stream = nxt_port_rpc_register_handler(task, router_port, 2777 nxt_router_app_prefork_ready, 2778 nxt_router_app_prefork_error, 2779 -1, rpc); 2780 if (nxt_slow_path(stream == 0)) { 2781 goto fail; 2782 } 2783 2784 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2785 -1, stream, router_port->id, b); 2786 2787 if (nxt_slow_path(ret != NXT_OK)) { 2788 nxt_port_rpc_cancel(task, router_port, stream); 2789 goto fail; 2790 } 2791 2792 app->pending_processes++; 2793 2794 return; 2795 2796 fail: 2797 2798 nxt_router_conf_error(task, tmcf); 2799 } 2800 2801 2802 
static void 2803 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2804 void *data) 2805 { 2806 nxt_app_t *app; 2807 nxt_port_t *port; 2808 nxt_app_rpc_t *rpc; 2809 nxt_event_engine_t *engine; 2810 2811 rpc = data; 2812 app = rpc->app; 2813 2814 port = msg->u.new_port; 2815 2816 nxt_assert(port != NULL); 2817 nxt_assert(port->type == NXT_PROCESS_APP); 2818 nxt_assert(port->id == 0); 2819 2820 port->app = app; 2821 port->main_app_port = port; 2822 2823 app->pending_processes--; 2824 app->processes++; 2825 app->idle_processes++; 2826 2827 engine = task->thread->engine; 2828 2829 nxt_queue_insert_tail(&app->ports, &port->app_link); 2830 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2831 2832 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2833 &app->name, port->pid, port->id); 2834 2835 nxt_port_hash_add(&app->port_hash, port); 2836 app->port_hash_count++; 2837 2838 port->idle_start = 0; 2839 2840 nxt_port_inc_use(port); 2841 2842 nxt_router_app_shared_port_send(task, port); 2843 2844 nxt_work_queue_add(&engine->fast_work_queue, 2845 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2846 } 2847 2848 2849 static void 2850 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2851 void *data) 2852 { 2853 nxt_app_t *app; 2854 nxt_app_rpc_t *rpc; 2855 nxt_router_temp_conf_t *tmcf; 2856 2857 rpc = data; 2858 app = rpc->app; 2859 tmcf = rpc->temp_conf; 2860 2861 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2862 &app->name); 2863 2864 app->pending_processes--; 2865 2866 nxt_router_conf_error(task, tmcf); 2867 } 2868 2869 2870 static nxt_int_t 2871 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2872 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2873 { 2874 nxt_int_t ret; 2875 nxt_uint_t n, threads; 2876 nxt_queue_link_t *qlk; 2877 nxt_router_engine_conf_t *recf; 2878 2879 threads = tmcf->router_conf->threads; 2880 2881 tmcf->engines = 
nxt_array_create(tmcf->mem_pool, threads, 2882 sizeof(nxt_router_engine_conf_t)); 2883 if (nxt_slow_path(tmcf->engines == NULL)) { 2884 return NXT_ERROR; 2885 } 2886 2887 n = 0; 2888 2889 for (qlk = nxt_queue_first(&router->engines); 2890 qlk != nxt_queue_tail(&router->engines); 2891 qlk = nxt_queue_next(qlk)) 2892 { 2893 recf = nxt_array_zero_add(tmcf->engines); 2894 if (nxt_slow_path(recf == NULL)) { 2895 return NXT_ERROR; 2896 } 2897 2898 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2899 2900 if (n < threads) { 2901 recf->action = NXT_ROUTER_ENGINE_KEEP; 2902 ret = nxt_router_engine_conf_update(tmcf, recf); 2903 2904 } else { 2905 recf->action = NXT_ROUTER_ENGINE_DELETE; 2906 ret = nxt_router_engine_conf_delete(tmcf, recf); 2907 } 2908 2909 if (nxt_slow_path(ret != NXT_OK)) { 2910 return ret; 2911 } 2912 2913 n++; 2914 } 2915 2916 tmcf->new_threads = n; 2917 2918 while (n < threads) { 2919 recf = nxt_array_zero_add(tmcf->engines); 2920 if (nxt_slow_path(recf == NULL)) { 2921 return NXT_ERROR; 2922 } 2923 2924 recf->action = NXT_ROUTER_ENGINE_ADD; 2925 2926 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2927 if (nxt_slow_path(recf->engine == NULL)) { 2928 return NXT_ERROR; 2929 } 2930 2931 ret = nxt_router_engine_conf_create(tmcf, recf); 2932 if (nxt_slow_path(ret != NXT_OK)) { 2933 return ret; 2934 } 2935 2936 n++; 2937 } 2938 2939 return NXT_OK; 2940 } 2941 2942 2943 static nxt_int_t 2944 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2945 nxt_router_engine_conf_t *recf) 2946 { 2947 nxt_int_t ret; 2948 2949 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2950 nxt_router_listen_socket_create); 2951 if (nxt_slow_path(ret != NXT_OK)) { 2952 return ret; 2953 } 2954 2955 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2956 nxt_router_listen_socket_create); 2957 if (nxt_slow_path(ret != NXT_OK)) { 2958 return ret; 2959 } 2960 2961 return ret; 2962 } 2963 2964 2965 
/*
 * Queues the jobs for an engine that is kept by the new configuration:
 * listen socket creation for creating_sockets, listen socket update for
 * updating_sockets, and listen socket deletion for deleting_sockets.
 *
 * Returns NXT_OK or the first error reported by a job builder.
 */
static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf)
{
    nxt_int_t  ret;

    ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
                                          nxt_router_listen_socket_create);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
                                          nxt_router_listen_socket_update);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    return ret;
}


/*
 * Queues the jobs for an engine that is retired by the new configuration:
 * a thread quit job followed by listen socket deletion jobs for both
 * updating_sockets and deleting_sockets.
 *
 * Jobs are pushed onto the head of recf->jobs, so the quit job queued
 * first here ends up last in the list that nxt_router_engine_post() walks.
 *
 * Returns NXT_OK or the first error reported by a job builder.
 */
static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf)
{
    nxt_int_t  ret;

    ret = nxt_router_engine_quit(tmcf, recf);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
}


/*
 * For every socket configuration linked in "sockets", queues one job on
 * recf->jobs that will run "handler" and allocates the socket conf joint
 * passed to that handler as work data.
 *
 * The job comes from the temporary configuration pool, the joint from the
 * router configuration pool.  Each queued job increments tmcf->count and
 * each joint takes a reference on its socket configuration (skcf->count).
 *
 * Returns NXT_OK, NXT_ERROR on allocation failure, or the error returned
 * by nxt_upstreams_joint_create().
 */
static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler)
{
    nxt_int_t                ret;
    nxt_joint_job_t          *job;
    nxt_queue_link_t         *qlk;
    nxt_socket_conf_t        *skcf;
    nxt_socket_conf_joint_t  *joint;

    for (qlk = nxt_queue_first(sockets);
         qlk != nxt_queue_tail(sockets);
         qlk = nxt_queue_next(qlk))
    {
        job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
        if (nxt_slow_path(job == NULL)) {
            return NXT_ERROR;
        }

        /* Push the job onto the head of the engine job list. */
        job->work.next = recf->jobs;
        recf->jobs = &job->work;

        /* The job carries its own copy of the configuring engine task. */
        job->task = tmcf->engine->task;
        job->work.handler = handler;
        job->work.task = &job->task;
        job->work.obj = job;
        job->tmcf = tmcf;

        /* Counted before the joint is allocated; not undone on failure. */
        tmcf->count++;

        joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
                             sizeof(nxt_socket_conf_joint_t));
        if (nxt_slow_path(joint == NULL)) {
            return NXT_ERROR;
        }

        job->work.data = joint;

        ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
        if (nxt_slow_path(ret != NXT_OK)) {
            return ret;
        }

        joint->count = 1;

        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
        skcf->count++;
        joint->socket_conf = skcf;

        joint->engine = recf->engine;
    }

    return NXT_OK;
}


/*
 * Queues a job on recf->jobs that runs nxt_router_worker_thread_quit() in
 * the engine.  The job has no object, data, or temporary configuration
 * attached and tmcf->count is not incremented for it.
 *
 * Returns NXT_OK, or NXT_ERROR if the job cannot be allocated.
 */
static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf)
{
    nxt_joint_job_t  *job;

    job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
    if (nxt_slow_path(job == NULL)) {
        return NXT_ERROR;
    }

    job->work.next = recf->jobs;
    recf->jobs = &job->work;

    job->task = tmcf->engine->task;
    job->work.handler = nxt_router_worker_thread_quit;
    job->work.task = &job->task;
    job->work.obj = NULL;
    job->work.data = NULL;
    job->tmcf = NULL;

    return NXT_OK;
}


/*
 * For every socket configuration linked in "sockets", queues one job on
 * recf->jobs that runs nxt_router_listen_socket_delete() with the socket
 * configuration as work data.  Each queued job increments tmcf->count.
 *
 * Returns NXT_OK, or NXT_ERROR if a job cannot be allocated.
 */
static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
{
    nxt_joint_job_t   *job;
    nxt_queue_link_t  *qlk;

    for (qlk = nxt_queue_first(sockets);
         qlk != nxt_queue_tail(sockets);
         qlk = nxt_queue_next(qlk))
    {
        job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
        if (nxt_slow_path(job == NULL)) {
            return NXT_ERROR;
        }

        job->work.next = recf->jobs;
        recf->jobs = &job->work;

        job->task = tmcf->engine->task;
        job->work.handler = nxt_router_listen_socket_delete;
        job->work.task = &job->task;
        job->work.obj = job;
        job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
        job->tmcf = tmcf;

        tmcf->count++;
    }

    return NXT_OK;
}


/*
 * Starts a thread for each engine conf entry with index from
 * tmcf->new_threads up to, but not including, the configured number of
 * threads.
 *
 * Returns NXT_OK or the first error from nxt_router_thread_create().
 */
static nxt_int_t
nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf)
{
    nxt_int_t                 ret;
    nxt_uint_t                i, threads;
    nxt_router_engine_conf_t  *recf;

    recf = tmcf->engines->elts;
    threads = tmcf->router_conf->threads;

    for (i = tmcf->new_threads; i < threads; i++) {
        ret = nxt_router_thread_create(task, rt, recf[i].engine);
        if (nxt_slow_path(ret != NXT_OK)) {
            return ret;
        }
    }

    return NXT_OK;
}


/*
 * Creates the thread that will run "engine".
 *
 * A zeroed thread link is filled with the start routine
 * (nxt_router_thread_start), the engine, and an exit work item that runs
 * nxt_router_thread_exit_handler in the context of "task".  The engine is
 * linked into rt->engines before the thread is created and unlinked again
 * if creation fails.
 *
 * Returns NXT_ERROR if the link cannot be allocated, otherwise the result
 * of nxt_thread_create().
 */
static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine)
{
    nxt_int_t            ret;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_slow_path(link == NULL)) {
        return NXT_ERROR;
    }

    link->start = nxt_router_thread_start;
    link->engine = engine;
    link->work.handler = nxt_router_thread_exit_handler;
    link->work.task = task;
    link->work.data = link;

    nxt_queue_insert_tail(&rt->engines, &engine->link);

    ret = nxt_thread_create(&handle, link);

    if (nxt_slow_path(ret != NXT_OK)) {
        /*
         * NOTE(review): "link" is not freed here; presumably
         * nxt_thread_create() releases it on failure -- confirm.
         */
        nxt_queue_remove(&engine->link);
    }

    return ret;
}


/*
 * Rebuilds router->apps for the new configuration: every application
 * currently in the queue is passed to nxt_router_app_unlink(), then the
 * tmcf->previous and tmcf->apps queues are added to router->apps.
 */
static void
nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf)
{
    nxt_app_t  *app;

    nxt_queue_each(app, &router->apps, nxt_app_t, link) {

        nxt_router_app_unlink(task, app);

    } nxt_queue_loop;

    nxt_queue_add(&router->apps, &tmcf->previous);
    nxt_queue_add(&router->apps, &tmcf->apps);
}


/*
 * Applies the per-engine actions recorded in tmcf->engines and hands each
 * engine its queued jobs: added engines are linked into router->engines,
 * deleted engines are unlinked, kept engines are left in place.
 */
static void
nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
    nxt_uint_t                n;
    nxt_event_engine_t        *engine;
    nxt_router_engine_conf_t  *recf;

    recf = tmcf->engines->elts;

    for (n = tmcf->engines->nelts; n != 0; n--) {
        engine = recf->engine;

        switch (recf->action) {

        case NXT_ROUTER_ENGINE_KEEP:
            break;

        case NXT_ROUTER_ENGINE_ADD:
            nxt_queue_insert_tail(&router->engines, &engine->link0);
            break;

        case NXT_ROUTER_ENGINE_DELETE:
            nxt_queue_remove(&engine->link0);
            break;
        }

        nxt_router_engine_post(engine, recf->jobs);

        recf++;
    }
}


/*
 * Posts every work item of the singly linked "jobs" list to "engine".
 * The successor is read and the link cleared before each post, so a
 * posted item is never touched again by this loop.
 */
static void
nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
{
    nxt_work_t  *work, *next;

    for (work = jobs; work != NULL; work = next) {
        next = work->next;
        work->next = NULL;

        nxt_event_engine_post(engine, work);
    }
}


/* Message handlers enabled on each engine port by nxt_router_thread_start(). */
static nxt_port_handlers_t  nxt_router_app_port_handlers = {
    .rpc_error       = nxt_port_rpc_handler,
    .mmap            = nxt_port_mmap_handler,
    .data            = nxt_port_rpc_handler,
    .oosm            = nxt_router_oosm_handler,
    .req_headers_ack = nxt_port_rpc_handler,
};


/*
 * Start routine of a router worker thread; "data" is the
 * nxt_thread_link_t prepared by nxt_router_thread_create().
 *
 * Binds the engine and the current thread to each other, creates the
 * engine memory pool and a NXT_PROCESS_ROUTER port with its socket and
 * queue, enables the port with nxt_router_app_port_handlers, posts a work
 * item to the engine of link->work.task's thread so that
 * nxt_router_rt_add_port() registers the port there, and finally runs the
 * engine.
 *
 * NOTE(review): every failure path simply returns from the start routine;
 * the path taken when "work" cannot be allocated returns after the port
 * has already been stored in engine->port and enabled -- confirm this is
 * acceptable.
 */
static void
nxt_router_thread_start(void *data)
{
    nxt_int_t           ret;
    nxt_port_t          *port;
    nxt_task_t          *task;
    nxt_work_t          *work;
    nxt_thread_t        *thread;
    nxt_thread_link_t   *link;
    nxt_event_engine_t  *engine;

    link = data;
    engine = link->engine;
    task = &engine->task;

    thread = nxt_thread();

    nxt_event_engine_thread_adopt(engine);

    /* STUB */
    /* Inherit the runtime from the thread the engine task referred to. */
    thread->runtime = engine->task.thread->runtime;

    engine->task.thread = thread;
    engine->task.log = thread->log;
    thread->engine = engine;
    thread->task = &engine->task;
#if 0
    thread->fiber = &engine->fibers->fiber;
#endif

    engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
    if (nxt_slow_path(engine->mem_pool == NULL)) {
        return;
    }

    port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
                        NXT_PROCESS_ROUTER);
    if (nxt_slow_path(port == NULL)) {
        return;
    }

    ret = nxt_port_socket_init(task, port, 0);
    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_port_use(task, port, -1);
        return;
    }

    ret = nxt_router_port_queue_init(task, port);
    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_port_use(task, port, -1);
        return;
    }

    engine->port = port;

    nxt_port_enable(task, port, &nxt_router_app_port_handlers);

    work = nxt_zalloc(sizeof(nxt_work_t));
    if (nxt_slow_path(work == NULL)) {
        return;
    }

    /* The work item passes itself as obj; the handler frees it. */
    work->handler = nxt_router_rt_add_port;
    work->task = link->work.task;
    work->obj = work;
    work->data = port;

    nxt_event_engine_post(link->work.task->thread->engine, work);

    nxt_event_engine_start(engine);
}


/*
 * Work handler posted by nxt_router_thread_start(): frees the work item
 * passed as "obj" and adds the port passed as "data" to the runtime port
 * hash, taking a use reference on the port when the insertion succeeds.
 */
static void
nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
{
    nxt_int_t      res;
    nxt_port_t     *port;
    nxt_runtime_t  *rt;

    rt = task->thread->runtime;
    port = data;

    nxt_free(obj);

    res = nxt_port_hash_add(&rt->ports, port);

    if (nxt_fast_path(res == NXT_OK)) {
        nxt_port_use(task, port, 1);
    }
}


/*
 * Engine job: start listening on a socket in the current engine.
 *
 * "obj" is the nxt_joint_job_t and "data" the socket conf joint.  The
 * joint is linked into the engine joints queue, a listen event is created
 * for the listen socket and pointed at the joint, the listen socket use
 * count is incremented under the router lock, and the job is posted back
 * to the configuring engine with nxt_router_conf_wait as its handler.
 */
static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
    nxt_joint_job_t          *job;
    nxt_socket_conf_t        *skcf;
    nxt_listen_event_t       *lev;
    nxt_listen_socket_t      *ls;
    nxt_thread_spinlock_t    *lock;
    nxt_socket_conf_joint_t  *joint;

    job = obj;
    joint = data;

    nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);

    skcf = joint->socket_conf;
    ls = skcf->listen;

    lev = nxt_listen_event(task, ls);
    if (nxt_slow_path(lev == NULL)) {
        /*
         * NOTE(review): on this path the job is not posted back to
         * job->tmcf->engine, and ls->count is decremented by the release
         * although it has not been incremented here yet -- confirm.
         */
        nxt_router_listen_socket_release(task, skcf);
        return;
    }

    lev->socket.data = joint;

    lock = &skcf->router_conf->router->lock;

    nxt_thread_spin_lock(lock);
    ls->count++;
    nxt_thread_spin_unlock(lock);

    job->work.next = NULL;
    job->work.handler = nxt_router_conf_wait;

    nxt_event_engine_post(job->tmcf->engine, &job->work);
}


/*
 * Returns the listen event in "listen_connections" whose socket
 * descriptor equals skcf->listen->socket, or NULL if there is none.
 */
nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t *listen_connections,
    nxt_socket_conf_t *skcf)
{
    nxt_socket_t        fd;
    nxt_queue_link_t    *qlk;
    nxt_listen_event_t  *lev;

    fd = skcf->listen->socket;

    for (qlk = nxt_queue_first(listen_connections);
         qlk != nxt_queue_tail(listen_connections);
         qlk = nxt_queue_next(qlk))
    {
        lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);

        if (fd == lev->socket.fd) {
            return lev;
        }
    }

    return NULL;
}


/*
 * Engine job: switch an existing listen event of the current engine to a
 * new socket conf joint.
 *
 * "obj" is the nxt_joint_job_t and "data" the new joint.  The new joint
 * is linked into the engine joints queue and installed on the listen
 * event, the job is posted back to the configuring engine with
 * nxt_router_conf_wait as its handler, and the previous joint is
 * released.
 *
 * NOTE(review): the result of nxt_router_listen_event() is dereferenced
 * without a NULL check; the event is presumably guaranteed to exist for
 * sockets in updating_sockets -- confirm.
 */
static void
nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
{
    nxt_joint_job_t          *job;
    nxt_event_engine_t       *engine;
    nxt_listen_event_t       *lev;
    nxt_socket_conf_joint_t  *joint, *old;

    job = obj;
    joint = data;

    engine = task->thread->engine;

    nxt_queue_insert_tail(&engine->joints, &joint->link);

    lev = nxt_router_listen_event(&engine->listen_connections,
                                  joint->socket_conf);

    old = lev->socket.data;
    lev->socket.data = joint;
    lev->listen = joint->socket_conf->listen;

    job->work.next = NULL;
    job->work.handler = nxt_router_conf_wait;

    nxt_event_engine_post(job->tmcf->engine, &job->work);

    /*
     * The task is allocated from configuration temporary
     * memory pool so it can be freed after engine post operation.
     */

    nxt_router_conf_release(&engine->task, old);
}


/*
 * Engine job: stop listening on a socket in the current engine.
 *
 * "obj" is the nxt_joint_job_t and "data" the socket configuration.  The
 * fd event of the matching listen event is deleted, the job is remembered
 * in the joint as close_job, and a zero-delay timer is armed so that
 * nxt_router_listen_socket_close() completes the removal from the engine
 * fast work queue.
 *
 * NOTE(review): the result of nxt_router_listen_event() is dereferenced
 * without a NULL check -- confirm the event always exists here.
 */
static void
nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
{
    nxt_socket_conf_t        *skcf;
    nxt_listen_event_t       *lev;
    nxt_event_engine_t       *engine;
    nxt_socket_conf_joint_t  *joint;

    skcf = data;

    engine = task->thread->engine;

    lev = nxt_router_listen_event(&engine->listen_connections, skcf);

    nxt_fd_event_delete(engine, &lev->socket);

    nxt_debug(task, "engine %p: listen socket delete: %d", engine,
              lev->socket.fd);

    joint = lev->socket.data;
    joint->close_job = obj;

    lev->timer.handler = nxt_router_listen_socket_close;
    lev->timer.work_queue = &engine->fast_work_queue;

    nxt_timer_add(engine, &lev->timer, 0);
}


/*
 * Engine job: mark the current engine as shutting down.  The thread exits
 * immediately if the engine has no joints left; otherwise the exit is
 * performed by nxt_router_listen_event_release() once the joints queue
 * becomes empty.
 */
static void
nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t  *engine;

    nxt_debug(task, "router worker thread quit");

    engine = task->thread->engine;

    engine->shutdown = 1;

    if (nxt_queue_is_empty(&engine->joints)) {
        nxt_thread_exit(task->thread);
    }
}


/*
 * Timer handler armed by nxt_router_listen_socket_delete(); "obj" is the
 * timer embedded in the listen event.
 *
 * Unlinks the listen event, detaches its joint, releases the listen
 * socket, posts the joint's close_job back to the configuring engine with
 * nxt_router_conf_wait as its handler, and drops the listen event and
 * joint references.
 */
static void
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t              *timer;
    nxt_joint_job_t          *job;
    nxt_listen_event_t       *lev;
    nxt_socket_conf_joint_t  *joint;

    timer = obj;
    lev = nxt_timer_data(timer, nxt_listen_event_t, timer);

    nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
              lev->socket.fd);

    nxt_queue_remove(&lev->link);

    joint = lev->socket.data;
    lev->socket.data = NULL;

    /* 'task' refers to lev->task and we cannot use after nxt_free() */
    task = &task->thread->engine->task;

    nxt_router_listen_socket_release(task, joint->socket_conf);

    job = joint->close_job;
    job->work.next = NULL;
    job->work.handler = nxt_router_conf_wait;

    nxt_event_engine_post(job->tmcf->engine, &job->work);

    nxt_router_listen_event_release(task, lev, joint);
}


/*
 * Drops one use of the listen socket of "skcf".  The counter is
 * decremented under the router lock; when it reaches zero the socket is
 * closed and the listen socket structure freed after the lock has been
 * released.
 */
static void
nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
{
    nxt_listen_socket_t    *ls;
    nxt_thread_spinlock_t  *lock;

    ls = skcf->listen;
    lock = &skcf->router_conf->router->lock;

    nxt_thread_spin_lock(lock);

    nxt_debug(task, "engine %p: listen socket release: ls->count %D",
              task->thread->engine, ls->count);

    if (--ls->count != 0) {
        /* Still in use elsewhere: nothing to close. */
        ls = NULL;
    }

    nxt_thread_spin_unlock(lock);

    if (ls != NULL) {
        nxt_socket_close(task, ls->socket);
        nxt_free(ls);
    }
}


/*
 * Drops one reference to the listen event "lev" and, if "joint" is not
 * NULL, one reference to that joint.
 *
 * When the listen event count reaches zero its pre-allocated connection
 * (lev->next), if any, and the event itself are freed.  If the engine is
 * shutting down and has no joints left afterwards, the current thread
 * exits.
 */
void
nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
    nxt_socket_conf_joint_t *joint)
{
    nxt_event_engine_t  *engine;

    nxt_debug(task, "listen event count: %D", lev->count);

    engine = task->thread->engine;

    if (--lev->count == 0) {
        if (lev->next != NULL) {
            nxt_sockaddr_cache_free(engine, lev->next);

            nxt_conn_free(task, lev->next);
        }

        nxt_free(lev);
    }

    if (joint != NULL) {
        nxt_router_conf_release(task, joint);
    }

    if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
        nxt_thread_exit(task->thread);
    }
}


/*
 * Drops one reference to a socket conf joint.
 *
 * When the joint count reaches zero the joint is unlinked and, under the
 * router lock, the reference it held on its socket configuration is
 * dropped; if that was the last one, the socket configuration is unlinked
 * and its reference on the router configuration is dropped as well.
 * Outside the lock, the TLS configuration of a released socket
 * configuration is freed, and a router configuration that lost its last
 * reference has its application hash, access log, and memory pool
 * released.
 */
void
nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
{
    nxt_socket_conf_t      *skcf;
    nxt_router_conf_t      *rtcf;
    nxt_thread_spinlock_t  *lock;

    nxt_debug(task, "conf joint %p count: %D", joint, joint->count);

    if (--joint->count != 0) {
        return;
    }

    nxt_queue_remove(&joint->link);

    /*
     * The joint content can not be safely used after the critical
     * section protected by the spinlock because its memory pool may
     * be already destroyed by another thread.
     */
    skcf = joint->socket_conf;
    rtcf = skcf->router_conf;
    lock = &rtcf->router->lock;

    nxt_thread_spin_lock(lock);

    nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
              rtcf, rtcf->count);

    /*
     * After this section skcf/rtcf are non-NULL only if this call
     * dropped the last reference to them.
     */
    if (--skcf->count != 0) {
        skcf = NULL;
        rtcf = NULL;

    } else {
        nxt_queue_remove(&skcf->link);

        if (--rtcf->count != 0) {
            rtcf = NULL;
        }
    }

    nxt_thread_spin_unlock(lock);

#if (NXT_TLS)
    if (skcf != NULL && skcf->tls != NULL) {
        task->thread->runtime->tls->server_free(task, skcf->tls);
    }
#endif

    /* TODO remove engine->port */

    if (rtcf != NULL) {
        nxt_debug(task, "old router conf is destroyed");

        nxt_router_apps_hash_use(task, rtcf, -1);

        nxt_router_access_log_release(task, lock, rtcf->access_log);

        nxt_mp_thread_adopt(rtcf->mem_pool);

        nxt_mp_destroy(rtcf->mem_pool);
    }
}


/*
 * Formats one access log line for request "r" and writes it to
 * access_log->fd.  The line has the form
 *
 *     ADDR - - [DATE] "METHOD TARGET VERSION" STATUS BYTES "REFERER" "AGENT"\n
 *
 * A missing request line, referer, or user agent is written as "-".  The
 * buffer is taken from the request memory pool; if it cannot be
 * allocated, nothing is logged.  The result of nxt_fd_write() is not
 * checked.
 */
static void
nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
    nxt_router_access_log_t *access_log)
{
    size_t     size;
    u_char     *buf, *p;
    nxt_off_t  bytes;

    static nxt_time_string_t  date_cache = {
        (nxt_atomic_uint_t) -1,
        nxt_router_access_log_date,
        "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
        nxt_length("31/Dec/1986:19:40:00 +0300"),
        NXT_THREAD_TIME_LOCAL,
        NXT_THREAD_TIME_SEC,
    };

    /*
     * Upper bound of the line length.  An absent referer or user agent
     * reserves one byte for "-"; the "-" written for an empty method
     * fits into the bytes reserved for the request line separators.
     */
    size = r->remote->address_length
           + 6                  /* ' - - [' */
           + date_cache.size
           + 3                  /* '] "' */
           + r->method->length
           + 1                  /* space */
           + r->target.length
           + 1                  /* space */
           + r->version.length
           + 2                  /* '" ' */
           + 3                  /* status */
           + 1                  /* space */
           + NXT_OFF_T_LEN
           + 2                  /* ' "' */
           + (r->referer != NULL ? r->referer->value_length : 1)
           + 3                  /* '" "' */
           + (r->user_agent != NULL ? r->user_agent->value_length : 1)
           + 2                  /* '"\n' */
    ;

    buf = nxt_mp_nget(r->mem_pool, size);
    if (nxt_slow_path(buf == NULL)) {
        return;
    }

    p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
                   r->remote->address_length);

    p = nxt_cpymem(p, " - - [", 6);

    p = nxt_thread_time_string(task->thread, &date_cache, p);

    p = nxt_cpymem(p, "] \"", 3);

    /* Request line: target and version are emitted only if present. */
    if (r->method->length != 0) {
        p = nxt_cpymem(p, r->method->start, r->method->length);

        if (r->target.length != 0) {
            *p++ = ' ';
            p = nxt_cpymem(p, r->target.start, r->target.length);

            if (r->version.length != 0) {
                *p++ = ' ';
                p = nxt_cpymem(p, r->version.start, r->version.length);
            }
        }

    } else {
        *p++ = '-';
    }

    p = nxt_cpymem(p, "\" ", 2);

    p = nxt_sprintf(p, p + 3, "%03d", r->status);

    *p++ = ' ';

    bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);

    p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);

    p = nxt_cpymem(p, " \"", 2);

    if (r->referer != NULL) {
        p = nxt_cpymem(p, r->referer->value, r->referer->value_length);

    } else {
        *p++ = '-';
    }

    p = nxt_cpymem(p, "\" \"", 3);

    if (r->user_agent != NULL) {
        p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);

    } else {
        *p++ = '-';
    }

    p = nxt_cpymem(p, "\"\n", 2);

    nxt_fd_write(access_log->fd, buf, p - buf);
}


/*
 * Time string handler used by the access log date cache: formats "tm"
 * into "buf" (at most "size" bytes) as day/month-name/year:h:m:s followed
 * by the time zone offset as sign, hours, and minutes.  Returns the
 * pointer returned by nxt_sprintf().
 */
static u_char *
nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
    size_t size, const char *format)
{
    u_char  sign;
    time_t  gmtoff;

    static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                                    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };

    /* nxt_timezone() presumably yields seconds; convert to minutes. */
    gmtoff = nxt_timezone(tm) / 60;

    if (gmtoff < 0) {
        gmtoff = -gmtoff;
        sign = '-';

    } else {
        sign = '+';
    }

    /*
     * NOTE(review): gmtoff is a time_t while the format supplied by the
     * date cache uses "%02d" for both offset fields -- confirm that
     * nxt_sprintf()'s "%d" accepts an argument of this width.
     */
    return nxt_sprintf(buf, buf + size, format,
                       tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
                       tm->tm_hour, tm->tm_min, tm->tm_sec,
                       sign, gmtoff / 60, gmtoff % 60);
}


/*
 * Asks the main process to open the access log of the new configuration.
 *
 * The NUL-terminated log path is sent to the main port in a
 * NXT_PORT_MSG_ACCESS_LOG message; the reply is handled by
 * nxt_router_access_log_ready() or nxt_router_access_log_error() with
 * "tmcf" as their data.  Any local failure fails the temporary
 * configuration through nxt_router_conf_error().
 */
static void
nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
    uint32_t                 stream;
    nxt_int_t                ret;
    nxt_buf_t                *b;
    nxt_port_t               *main_port, *router_port;
    nxt_runtime_t            *rt;
    nxt_router_access_log_t  *access_log;

    access_log = tmcf->router_conf->access_log;

    /* One extra byte for the terminating NUL appended below. */
    b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
    if (nxt_slow_path(b == NULL)) {
        goto fail;
    }

    b->completion_handler = nxt_router_dummy_buf_completion;

    nxt_buf_cpystr(b, &access_log->path);
    *b->mem.free++ = '\0';

    rt = task->thread->runtime;
    main_port = rt->port_by_type[NXT_PROCESS_MAIN];
    router_port = rt->port_by_type[NXT_PROCESS_ROUTER];

    stream = nxt_port_rpc_register_handler(task, router_port,
                                           nxt_router_access_log_ready,
                                           nxt_router_access_log_error,
                                           -1, tmcf);
    if (nxt_slow_path(stream == 0)) {
        goto fail;
    }

    ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
                                stream, router_port->id, b);

    if (nxt_slow_path(ret != NXT_OK)) {
        /* The request was never sent: drop the registered RPC handlers. */
        nxt_port_rpc_cancel(task, router_port, stream);
        goto fail;
    }

    return;

fail:

    nxt_router_conf_error(task, tmcf);
}


/*
 * RPC "ready" handler for the access log open request: stores the file
 * descriptor received in msg->fd[0] in the access log of the new
 * configuration and resumes applying it via nxt_router_conf_apply().
 */
static void
nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_router_temp_conf_t   *tmcf;
    nxt_router_access_log_t  *access_log;

    tmcf = data;

    access_log = tmcf->router_conf->access_log;

    access_log->fd = msg->fd[0];

    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                       nxt_router_conf_apply, task, tmcf, NULL);
}


/*
 * RPC error handler paired with nxt_router_access_log_ready(): reports the
 * temporary configuration passed in "data" as failed.
 */
static void
nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_router_temp_conf_t  *tmcf;

    tmcf = data;

    nxt_router_conf_error(task, tmcf);
}


/*
 * Drops one reference to "access_log"; a NULL pointer is ignored.
 *
 * The counter is decremented under "lock".  When it reaches zero the
 * descriptor is closed (unless it is -1) and the structure is freed,
 * both outside of the lock.
 */
static void
nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
    nxt_router_access_log_t *access_log)
{
    if (access_log == NULL) {
        return;
    }

    nxt_thread_spin_lock(lock);

    /* Keep the pointer only if this was the last reference. */
    if (--access_log->count != 0) {
        access_log = NULL;
    }

    nxt_thread_spin_unlock(lock);

    if (access_log != NULL) {

        if (access_log->fd != -1) {
            nxt_fd_close(access_log->fd);
        }

        nxt_free(access_log);
    }
}


/* Context of one in-flight access log reopen request. */
typedef struct {
    nxt_mp_t                 *mem_pool;    /* pool holding this context */
    nxt_router_access_log_t  *access_log;  /* log current at request time */
} nxt_router_access_log_reopen_t;


/*
 * Port message handler that starts an access log reopen.
 *
 * Does nothing if the router has no access log.  Otherwise creates a
 * dedicated memory pool, allocates the reopen context and a buffer with
 * the NUL-terminated log path from it, registers the reopen ready/error
 * RPC handlers on the router port and sends NXT_PORT_MSG_ACCESS_LOG to
 * the main process.  On any failure the pool is destroyed.
 */
static void
nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_mp_t                        *mp;
    uint32_t                        stream;
    nxt_int_t                       ret;
    nxt_buf_t                       *b;
    nxt_port_t                      *main_port, *router_port;
    nxt_runtime_t                   *rt;
    nxt_router_access_log_t         *access_log;
    nxt_router_access_log_reopen_t  *reopen;

    access_log = nxt_router->access_log;

    if (access_log == NULL) {
        return;
    }

    mp = nxt_mp_create(1024, 128, 256, 32);
    if (nxt_slow_path(mp == NULL)) {
        return;
    }

    reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
    if (nxt_slow_path(reopen == NULL)) {
        goto fail;
    }

    reopen->mem_pool = mp;
    reopen->access_log = access_log;

    /* One extra byte for the terminating NUL appended below. */
    b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
    if (nxt_slow_path(b == NULL)) {
        goto fail;
    }

    b->completion_handler = nxt_router_access_log_reopen_completion;

    nxt_buf_cpystr(b, &access_log->path);
    *b->mem.free++ = '\0';

    rt = task->thread->runtime;
    main_port = rt->port_by_type[NXT_PROCESS_MAIN];
    router_port = rt->port_by_type[NXT_PROCESS_ROUTER];

    stream = nxt_port_rpc_register_handler(task, router_port,
                                           nxt_router_access_log_reopen_ready,
                                           nxt_router_access_log_reopen_error,
                                           -1, reopen);
    if (nxt_slow_path(stream == 0)) {
        goto fail;
    }

    ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
                                stream, router_port->id, b);

    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_port_rpc_cancel(task, router_port, stream);
        goto fail;
    }

    /*
     * The pool is released twice later on: once by the buffer completion
     * handler and once by the RPC ready/error handler.
     * NOTE(review): presumably the extra reference taken here balances the
     * second release, and the buffer completion is expected not to run
     * before this point - confirm against nxt_port_socket_write().
     */
    nxt_mp_retain(mp);

    return;

fail:

    nxt_mp_destroy(mp);
}


/*
 * Completion handler of the reopen request buffer: releases the memory
 * pool stored in the buffer's "data" field.
 * NOTE(review): assumes nxt_buf_mem_alloc() stores the owning pool in
 * b->data - confirm.
 */
static void
nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
{
    nxt_mp_t   *mp;
    nxt_buf_t  *b;

    b = obj;
    mp = b->data;

    nxt_mp_release(mp);
}


/*
 * RPC success handler of a reopen request.
 *
 * If the access log recorded in the request is still the router's current
 * one, the received descriptor msg->fd[0] is duplicated onto the existing
 * access_log->fd with dup2(), so the descriptor number in use does not
 * change; a dup2() failure is only logged.  The received descriptor is
 * closed and the request pool released in every case.
 */
static void
nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_router_access_log_t         *access_log;
    nxt_router_access_log_reopen_t  *reopen;

    reopen = data;

    access_log = reopen->access_log;

    if (access_log == nxt_router->access_log) {

        if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
            nxt_alert(task, "dup2(%FD, %FD) failed %E",
                      msg->fd[0], access_log->fd, nxt_errno);
        }
    }

    nxt_fd_close(msg->fd[0]);
    nxt_mp_release(reopen->mem_pool);
}


/* RPC error handler of a reopen request: only releases the request pool. */
static void
nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_router_access_log_reopen_t  *reopen;

    reopen = data;

    nxt_mp_release(reopen->mem_pool);
}


/*
 * Work handler that finishes the teardown of an engine thread.
 *
 * "obj" carries the thread handle (cast through uintptr_t) and "data" the
 * thread link.  After waiting for the thread, the engine is removed from
 * its queue, its port is re-homed to the current thread's engine and one
 * port reference is dropped, then the engine's memory pool, the engine and
 * the link are freed.
 */
static void
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_port_t           *port;
    nxt_thread_link_t    *link;
    nxt_event_engine_t   *engine;
    nxt_thread_handle_t  handle;

    handle = (nxt_thread_handle_t) (uintptr_t) obj;
    link = data;

    nxt_thread_wait(handle);

    engine = link->engine;

    nxt_queue_remove(&engine->link);

    port = engine->port;

    // TODO notify all apps

    /* Hand the port over to the calling thread before dropping the use. */
    port->engine = task->thread->engine;
    nxt_mp_thread_adopt(port->mem_pool);
    nxt_port_use(task, port, -1);

    nxt_mp_thread_adopt(engine->mem_pool);
    nxt_mp_destroy(engine->mem_pool);

    nxt_event_engine_free(engine);

    nxt_free(link);
}


/*
 * RPC handler for messages an application sends in reply to a request.
 *
 * "data" is the request's nxt_request_rpc_data_t.  Messages of type
 * _NXT_PORT_MSG_REQ_HEADERS_ACK are passed on to
 * nxt_router_req_headers_ack_handler().  For any other message:
 *
 *   - if it is marked last, a final buffer is appended and the RPC data is
 *     unlinked; otherwise the application timeout timer is (re)armed;
 *   - if the response header has already been sent, the buffer is queued
 *     as body data;
 *   - otherwise the buffer is parsed as an nxt_unit_response_t: the fields
 *     are copied into r->resp.fields, the status is set, any piggybacked
 *     content becomes the first body buffer and the header is sent.
 *
 * Malformed responses and allocation failures end the request with
 * NXT_HTTP_SERVICE_UNAVAILABLE.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    if (r->error) {
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /* A complete response turns the pending "failed" into success. */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;

        /* Number of field descriptors that fit after the fixed header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /* Field processing marked it skipped: drop the list element
             * that was just added. */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        /*
         * Narrow the buffer to the piggybacked body, or make it empty.
         * NOTE(review): the piggyback offset and length are not checked
         * against the buffer bounds here - confirm they are validated
         * elsewhere.
         */
        if (resp->piggyback_content_length != 0) {
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            b->mem.pos = b->mem.free;
        }

        /* An empty first buffer is completed right away, not sent. */
        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}


/*
 * Handles the acknowledgement an application process sends once it has
 * picked up the request headers.
 *
 * Binds the RPC stream to the acknowledging process, removes the request
 * from the application's ack-waiting list, looks up the acknowledging port
 * in the application's port hash and makes it the request's app_port in
 * place of the port stored before.  The port's process is taken out of
 * the idle/spare lists, which may trigger the start of another process.
 * Any remaining request body (further buffers and/or body descriptor) is
 * then sent to that port and the application timeout timer is armed.
 *
 * If the port cannot be found the request fails with
 * NXT_HTTP_INTERNAL_SERVER_ERROR.
 */
static void
nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
{
    int                 res;
    nxt_app_t           *app;
    nxt_buf_t           *b;
    nxt_bool_t          start_process, unlinked;
    nxt_port_t          *app_port, *main_app_port, *idle_port;
    nxt_queue_link_t    *idle_lnk;
    nxt_http_request_t  *r;

    nxt_debug(task, "stream #%uD: got ack from %PI:%d",
              req_rpc_data->stream,
              msg->port_msg.pid, msg->port_msg.reply_port);

    nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
                             msg->port_msg.pid);

    app = req_rpc_data->app;
    r = req_rpc_data->request;

    start_process = 0;
    unlinked = 0;

    nxt_thread_mutex_lock(&app->mutex);

    /* A non-NULL "next" means the request is still linked in a list. */
    if (r->app_link.next != NULL) {
        nxt_queue_remove(&r->app_link);
        r->app_link.next = NULL;

        unlinked = 1;
    }

    app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
                                  msg->port_msg.reply_port);
    if (nxt_slow_path(app_port == NULL)) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);

        /* Drop the pool reference that was held while linked. */
        if (unlinked) {
            nxt_mp_release(r->mem_pool);
        }

        return;
    }

    main_app_port = app_port->main_app_port;

    if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
                  &app->name, main_app_port->pid, main_app_port->id,
                  (main_app_port->idle_start ? "idle_ports" : "spare_ports"));

        /* Check port was in 'spare_ports' using idle_start field. */
        if (main_app_port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            /*
             * If there is a vacant space in spare ports,
             * move the last idle to spare_ports.
             */
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }

        if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
            app->pending_processes++;
            start_process = 1;
        }
    }

    main_app_port->active_requests++;

    nxt_port_inc_use(app_port);

    nxt_thread_mutex_unlock(&app->mutex);

    if (unlinked) {
        nxt_mp_release(r->mem_pool);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }

    /* Swap the previously stored port for the acknowledging one. */
    nxt_port_use(task, req_rpc_data->app_port, -1);

    req_rpc_data->app_port = app_port;

    b = req_rpc_data->msg_info.buf;

    if (b != NULL) {
        /* First buffer is already sent.  Start from second. */
        b = b->next;

        req_rpc_data->msg_info.buf->next = NULL;
    }

    if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
                  req_rpc_data->msg_info.body_fd);

        /* NOTE(review): the lseek() result is not checked. */
        if (req_rpc_data->msg_info.body_fd != -1) {
            lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
        }

        res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
                                    req_rpc_data->msg_info.body_fd,
                                    req_rpc_data->stream,
                                    task->thread->engine->port->id, b);

        /* NOTE(review): after this error the timer below is still armed. */
        if (nxt_slow_path(res != NXT_OK)) {
            nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
        }
    }

    if (app->timeout != 0) {
        r->timer.handler = nxt_router_app_timeout;
        r->timer_data = req_rpc_data;
        nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
    }
}


/*
 * Request state installed once the response header has been sent for a
 * non-websocket response; it defines only an error handler.
 */
static const nxt_http_request_state_t  nxt_http_request_send_state
    nxt_aligned(64) =
{
    .error_handler = nxt_http_request_error_handler,
};


/*
 * Sends the output chain accumulated in r->out, if there is one, and
 * detaches it from the request first.  "obj" is the request; "data" is
 * unused.
 */
static void
nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t           *out;
    nxt_http_request_t  *r;

    r = obj;

    out = r->out;

    if (out != NULL) {
        r->out = NULL;
        nxt_http_request_send(task, r, out);
    }
}


/*
 * RPC error handler of an application request: fails the HTTP request (if
 * one is still attached) with NXT_HTTP_SERVICE_UNAVAILABLE and unlinks the
 * RPC data.  rpc_cancel is cleared before the unlink.
 */
static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    req_rpc_data->rpc_cancel = 0;

    /* TODO cancel message and return if cancelled. */
    // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);

    if (req_rpc_data->request != NULL) {
        nxt_http_request_error(task, req_rpc_data->request,
                               NXT_HTTP_SERVICE_UNAVAILABLE);
    }

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}


/*
 * RPC success handler for an application process start: msg->u.new_port
 * is the main port (id 0) of the new process.
 *
 * The joint reference held by the RPC is dropped.  If the application is
 * already released, or its generation differs from the one recorded when
 * the start was requested, the new process is told to QUIT (in the second
 * case a replacement start may be issued).  Otherwise the port is
 * registered with the application, receives the shared port and is
 * released as a new port.
 */
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t            *app;
    nxt_bool_t           start_process;
    nxt_port_t           *port;
    nxt_app_joint_t      *app_joint;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;
    port = msg->u.new_port;

    nxt_assert(app_joint != NULL);
    nxt_assert(port != NULL);
    nxt_assert(port->type == NXT_PROCESS_APP);
    nxt_assert(port->id == 0);

    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "new port ready for released app, send QUIT");

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        return;
    }

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
        nxt_debug(task, "new port ready for restarted app, send QUIT");

        start_process = !task->thread->engine->shutdown
                        && nxt_router_app_can_start(app)
                        && nxt_router_app_need_start(app);

        if (start_process) {
            app->pending_processes++;
        }

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        if (start_process) {
            nxt_router_start_app_process(task, app);
        }

        return;
    }

    port->app = app;
    port->main_app_port = port;

    app->processes++;
    nxt_port_hash_add(&app->port_hash, port);
    app->port_hash_count++;

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
              &app->name, port->pid, app->processes, app->pending_processes);

    /* NOTE(review): the result of the shared port send is ignored. */
    nxt_router_app_shared_port_send(task, port);

    nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}


/*
 * Sends the application's shared port to the process behind "app_port" in
 * an NXT_PORT_MSG_NEW_PORT message that carries the shared port's
 * descriptors (pair[0] and queue_fd) and its id, pid, max_size, max_share
 * and type.
 *
 * Returns NXT_ERROR if the message buffer cannot be allocated, otherwise
 * the result of nxt_port_socket_write2().
 */
static nxt_int_t
nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
{
    nxt_buf_t                *b;
    nxt_port_t               *port;
    nxt_port_msg_new_port_t  *msg;

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
                             sizeof(nxt_port_data_t));
    if (nxt_slow_path(b == NULL)) {
        return NXT_ERROR;
    }

    port = app_port->app->shared_port;

    nxt_debug(task, "send port %FD to process %PI",
              port->pair[0], app_port->pid);

    /* NOTE(review): allocated as nxt_port_data_t, filled as
     * nxt_port_msg_new_port_t - presumably the latter fits; confirm. */
    b->mem.free += sizeof(nxt_port_msg_new_port_t);
    msg = (nxt_port_msg_new_port_t *) b->mem.pos;

    msg->id = port->id;
    msg->pid = port->pid;
    msg->max_size = port->max_size;
    msg->max_share = port->max_share;
    msg->type = port->type;

    return nxt_port_socket_write2(task, app_port,
                                  NXT_PORT_MSG_NEW_PORT,
                                  port->pair[0], port->queue_fd,
                                  0, 0, b);
}


/*
 * RPC error handler for an application process start.
 *
 * Drops the joint reference held by the RPC and, if the application still
 * exists, decrements its pending process counter.  While the application
 * has no processes, requests are taken off the ack-waiting list one at a
 * time and their error work is posted to the owning engine; after the
 * first request the loop also requires that no start is pending.
 */
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t            *app;
    nxt_app_joint_t      *app_joint;
    nxt_queue_link_t     *link;
    nxt_http_request_t   *r;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;

    nxt_assert(app_joint != NULL);

    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "start error for released app");

        return;
    }

    nxt_debug(task, "app '%V' %p start error", &app->name, app);

    link = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
        link = nxt_queue_first(&app->ack_waiting_req);

        nxt_queue_remove(link);
        link->next = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    /* The mutex is not held while posting to the request's engine. */
    while (link != NULL) {
        r = nxt_container_of(link, nxt_http_request_t, app_link);

        nxt_event_engine_post(r->engine, &r->err_work);

        link = NULL;

        nxt_thread_mutex_lock(&app->mutex);

        if (app->processes == 0 && app->pending_processes == 0
            && !nxt_queue_is_empty(&app->ack_waiting_req))
        {
            link = nxt_queue_first(&app->ack_waiting_req);

            nxt_queue_remove(link);
            link->next = NULL;
        }

        nxt_thread_mutex_unlock(&app->mutex);
    }
}



/*
 * Detaches the first port found in app->ports from the application, under
 * the application mutex: the port is removed from the ports list, from the
 * idle/spare list and from the port hash, its "app" pointer is cleared and
 * the process counter is decremented.
 *
 * Returns the detached port; the value returned when the list is empty is
 * whatever nxt_queue_each() leaves in "port".
 * NOTE(review): the caller compares the result against NULL - confirm the
 * macro's end-of-list value.
 */
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
    nxt_port_t  *port;

    port = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {

        /* Caller is responsible to decrease port use count. */
        nxt_queue_chk_remove(&port->app_link);

        if (nxt_queue_chk_remove(&port->idle_link)) {
            app->idle_processes--;

            nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
                      &app->name, port->pid, port->id,
                      (port->idle_start ? "idle_ports" : "spare_ports"));
        }

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        port->app = NULL;
        app->processes--;

        /* Only the first port is taken per call. */
        break;

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&app->mutex);

    return port;
}


/*
 * Adjusts the application use count by "i".  When a decrement removes the
 * last reference (the previous value equals -i), the application is freed:
 * directly if the caller runs on the application's engine, otherwise by
 * posting the joint's free_app_work to that engine.
 */
static void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
    int  c;

    /* "c" is the counter value before the addition. */
    c = nxt_atomic_fetch_add(&app->use_count, i);

    if (i < 0 && c == -i) {

        if (task->thread->engine != app->engine) {
            nxt_event_engine_post(app->engine, &app->joint->free_app_work);

        } else {
            nxt_router_free_app(task, app->joint, NULL);
        }
    }
}


/* Removes the application from its queue and drops one use reference. */
static void
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
    nxt_debug(task, "app '%V' %p unlink", &app->name, app);

    nxt_queue_remove(&app->link);

    nxt_router_app_use(task, app, -1);
}


/*
 * Updates application and port accounting after a port event and finally
 * adjusts the use count of "port".
 *
 * "action" selects what changes:
 *   NXT_APR_NEW_PORT        nothing is counted, use count unchanged;
 *   NXT_APR_REQUEST_FAILED  one active request less, one use dropped;
 *   NXT_APR_GOT_RESPONSE    one response more, one active request less,
 *                           one use dropped;
 *   NXT_APR_UPGRADE         counted as a response, use count unchanged;
 *   NXT_APR_CLOSE           one use dropped only.
 *
 * For the shared port only the application's active request counter is
 * updated.  For a process port the main port of the process is kept in
 * (or added to) app->ports, placed on the spare or idle list when it has
 * no active requests or websockets, or detached and sent QUIT once it has
 * served app->max_requests responses.
 */
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action)
{
    int         inc_use;
    uint32_t    got_response, dec_requests;
    nxt_app_t   *app;
    nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
    nxt_port_t  *main_app_port;

    nxt_assert(port != NULL);
    nxt_assert(port->app != NULL);

    app = port->app;

    inc_use = 0;
    got_response = 0;
    dec_requests = 0;

    switch (action) {
    case NXT_APR_NEW_PORT:
        break;
    case NXT_APR_REQUEST_FAILED:
        dec_requests = 1;
        inc_use = -1;
        break;
    case NXT_APR_GOT_RESPONSE:
        got_response = 1;
        inc_use = -1;
        break;
    case NXT_APR_UPGRADE:
        got_response = 1;
        break;
    case NXT_APR_CLOSE:
        inc_use = -1;
        break;
    }

    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
              port->pid, port->id,
              (int) inc_use, (int) got_response);

    if (port->id == NXT_SHARED_PORT_ID) {
        nxt_thread_mutex_lock(&app->mutex);

        app->active_requests -= got_response + dec_requests;

        nxt_thread_mutex_unlock(&app->mutex);

        goto adjust_use;
    }

    main_app_port = port->main_app_port;

    nxt_thread_mutex_lock(&app->mutex);

    main_app_port->app_responses += got_response;
    main_app_port->active_requests -= got_response + dec_requests;
    app->active_requests -= got_response + dec_requests;

    /* An open port below its request limit belongs in app->ports. */
    if (main_app_port->pair[1] != -1
        && (app->max_requests == 0
            || main_app_port->app_responses < app->max_requests))
    {
        if (main_app_port->app_link.next == NULL) {
            nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);

            nxt_port_inc_use(main_app_port);
        }
    }

    send_quit = (app->max_requests > 0
                 && main_app_port->app_responses >= app->max_requests);

    if (send_quit) {
        port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);

        nxt_port_hash_remove(&app->port_hash, main_app_port);
        app->port_hash_count--;

        main_app_port->app = NULL;
        app->processes--;

    } else {
        port_unchained = 0;
    }

    adjust_idle_timer = 0;

    if (main_app_port->pair[1] != -1 && !send_quit
        && main_app_port->active_requests == 0
        && main_app_port->active_websockets == 0
        && main_app_port->idle_link.next == NULL)
    {
        /* A non-NULL work data marks an adjustment already queued. */
        if (app->idle_processes == app->spare_processes
            && app->adjust_idle_work.data == NULL)
        {
            adjust_idle_timer = 1;
            app->adjust_idle_work.data = app;
            app->adjust_idle_work.next = NULL;
        }

        if (app->idle_processes < app->spare_processes) {
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);

            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        } else {
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);

            main_app_port->idle_start = task->thread->engine->timers.now;

            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        }

        app->idle_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (adjust_idle_timer) {
        /* This reference is dropped by nxt_router_adjust_idle_timer(). */
        nxt_router_app_use(task, app, 1);
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
    }

    /* ? */
    if (main_app_port->pair[1] == -1) {
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
                  &app->name, app, main_app_port, main_app_port->pid);

        goto adjust_use;
    }

    if (send_quit) {
        nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);

        nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
                              NULL);

        /* Drop the reference that app->ports was holding. */
        if (port_unchained) {
            nxt_port_use(task, main_app_port, -1);
        }

        goto adjust_use;
    }

    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
              &app->name, app);

adjust_use:

    nxt_port_use(task, port, inc_use);
}


/*
 * Detaches a closed application port from its application.
 *
 * The port is always removed from the port hash.  For a port with a
 * non-zero id nothing else is done.  For a main port (id 0) the port is
 * also removed from the ports and idle/spare lists (refilling spare_ports
 * from idle_ports if needed), the process counter is decremented and, if
 * the engine is not shutting down and the application allows and needs
 * it, a new process start is requested.
 */
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
    nxt_app_t         *app;
    nxt_bool_t        unchain, start_process;
    nxt_port_t        *idle_port;
    nxt_queue_link_t  *idle_lnk;

    app = port->app;

    nxt_assert(app != NULL);

    nxt_thread_mutex_lock(&app->mutex);

    nxt_port_hash_remove(&app->port_hash, port);
    app->port_hash_count--;

    if (port->id != 0) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
                  port->pid, port->id);

        return;
    }

    unchain = nxt_queue_chk_remove(&port->app_link);

    if (nxt_queue_chk_remove(&port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
                  &app->name, port->pid, port->id,
                  (port->idle_start ? "idle_ports" : "spare_ports"));

        /* idle_start == 0 identifies a port taken from spare_ports. */
        if (port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }
    }

    app->processes--;

    start_process = !task->thread->engine->shutdown
                    && nxt_router_app_can_start(app)
                    && nxt_router_app_need_start(app);

    if (start_process) {
        app->pending_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

    /* Drop the reference that app->ports was holding. */
    if (unchain) {
        nxt_port_use(task, port, -1);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}


/*
 * Stops idle processes whose idle timeout has expired and re-arms or
 * disables the application's idle timer.  Must run on the application's
 * engine.
 *
 * "obj" is the application.  "data" equals the application when the call
 * comes from the queued adjust_idle_work; in that case the work is marked
 * as no longer queued and the application reference taken when it was
 * queued is dropped at the end.
 */
static void
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t           *app;
    nxt_bool_t          queued;
    nxt_port_t          *port;
    nxt_msec_t          timeout, threshold;
    nxt_queue_link_t    *lnk;
    nxt_event_engine_t  *engine;

    app = obj;
    queued = (data == app);

    nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
              &app->name, queued);

    engine = task->thread->engine;

    nxt_assert(app->engine == engine);

    threshold = engine->timers.now + app->joint->idle_timer.bias;
    timeout = 0;

    nxt_thread_mutex_lock(&app->mutex);

    if (queued) {
        app->adjust_idle_work.data = NULL;
    }

    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
              &app->name,
              (int) app->idle_processes, (int) app->spare_processes);

    while (app->idle_processes > app->spare_processes) {

        nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

        lnk = nxt_queue_first(&app->idle_ports);
        port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);

        timeout = port->idle_start + app->idle_timeout;

        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
                  &app->name, port->pid,
                  port->idle_start, timeout, threshold);

        /* The first idle port has not expired yet: stop here. */
        if (timeout > threshold) {
            break;
        }

        nxt_queue_remove(lnk);
        lnk->next = NULL;

        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
                  &app->name, port->pid, port->id);

        nxt_queue_chk_remove(&port->app_link);

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        app->idle_processes--;
        app->processes--;
        port->app = NULL;

        /* The mutex is not held while writing to the port. */
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' send QUIT to idle port %PI",
                  &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);

        nxt_thread_mutex_lock(&app->mutex);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (timeout > threshold) {
        nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);

    } else {
        nxt_timer_disable(engine, &app->joint->idle_timer);
    }

    if (queued) {
        nxt_router_app_use(task, app, -1);
    }
}


/*
 * Idle timer handler: "obj" is the idle_timer embedded in an application
 * joint.  Runs the idle adjustment for the joint's application if it
 * still exists.
 */
static void
nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t      *timer;
    nxt_app_joint_t  *app_joint;

    timer = obj;
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);

    if (nxt_fast_path(app_joint->app != NULL)) {
        nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
    }
}


/*
 * Timer handler installed by nxt_router_free_app(): "obj" is the
 * idle_timer embedded in an application joint; drops one joint reference.
 */
static void
nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t      *timer;
    nxt_app_joint_t  *app_joint;

    timer = obj;
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);

    nxt_router_app_joint_use(task, app_joint, -1);
}


/*
 * Destroys the application attached to the joint passed in "obj".
 *
 * Every port in app->ports is detached and sent QUIT, every port left in
 * the port hash is closed, the outgoing mmaps and the shared port are
 * released and the application's mutexes and memory pool are destroyed.
 * The joint then loses its application pointer and one reference, either
 * right away or from the idle timer handler.
 */
static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t        *app;
    nxt_port_t       *port;
    nxt_app_joint_t  *app_joint;

    app_joint = obj;
    app = app_joint->app;

    for ( ;; ) {
        port = nxt_router_app_get_port_for_quit(task, app);
        if (port == NULL) {
            break;
        }

        nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_lock(&app->mutex);

    for ( ;; ) {
        port = nxt_port_hash_retrieve(&app->port_hash);
        if (port == NULL) {
            break;
        }

        app->port_hash_count--;

        port->app = NULL;

        nxt_port_close(task, port);

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_assert(app->processes == 0);
    nxt_assert(app->active_requests == 0);
    nxt_assert(app->port_hash_count == 0);
    nxt_assert(app->idle_processes == 0);
    nxt_assert(nxt_queue_is_empty(&app->ports));
    nxt_assert(nxt_queue_is_empty(&app->spare_ports));
    nxt_assert(nxt_queue_is_empty(&app->idle_ports));

    nxt_port_mmaps_destroy(&app->outgoing, 1);

    nxt_thread_mutex_destroy(&app->outgoing.mutex);

    if (app->shared_port != NULL) {
        app->shared_port->app = NULL;
        nxt_port_close(task, app->shared_port);
        nxt_port_use(task, app->shared_port, -1);

        app->shared_port = NULL;
    }

    nxt_thread_mutex_destroy(&app->mutex);
    nxt_mp_destroy(app->mem_pool);

    app_joint->app = NULL;

    /*
     * NOTE(review): presumably a true result means the timer is still
     * referenced by the engine, so the joint release is deferred to a
     * zero-delay timer callback - confirm against nxt_timer_delete().
     */
    if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
        app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
        nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);

    } else {
        nxt_router_app_joint_use(task, app_joint, -1);
    }
}


/*
 * Prepares "req_rpc_data" for sending its request to "app".
 *
 * Takes a reference to the application's shared port and stores it as the
 * request's app_port, counts the request as active, links it into the
 * application's ack-waiting list and sets the release action to
 * NXT_APR_REQUEST_FAILED.  Requests a new process start if the
 * application allows and needs one.
 */
static void
nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data)
{
    nxt_bool_t          start_process;
    nxt_port_t          *port;
    nxt_http_request_t  *r;

    start_process = 0;

    nxt_thread_mutex_lock(&app->mutex);

    port = app->shared_port;
    nxt_port_inc_use(port);

    app->active_requests++;

    if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
        app->pending_processes++;
        start_process = 1;
    }

    r = req_rpc_data->request;

    /*
     * Put request into application-wide list to be able to cancel request
     * if something goes wrong with application processes.
     */
    nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);

    nxt_thread_mutex_unlock(&app->mutex);

    /*
     * Retain request memory pool while request is linked in ack_waiting_req
     * to guarantee request structure memory is accessible.
     */
    nxt_mp_retain(r->mem_pool);

    req_rpc_data->app_port = port;
    req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}


/*
 * Entry point for passing HTTP request "r" to the application configured
 * in "action".
 *
 * Registers the response ready/error RPC handlers on the engine port,
 * initialises the request's timer and error work, fills in the RPC data
 * (stream, application, no body descriptor, cancel-on-unlink), takes an
 * application reference, then obtains a port and prepares the request for
 * sending.  If the RPC data cannot be registered, the request fails with
 * NXT_HTTP_INTERNAL_SERVER_ERROR.
 */
void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
    nxt_http_action_t *action)
{
    nxt_event_engine_t      *engine;
    nxt_http_app_conf_t     *conf;
    nxt_request_rpc_data_t  *req_rpc_data;

    conf = action->u.conf;
    engine = task->thread->engine;

    r->app_target = conf->target;

    req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
                                          nxt_router_response_ready_handler,
                                          nxt_router_response_error_handler,
                                          sizeof(nxt_request_rpc_data_t));
    if (nxt_slow_path(req_rpc_data == NULL)) {
        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /*
     * At this point we have request req_rpc_data allocated and registered
     * in port handlers.  Need to fixup request memory pool.  Counterpart
     * release will be called via following call chain:
     *    nxt_request_rpc_data_unlink() ->
     *        nxt_router_http_request_release_post() ->
     *            nxt_router_http_request_release()
     */
    nxt_mp_retain(r->mem_pool);

    r->timer.task = &engine->task;
    r->timer.work_queue = &engine->fast_work_queue;
    r->timer.log = engine->task.log;
    r->timer.bias = NXT_TIMER_DEFAULT_BIAS;

    r->engine = engine;
    r->err_work.handler = nxt_router_http_request_error;
    r->err_work.task = task;
    r->err_work.obj = r;

    req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
    req_rpc_data->app = conf->app;
    req_rpc_data->msg_info.body_fd = -1;
    req_rpc_data->rpc_cancel = 1;

    nxt_router_app_use(task, conf->app, 1);

    req_rpc_data->request = r;
    r->req_rpc_data = req_rpc_data;

    if (r->last != NULL) {
        r->last->completion_handler = nxt_router_http_request_done;
    }

    nxt_router_app_port_get(task, conf->app, req_rpc_data);
    nxt_router_app_prepare_request(task, req_rpc_data);
}


/*
 * Error work handler posted for a request taken off an application's
 * ack-waiting list ("obj" is the request): fails it with
 * NXT_HTTP_SERVICE_UNAVAILABLE, unlinks its RPC data if still attached and
 * releases one reference to the request memory pool.
 */
static void
nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_http_request_t  *r;

    r = obj;

    nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    if (r->req_rpc_data != NULL) {
        nxt_request_rpc_data_unlink(task, r->req_rpc_data);
    }

    /* NOTE(review): presumably balances the retain taken when the request
     * was put on the ack-waiting list - confirm. */
    nxt_mp_release(r->mem_pool);
}


/*
 * Completion handler installed on the request's last buffer ("data" is
 * the request): unlinks the RPC data if still attached, then runs the
 * request close handler.
 */
static void
nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
{
    nxt_http_request_t  *r;

    r = data;

    nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);

    if (r->req_rpc_data != NULL) {
        nxt_request_rpc_data_unlink(task, r->req_rpc_data);
    }

    nxt_http_request_close_handler(task, r, r->proto.any);
}
5244 5245 static void 5246 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 5247 { 5248 } 5249 5250 5251 static void 5252 nxt_router_app_prepare_request(nxt_task_t *task, 5253 nxt_request_rpc_data_t *req_rpc_data) 5254 { 5255 nxt_app_t *app; 5256 nxt_buf_t *buf, *body; 5257 nxt_int_t res; 5258 nxt_port_t *port, *reply_port; 5259 5260 int notify; 5261 struct { 5262 nxt_port_msg_t pm; 5263 nxt_port_mmap_msg_t mm; 5264 } msg; 5265 5266 5267 app = req_rpc_data->app; 5268 5269 nxt_assert(app != NULL); 5270 5271 port = req_rpc_data->app_port; 5272 5273 nxt_assert(port != NULL); 5274 nxt_assert(port->queue != NULL); 5275 5276 reply_port = task->thread->engine->port; 5277 5278 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5279 nxt_app_msg_prefix[app->type]); 5280 if (nxt_slow_path(buf == NULL)) { 5281 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5282 req_rpc_data->stream, &app->name); 5283 5284 nxt_http_request_error(task, req_rpc_data->request, 5285 NXT_HTTP_INTERNAL_SERVER_ERROR); 5286 5287 return; 5288 } 5289 5290 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5291 nxt_buf_used_size(buf), 5292 port->socket.fd); 5293 5294 req_rpc_data->msg_info.buf = buf; 5295 5296 body = req_rpc_data->request->body; 5297 5298 if (body != NULL && nxt_buf_is_file(body)) { 5299 req_rpc_data->msg_info.body_fd = body->file->fd; 5300 5301 body->file->fd = -1; 5302 5303 } else { 5304 req_rpc_data->msg_info.body_fd = -1; 5305 } 5306 5307 msg.pm.stream = req_rpc_data->stream; 5308 msg.pm.pid = reply_port->pid; 5309 msg.pm.reply_port = reply_port->id; 5310 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5311 msg.pm.last = 0; 5312 msg.pm.mmap = 1; 5313 msg.pm.nf = 0; 5314 msg.pm.mf = 0; 5315 msg.pm.tracking = 0; 5316 5317 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5318 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5319 5320 msg.mm.mmap_id = hdr->id; 5321 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, 
buf->mem.pos); 5322 msg.mm.size = nxt_buf_used_size(buf); 5323 5324 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5325 req_rpc_data->stream, ¬ify, 5326 &req_rpc_data->msg_info.tracking_cookie); 5327 if (nxt_fast_path(res == NXT_OK)) { 5328 if (notify != 0) { 5329 (void) nxt_port_socket_write(task, port, 5330 NXT_PORT_MSG_READ_QUEUE, 5331 -1, req_rpc_data->stream, 5332 reply_port->id, NULL); 5333 5334 } else { 5335 nxt_debug(task, "queue is not empty"); 5336 } 5337 5338 buf->is_port_mmap_sent = 1; 5339 buf->mem.pos = buf->mem.free; 5340 5341 } else { 5342 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5343 req_rpc_data->stream, &app->name); 5344 5345 nxt_http_request_error(task, req_rpc_data->request, 5346 NXT_HTTP_INTERNAL_SERVER_ERROR); 5347 } 5348 } 5349 5350 5351 struct nxt_fields_iter_s { 5352 nxt_list_part_t *part; 5353 nxt_http_field_t *field; 5354 }; 5355 5356 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5357 5358 5359 static nxt_http_field_t * 5360 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5361 { 5362 if (part == NULL) { 5363 return NULL; 5364 } 5365 5366 while (part->nelts == 0) { 5367 part = part->next; 5368 if (part == NULL) { 5369 return NULL; 5370 } 5371 } 5372 5373 i->part = part; 5374 i->field = nxt_list_data(i->part); 5375 5376 return i->field; 5377 } 5378 5379 5380 static nxt_http_field_t * 5381 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5382 { 5383 return nxt_fields_part_first(nxt_list_part(fields), i); 5384 } 5385 5386 5387 static nxt_http_field_t * 5388 nxt_fields_next(nxt_fields_iter_t *i) 5389 { 5390 nxt_http_field_t *end = nxt_list_data(i->part); 5391 5392 end += i->part->nelts; 5393 i->field++; 5394 5395 if (i->field < end) { 5396 return i->field; 5397 } 5398 5399 return nxt_fields_part_first(i->part->next, i); 5400 } 5401 5402 5403 static nxt_buf_t * 5404 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5405 nxt_app_t *app, const nxt_str_t 
*prefix) 5406 { 5407 void *target_pos, *query_pos; 5408 u_char *pos, *end, *p, c; 5409 size_t fields_count, req_size, size, free_size; 5410 size_t copy_size; 5411 nxt_off_t content_length; 5412 nxt_buf_t *b, *buf, *out, **tail; 5413 nxt_http_field_t *field, *dup; 5414 nxt_unit_field_t *dst_field; 5415 nxt_fields_iter_t iter, dup_iter; 5416 nxt_unit_request_t *req; 5417 5418 req_size = sizeof(nxt_unit_request_t) 5419 + r->method->length + 1 5420 + r->version.length + 1 5421 + r->remote->length + 1 5422 + r->local->length + 1 5423 + r->server_name.length + 1 5424 + r->target.length + 1 5425 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5426 5427 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5428 fields_count = 0; 5429 5430 nxt_list_each(field, r->fields) { 5431 fields_count++; 5432 5433 req_size += field->name_length + prefix->length + 1 5434 + field->value_length + 1; 5435 } nxt_list_loop; 5436 5437 req_size += fields_count * sizeof(nxt_unit_field_t); 5438 5439 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5440 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5441 (int) req_size); 5442 5443 return NULL; 5444 } 5445 5446 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5447 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5448 if (nxt_slow_path(out == NULL)) { 5449 return NULL; 5450 } 5451 5452 req = (nxt_unit_request_t *) out->mem.free; 5453 out->mem.free += req_size; 5454 5455 req->app_target = r->app_target; 5456 5457 req->content_length = content_length; 5458 5459 p = (u_char *) (req->fields + fields_count); 5460 5461 nxt_debug(task, "fields_count=%d", (int) fields_count); 5462 5463 req->method_length = r->method->length; 5464 nxt_unit_sptr_set(&req->method, p); 5465 p = nxt_cpymem(p, r->method->start, r->method->length); 5466 *p++ = '\0'; 5467 5468 req->version_length = r->version.length; 5469 nxt_unit_sptr_set(&req->version, p); 5470 p = nxt_cpymem(p, r->version.start, r->version.length); 
5471 *p++ = '\0'; 5472 5473 req->remote_length = r->remote->address_length; 5474 nxt_unit_sptr_set(&req->remote, p); 5475 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5476 r->remote->address_length); 5477 *p++ = '\0'; 5478 5479 req->local_length = r->local->address_length; 5480 nxt_unit_sptr_set(&req->local, p); 5481 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5482 *p++ = '\0'; 5483 5484 req->tls = (r->tls != NULL); 5485 req->websocket_handshake = r->websocket_handshake; 5486 5487 req->server_name_length = r->server_name.length; 5488 nxt_unit_sptr_set(&req->server_name, p); 5489 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5490 *p++ = '\0'; 5491 5492 target_pos = p; 5493 req->target_length = (uint32_t) r->target.length; 5494 nxt_unit_sptr_set(&req->target, p); 5495 p = nxt_cpymem(p, r->target.start, r->target.length); 5496 *p++ = '\0'; 5497 5498 req->path_length = (uint32_t) r->path->length; 5499 if (r->path->start == r->target.start) { 5500 nxt_unit_sptr_set(&req->path, target_pos); 5501 5502 } else { 5503 nxt_unit_sptr_set(&req->path, p); 5504 p = nxt_cpymem(p, r->path->start, r->path->length); 5505 *p++ = '\0'; 5506 } 5507 5508 req->query_length = r->args != NULL ? 
(uint32_t) r->args->length : 0; 5509 if (r->args != NULL && r->args->start != NULL) { 5510 query_pos = nxt_pointer_to(target_pos, 5511 r->args->start - r->target.start); 5512 5513 nxt_unit_sptr_set(&req->query, query_pos); 5514 5515 } else { 5516 req->query.offset = 0; 5517 } 5518 5519 req->content_length_field = NXT_UNIT_NONE_FIELD; 5520 req->content_type_field = NXT_UNIT_NONE_FIELD; 5521 req->cookie_field = NXT_UNIT_NONE_FIELD; 5522 req->authorization_field = NXT_UNIT_NONE_FIELD; 5523 5524 dst_field = req->fields; 5525 5526 for (field = nxt_fields_first(r->fields, &iter); 5527 field != NULL; 5528 field = nxt_fields_next(&iter)) 5529 { 5530 if (field->skip) { 5531 continue; 5532 } 5533 5534 dst_field->hash = field->hash; 5535 dst_field->skip = 0; 5536 dst_field->name_length = field->name_length + prefix->length; 5537 dst_field->value_length = field->value_length; 5538 5539 if (field == r->content_length) { 5540 req->content_length_field = dst_field - req->fields; 5541 5542 } else if (field == r->content_type) { 5543 req->content_type_field = dst_field - req->fields; 5544 5545 } else if (field == r->cookie) { 5546 req->cookie_field = dst_field - req->fields; 5547 5548 } else if (field == r->authorization) { 5549 req->authorization_field = dst_field - req->fields; 5550 } 5551 5552 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5553 (int) field->hash, (int) field->skip, 5554 (int) field->name_length, field->name, 5555 (int) field->value_length, field->value); 5556 5557 if (prefix->length != 0) { 5558 nxt_unit_sptr_set(&dst_field->name, p); 5559 p = nxt_cpymem(p, prefix->start, prefix->length); 5560 5561 end = field->name + field->name_length; 5562 for (pos = field->name; pos < end; pos++) { 5563 c = *pos; 5564 5565 if (c >= 'a' && c <= 'z') { 5566 *p++ = (c & ~0x20); 5567 continue; 5568 } 5569 5570 if (c == '-') { 5571 *p++ = '_'; 5572 continue; 5573 } 5574 5575 *p++ = c; 5576 } 5577 5578 } else { 5579 nxt_unit_sptr_set(&dst_field->name, p); 5580 p = 
nxt_cpymem(p, field->name, field->name_length); 5581 } 5582 5583 *p++ = '\0'; 5584 5585 nxt_unit_sptr_set(&dst_field->value, p); 5586 p = nxt_cpymem(p, field->value, field->value_length); 5587 5588 if (prefix->length != 0) { 5589 dup_iter = iter; 5590 5591 for (dup = nxt_fields_next(&dup_iter); 5592 dup != NULL; 5593 dup = nxt_fields_next(&dup_iter)) 5594 { 5595 if (dup->name_length != field->name_length 5596 || dup->skip 5597 || dup->hash != field->hash 5598 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5599 { 5600 continue; 5601 } 5602 5603 p = nxt_cpymem(p, ", ", 2); 5604 p = nxt_cpymem(p, dup->value, dup->value_length); 5605 5606 dst_field->value_length += 2 + dup->value_length; 5607 5608 dup->skip = 1; 5609 } 5610 } 5611 5612 *p++ = '\0'; 5613 5614 dst_field++; 5615 } 5616 5617 req->fields_count = (uint32_t) (dst_field - req->fields); 5618 5619 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5620 5621 buf = out; 5622 tail = &buf->next; 5623 5624 for (b = r->body; b != NULL; b = b->next) { 5625 size = nxt_buf_mem_used_size(&b->mem); 5626 pos = b->mem.pos; 5627 5628 while (size > 0) { 5629 if (buf == NULL) { 5630 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5631 5632 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5633 if (nxt_slow_path(buf == NULL)) { 5634 while (out != NULL) { 5635 buf = out->next; 5636 out->next = NULL; 5637 out->completion_handler(task, out, out->parent); 5638 out = buf; 5639 } 5640 return NULL; 5641 } 5642 5643 *tail = buf; 5644 tail = &buf->next; 5645 5646 } else { 5647 free_size = nxt_buf_mem_free_size(&buf->mem); 5648 if (free_size < size 5649 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5650 == NXT_OK) 5651 { 5652 free_size = nxt_buf_mem_free_size(&buf->mem); 5653 } 5654 } 5655 5656 if (free_size > 0) { 5657 copy_size = nxt_min(free_size, size); 5658 5659 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5660 5661 size -= copy_size; 5662 pos += copy_size; 5663 5664 if (size == 0) { 
5665 break; 5666 } 5667 } 5668 5669 buf = NULL; 5670 } 5671 } 5672 5673 return out; 5674 } 5675 5676 5677 static void 5678 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5679 { 5680 nxt_timer_t *timer; 5681 nxt_http_request_t *r; 5682 nxt_request_rpc_data_t *req_rpc_data; 5683 5684 timer = obj; 5685 5686 nxt_debug(task, "router app timeout"); 5687 5688 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5689 req_rpc_data = r->timer_data; 5690 5691 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5692 5693 nxt_request_rpc_data_unlink(task, req_rpc_data); 5694 } 5695 5696 5697 static void 5698 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5699 { 5700 r->timer.handler = nxt_router_http_request_release; 5701 nxt_timer_add(task->thread->engine, &r->timer, 0); 5702 } 5703 5704 5705 static void 5706 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5707 { 5708 nxt_http_request_t *r; 5709 5710 nxt_debug(task, "http request pool release"); 5711 5712 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5713 5714 nxt_mp_release(r->mem_pool); 5715 } 5716 5717 5718 static void 5719 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5720 { 5721 size_t mi; 5722 uint32_t i; 5723 nxt_bool_t ack; 5724 nxt_process_t *process; 5725 nxt_free_map_t *m; 5726 nxt_port_mmap_handler_t *mmap_handler; 5727 5728 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5729 5730 process = nxt_runtime_process_find(task->thread->runtime, 5731 msg->port_msg.pid); 5732 if (nxt_slow_path(process == NULL)) { 5733 return; 5734 } 5735 5736 ack = 0; 5737 5738 /* 5739 * To mitigate possible racing condition (when OOSM message received 5740 * after some of the memory was already freed), need to try to find 5741 * first free segment in shared memory and send ACK if found. 
5742 */ 5743 5744 nxt_thread_mutex_lock(&process->incoming.mutex); 5745 5746 for (i = 0; i < process->incoming.size; i++) { 5747 mmap_handler = process->incoming.elts[i].mmap_handler; 5748 5749 if (nxt_slow_path(mmap_handler == NULL)) { 5750 continue; 5751 } 5752 5753 m = mmap_handler->hdr->free_map; 5754 5755 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5756 if (m[mi] != 0) { 5757 ack = 1; 5758 5759 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5760 i, mi, m[mi]); 5761 5762 break; 5763 } 5764 } 5765 } 5766 5767 nxt_thread_mutex_unlock(&process->incoming.mutex); 5768 5769 if (ack) { 5770 nxt_process_broadcast_shm_ack(task, process); 5771 } 5772 } 5773 5774 5775 static void 5776 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5777 { 5778 nxt_fd_t fd; 5779 nxt_port_t *port; 5780 nxt_runtime_t *rt; 5781 nxt_port_mmaps_t *mmaps; 5782 nxt_port_msg_get_mmap_t *get_mmap_msg; 5783 nxt_port_mmap_handler_t *mmap_handler; 5784 5785 rt = task->thread->runtime; 5786 5787 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5788 msg->port_msg.reply_port); 5789 if (nxt_slow_path(port == NULL)) { 5790 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5791 msg->port_msg.pid, msg->port_msg.reply_port); 5792 5793 return; 5794 } 5795 5796 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5797 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5798 { 5799 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5800 (int) nxt_buf_used_size(msg->buf)); 5801 5802 return; 5803 } 5804 5805 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5806 5807 nxt_assert(port->type == NXT_PROCESS_APP); 5808 5809 if (nxt_slow_path(port->app == NULL)) { 5810 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5811 port->pid, port->id); 5812 5813 // FIXME 5814 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5815 -1, msg->port_msg.stream, 0, NULL); 5816 5817 return; 5818 } 5819 5820 mmaps = &port->app->outgoing; 5821 
nxt_thread_mutex_lock(&mmaps->mutex); 5822 5823 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5824 nxt_thread_mutex_unlock(&mmaps->mutex); 5825 5826 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5827 (int) get_mmap_msg->id); 5828 5829 // FIXME 5830 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5831 -1, msg->port_msg.stream, 0, NULL); 5832 return; 5833 } 5834 5835 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5836 5837 fd = mmap_handler->fd; 5838 5839 nxt_thread_mutex_unlock(&mmaps->mutex); 5840 5841 nxt_debug(task, "get mmap %PI:%d found", 5842 msg->port_msg.pid, (int) get_mmap_msg->id); 5843 5844 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5845 } 5846 5847 5848 static void 5849 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5850 { 5851 nxt_port_t *port, *reply_port; 5852 nxt_runtime_t *rt; 5853 nxt_port_msg_get_port_t *get_port_msg; 5854 5855 rt = task->thread->runtime; 5856 5857 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5858 msg->port_msg.reply_port); 5859 if (nxt_slow_path(reply_port == NULL)) { 5860 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5861 msg->port_msg.pid, msg->port_msg.reply_port); 5862 5863 return; 5864 } 5865 5866 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5867 < (int) sizeof(nxt_port_msg_get_port_t))) 5868 { 5869 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5870 (int) nxt_buf_used_size(msg->buf)); 5871 5872 return; 5873 } 5874 5875 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5876 5877 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5878 if (nxt_slow_path(port == NULL)) { 5879 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5880 get_port_msg->pid, get_port_msg->id); 5881 5882 return; 5883 } 5884 5885 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5886 get_port_msg->id); 5887 5888 (void) nxt_port_send_port(task, 
reply_port, port, msg->port_msg.stream); 5889 } 5890