1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 uint32_t requests; 31 nxt_conf_value_t *limits_value; 32 nxt_conf_value_t *processes_value; 33 nxt_conf_value_t *targets_value; 34 } nxt_router_app_conf_t; 35 36 37 typedef struct { 38 nxt_str_t pass; 39 nxt_str_t application; 40 } nxt_router_listener_conf_t; 41 42 43 #if (NXT_TLS) 44 45 typedef struct { 46 nxt_str_t name; 47 nxt_socket_conf_t *socket_conf; 48 nxt_router_temp_conf_t *temp_conf; 49 nxt_tls_init_t *tls_init; 50 nxt_bool_t last; 51 52 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 53 } nxt_router_tlssock_t; 54 55 #endif 56 57 58 typedef struct { 59 nxt_str_t *name; 60 nxt_socket_conf_t *socket_conf; 61 nxt_router_temp_conf_t *temp_conf; 62 nxt_bool_t last; 63 } nxt_socket_rpc_t; 64 65 66 typedef struct { 67 nxt_app_t *app; 68 nxt_router_temp_conf_t *temp_conf; 69 } nxt_app_rpc_t; 70 71 72 typedef struct { 73 nxt_app_joint_t *app_joint; 74 uint32_t generation; 75 } nxt_app_joint_rpc_t; 76 77 78 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 79 nxt_mp_t *mp); 80 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 81 static void nxt_router_greet_controller(nxt_task_t *task, 82 nxt_port_t *controller_port); 83 84 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 85 86 static void nxt_router_new_port_handler(nxt_task_t 
*task, 87 nxt_port_recv_msg_t *msg); 88 static void nxt_router_conf_data_handler(nxt_task_t *task, 89 nxt_port_recv_msg_t *msg); 90 static void nxt_router_app_restart_handler(nxt_task_t *task, 91 nxt_port_recv_msg_t *msg); 92 static void nxt_router_remove_pid_handler(nxt_task_t *task, 93 nxt_port_recv_msg_t *msg); 94 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 95 nxt_port_recv_msg_t *msg); 96 97 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 98 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 99 static void nxt_router_conf_ready(nxt_task_t *task, 100 nxt_router_temp_conf_t *tmcf); 101 static void nxt_router_conf_error(nxt_task_t *task, 102 nxt_router_temp_conf_t *tmcf); 103 static void nxt_router_conf_send(nxt_task_t *task, 104 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 105 106 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 107 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 108 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 109 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 110 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, 111 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, 112 nxt_conf_value_t *conf); 113 114 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 115 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 116 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 117 nxt_app_t *app); 118 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 119 nxt_str_t *name); 120 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 121 int i); 122 123 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 124 nxt_port_t *port); 125 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 126 nxt_port_t *port); 127 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 128 nxt_port_t 
*port, nxt_fd_t fd); 129 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 130 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 131 static void nxt_router_listen_socket_ready(nxt_task_t *task, 132 nxt_port_recv_msg_t *msg, void *data); 133 static void nxt_router_listen_socket_error(nxt_task_t *task, 134 nxt_port_recv_msg_t *msg, void *data); 135 #if (NXT_TLS) 136 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 137 nxt_port_recv_msg_t *msg, void *data); 138 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 139 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 140 nxt_bool_t last); 141 #endif 142 static void nxt_router_app_rpc_create(nxt_task_t *task, 143 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 144 static void nxt_router_app_prefork_ready(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146 static void nxt_router_app_prefork_error(nxt_task_t *task, 147 nxt_port_recv_msg_t *msg, void *data); 148 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 149 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 150 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 151 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 152 153 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 154 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 155 const nxt_event_interface_t *interface); 156 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 157 nxt_router_engine_conf_t *recf); 158 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf); 160 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 161 nxt_router_engine_conf_t *recf); 162 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 163 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 164 nxt_work_handler_t handler); 165 static nxt_int_t 
nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 166 nxt_router_engine_conf_t *recf); 167 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 168 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 169 170 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 171 nxt_router_temp_conf_t *tmcf); 172 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 173 nxt_event_engine_t *engine); 174 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 175 nxt_router_temp_conf_t *tmcf); 176 177 static void nxt_router_engines_post(nxt_router_t *router, 178 nxt_router_temp_conf_t *tmcf); 179 static void nxt_router_engine_post(nxt_event_engine_t *engine, 180 nxt_work_t *jobs); 181 182 static void nxt_router_thread_start(void *data); 183 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 184 void *data); 185 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 186 void *data); 187 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 188 void *data); 189 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 190 void *data); 191 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 192 void *data); 193 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 194 void *data); 195 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 196 void *data); 197 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 198 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 199 static void nxt_router_listen_socket_release(nxt_task_t *task, 200 nxt_socket_conf_t *skcf); 201 202 static void nxt_router_access_log_writer(nxt_task_t *task, 203 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 204 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 205 struct tm *tm, size_t size, const char *format); 206 static void 
nxt_router_access_log_open(nxt_task_t *task, 207 nxt_router_temp_conf_t *tmcf); 208 static void nxt_router_access_log_ready(nxt_task_t *task, 209 nxt_port_recv_msg_t *msg, void *data); 210 static void nxt_router_access_log_error(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212 static void nxt_router_access_log_release(nxt_task_t *task, 213 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 214 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 215 void *data); 216 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 217 nxt_port_recv_msg_t *msg, void *data); 218 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 219 nxt_port_recv_msg_t *msg, void *data); 220 221 static void nxt_router_app_port_ready(nxt_task_t *task, 222 nxt_port_recv_msg_t *msg, void *data); 223 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 224 nxt_port_t *app_port); 225 static void nxt_router_app_port_error(nxt_task_t *task, 226 nxt_port_recv_msg_t *msg, void *data); 227 228 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 229 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 230 231 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 232 nxt_apr_action_t action); 233 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 234 nxt_request_rpc_data_t *req_rpc_data); 235 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 236 void *data); 237 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 238 void *data); 239 240 static void nxt_router_app_prepare_request(nxt_task_t *task, 241 nxt_request_rpc_data_t *req_rpc_data); 242 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 243 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 244 245 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 246 static void 
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 247 void *data); 248 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 249 void *data); 250 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 251 void *data); 252 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 253 254 static const nxt_http_request_state_t nxt_http_request_send_state; 255 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 256 257 static void nxt_router_app_joint_use(nxt_task_t *task, 258 nxt_app_joint_t *app_joint, int i); 259 260 static void nxt_router_http_request_release_post(nxt_task_t *task, 261 nxt_http_request_t *r); 262 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 263 void *data); 264 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 265 static void nxt_router_get_port_handler(nxt_task_t *task, 266 nxt_port_recv_msg_t *msg); 267 static void nxt_router_get_mmap_handler(nxt_task_t *task, 268 nxt_port_recv_msg_t *msg); 269 270 extern const nxt_http_request_state_t nxt_http_websocket; 271 272 static nxt_router_t *nxt_router; 273 274 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 275 static const nxt_str_t empty_prefix = nxt_string(""); 276 277 static const nxt_str_t *nxt_app_msg_prefix[] = { 278 &empty_prefix, 279 &empty_prefix, 280 &http_prefix, 281 &http_prefix, 282 &http_prefix, 283 &empty_prefix, 284 }; 285 286 287 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 288 .quit = nxt_signal_quit_handler, 289 .new_port = nxt_router_new_port_handler, 290 .get_port = nxt_router_get_port_handler, 291 .change_file = nxt_port_change_log_file_handler, 292 .mmap = nxt_port_mmap_handler, 293 .get_mmap = nxt_router_get_mmap_handler, 294 .data = nxt_router_conf_data_handler, 295 .app_restart = nxt_router_app_restart_handler, 296 .remove_pid = nxt_router_remove_pid_handler, 297 .access_log = 
nxt_router_access_log_reopen_handler, 298 .rpc_ready = nxt_port_rpc_handler, 299 .rpc_error = nxt_port_rpc_handler, 300 .oosm = nxt_router_oosm_handler, 301 }; 302 303 304 const nxt_process_init_t nxt_router_process = { 305 .name = "router", 306 .type = NXT_PROCESS_ROUTER, 307 .prefork = nxt_router_prefork, 308 .restart = 1, 309 .setup = nxt_process_core_setup, 310 .start = nxt_router_start, 311 .port_handlers = &nxt_router_process_port_handlers, 312 .signals = nxt_process_signals, 313 }; 314 315 316 /* Queues of nxt_socket_conf_t */ 317 nxt_queue_t creating_sockets; 318 nxt_queue_t pending_sockets; 319 nxt_queue_t updating_sockets; 320 nxt_queue_t keeping_sockets; 321 nxt_queue_t deleting_sockets; 322 323 324 static nxt_int_t 325 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 326 { 327 nxt_runtime_stop_app_processes(task, task->thread->runtime); 328 329 return NXT_OK; 330 } 331 332 333 static nxt_int_t 334 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 335 { 336 nxt_int_t ret; 337 nxt_port_t *controller_port; 338 nxt_router_t *router; 339 nxt_runtime_t *rt; 340 341 rt = task->thread->runtime; 342 343 nxt_log(task, NXT_LOG_INFO, "router started"); 344 345 #if (NXT_TLS) 346 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 347 if (nxt_slow_path(rt->tls == NULL)) { 348 return NXT_ERROR; 349 } 350 351 ret = rt->tls->library_init(task); 352 if (nxt_slow_path(ret != NXT_OK)) { 353 return ret; 354 } 355 #endif 356 357 ret = nxt_http_init(task); 358 if (nxt_slow_path(ret != NXT_OK)) { 359 return ret; 360 } 361 362 router = nxt_zalloc(sizeof(nxt_router_t)); 363 if (nxt_slow_path(router == NULL)) { 364 return NXT_ERROR; 365 } 366 367 nxt_queue_init(&router->engines); 368 nxt_queue_init(&router->sockets); 369 nxt_queue_init(&router->apps); 370 371 nxt_router = router; 372 373 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 374 if (controller_port != NULL) { 375 nxt_router_greet_controller(task, 
controller_port); 376 } 377 378 return NXT_OK; 379 } 380 381 382 static void 383 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 384 { 385 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 386 -1, 0, 0, NULL); 387 } 388 389 390 static void 391 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 392 void *data) 393 { 394 size_t size; 395 uint32_t stream; 396 nxt_mp_t *mp; 397 nxt_int_t ret; 398 nxt_app_t *app; 399 nxt_buf_t *b; 400 nxt_port_t *main_port; 401 nxt_runtime_t *rt; 402 nxt_app_joint_rpc_t *app_joint_rpc; 403 404 app = data; 405 406 rt = task->thread->runtime; 407 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 408 409 nxt_debug(task, "app '%V' %p start process", &app->name, app); 410 411 size = app->name.length + 1 + app->conf.length; 412 413 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 414 415 if (nxt_slow_path(b == NULL)) { 416 goto failed; 417 } 418 419 nxt_buf_cpystr(b, &app->name); 420 *b->mem.free++ = '\0'; 421 nxt_buf_cpystr(b, &app->conf); 422 423 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 424 nxt_router_app_port_ready, 425 nxt_router_app_port_error, 426 sizeof(nxt_app_joint_rpc_t)); 427 if (nxt_slow_path(app_joint_rpc == NULL)) { 428 goto failed; 429 } 430 431 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 432 433 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 434 -1, stream, port->id, b); 435 if (nxt_slow_path(ret != NXT_OK)) { 436 nxt_port_rpc_cancel(task, port, stream); 437 438 goto failed; 439 } 440 441 app_joint_rpc->app_joint = app->joint; 442 app_joint_rpc->generation = app->generation; 443 444 nxt_router_app_joint_use(task, app->joint, 1); 445 446 nxt_router_app_use(task, app, -1); 447 448 return; 449 450 failed: 451 452 if (b != NULL) { 453 mp = b->data; 454 nxt_mp_free(mp, b); 455 nxt_mp_release(mp); 456 } 457 458 nxt_thread_mutex_lock(&app->mutex); 459 460 app->pending_processes--; 461 462 
nxt_thread_mutex_unlock(&app->mutex); 463 464 nxt_router_app_use(task, app, -1); 465 } 466 467 468 static void 469 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 470 { 471 app_joint->use_count += i; 472 473 if (app_joint->use_count == 0) { 474 nxt_assert(app_joint->app == NULL); 475 476 nxt_free(app_joint); 477 } 478 } 479 480 481 static nxt_int_t 482 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 483 { 484 nxt_int_t res; 485 nxt_port_t *router_port; 486 nxt_runtime_t *rt; 487 488 nxt_debug(task, "app '%V' start process", &app->name); 489 490 rt = task->thread->runtime; 491 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 492 493 nxt_router_app_use(task, app, 1); 494 495 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 496 app); 497 498 if (res == NXT_OK) { 499 return res; 500 } 501 502 nxt_thread_mutex_lock(&app->mutex); 503 504 app->pending_processes--; 505 506 nxt_thread_mutex_unlock(&app->mutex); 507 508 nxt_router_app_use(task, app, -1); 509 510 return NXT_ERROR; 511 } 512 513 514 nxt_inline nxt_bool_t 515 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 516 { 517 nxt_buf_t *b, *next; 518 nxt_bool_t cancelled; 519 nxt_port_t *app_port; 520 nxt_msg_info_t *msg_info; 521 522 msg_info = &req_rpc_data->msg_info; 523 524 if (msg_info->buf == NULL) { 525 return 0; 526 } 527 528 app_port = req_rpc_data->app_port; 529 530 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 531 cancelled = nxt_app_queue_cancel(app_port->queue, 532 msg_info->tracking_cookie, 533 req_rpc_data->stream); 534 535 if (cancelled) { 536 nxt_debug(task, "stream #%uD: cancelled by router", 537 req_rpc_data->stream); 538 } 539 540 } else { 541 cancelled = 0; 542 } 543 544 for (b = msg_info->buf; b != NULL; b = next) { 545 next = b->next; 546 b->next = NULL; 547 548 if (b->is_port_mmap_sent) { 549 b->is_port_mmap_sent = cancelled == 0; 550 } 551 552 b->completion_handler(task, b, 
b->parent); 553 } 554 555 msg_info->buf = NULL; 556 557 return cancelled; 558 } 559 560 561 nxt_inline nxt_bool_t 562 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 563 { 564 if (lnk->next != NULL) { 565 nxt_queue_remove(lnk); 566 567 lnk->next = NULL; 568 569 return 1; 570 } 571 572 return 0; 573 } 574 575 576 nxt_inline void 577 nxt_request_rpc_data_unlink(nxt_task_t *task, 578 nxt_request_rpc_data_t *req_rpc_data) 579 { 580 nxt_app_t *app; 581 nxt_bool_t unlinked; 582 nxt_http_request_t *r; 583 584 nxt_router_msg_cancel(task, req_rpc_data); 585 586 if (req_rpc_data->app_port != NULL) { 587 nxt_router_app_port_release(task, req_rpc_data->app_port, 588 req_rpc_data->apr_action); 589 590 req_rpc_data->app_port = NULL; 591 } 592 593 app = req_rpc_data->app; 594 r = req_rpc_data->request; 595 596 if (r != NULL) { 597 r->timer_data = NULL; 598 599 nxt_router_http_request_release_post(task, r); 600 601 r->req_rpc_data = NULL; 602 req_rpc_data->request = NULL; 603 604 if (app != NULL) { 605 unlinked = 0; 606 607 nxt_thread_mutex_lock(&app->mutex); 608 609 if (r->app_link.next != NULL) { 610 nxt_queue_remove(&r->app_link); 611 r->app_link.next = NULL; 612 613 unlinked = 1; 614 } 615 616 nxt_thread_mutex_unlock(&app->mutex); 617 618 if (unlinked) { 619 nxt_mp_release(r->mem_pool); 620 } 621 } 622 } 623 624 if (app != NULL) { 625 nxt_router_app_use(task, app, -1); 626 627 req_rpc_data->app = NULL; 628 } 629 630 if (req_rpc_data->msg_info.body_fd != -1) { 631 nxt_fd_close(req_rpc_data->msg_info.body_fd); 632 633 req_rpc_data->msg_info.body_fd = -1; 634 } 635 636 if (req_rpc_data->rpc_cancel) { 637 req_rpc_data->rpc_cancel = 0; 638 639 nxt_port_rpc_cancel(task, task->thread->engine->port, 640 req_rpc_data->stream); 641 } 642 } 643 644 645 static void 646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 647 { 648 nxt_int_t res; 649 nxt_app_t *app; 650 nxt_port_t *port, *main_app_port; 651 nxt_runtime_t *rt; 652 653 nxt_port_new_port_handler(task, msg); 654 
655 port = msg->u.new_port; 656 657 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 658 nxt_router_greet_controller(task, msg->u.new_port); 659 } 660 661 if (port == NULL || port->type != NXT_PROCESS_APP) { 662 663 if (msg->port_msg.stream == 0) { 664 return; 665 } 666 667 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 668 669 } else { 670 if (msg->fd[1] != -1) { 671 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 672 if (nxt_slow_path(res != NXT_OK)) { 673 return; 674 } 675 676 nxt_fd_close(msg->fd[1]); 677 msg->fd[1] = -1; 678 } 679 } 680 681 if (msg->port_msg.stream != 0) { 682 nxt_port_rpc_handler(task, msg); 683 return; 684 } 685 686 /* 687 * Port with "id == 0" is application 'main' port and it always 688 * should come with non-zero stream. 689 */ 690 nxt_assert(port->id != 0); 691 692 /* Find 'main' app port and get app reference. */ 693 rt = task->thread->runtime; 694 695 /* 696 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 697 * sent to main port (with id == 0) and processed in main thread. 
698 */ 699 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 700 nxt_assert(main_app_port != NULL); 701 702 app = main_app_port->app; 703 704 if (nxt_fast_path(app != NULL)) { 705 nxt_thread_mutex_lock(&app->mutex); 706 707 /* TODO here should be find-and-add code because there can be 708 port waiters in port_hash */ 709 nxt_port_hash_add(&app->port_hash, port); 710 app->port_hash_count++; 711 712 nxt_thread_mutex_unlock(&app->mutex); 713 714 port->app = app; 715 } 716 717 port->main_app_port = main_app_port; 718 719 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 720 } 721 722 723 static void 724 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 725 { 726 void *p; 727 size_t size; 728 nxt_int_t ret; 729 nxt_port_t *port; 730 nxt_router_temp_conf_t *tmcf; 731 732 port = nxt_runtime_port_find(task->thread->runtime, 733 msg->port_msg.pid, 734 msg->port_msg.reply_port); 735 if (nxt_slow_path(port == NULL)) { 736 nxt_alert(task, "conf_data_handler: reply port not found"); 737 return; 738 } 739 740 p = MAP_FAILED; 741 742 /* 743 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 744 * initialized in 'cleanup' section. 
745 */ 746 size = 0; 747 748 tmcf = nxt_router_temp_conf(task); 749 if (nxt_slow_path(tmcf == NULL)) { 750 goto fail; 751 } 752 753 if (nxt_slow_path(msg->fd[0] == -1)) { 754 nxt_alert(task, "conf_data_handler: invalid shm fd"); 755 goto fail; 756 } 757 758 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 759 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 760 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 761 goto fail; 762 } 763 764 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 765 766 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 767 768 nxt_fd_close(msg->fd[0]); 769 msg->fd[0] = -1; 770 771 if (nxt_slow_path(p == MAP_FAILED)) { 772 goto fail; 773 } 774 775 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 776 777 tmcf->router_conf->router = nxt_router; 778 tmcf->stream = msg->port_msg.stream; 779 tmcf->port = port; 780 781 nxt_port_use(task, tmcf->port, 1); 782 783 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 784 785 if (nxt_fast_path(ret == NXT_OK)) { 786 nxt_router_conf_apply(task, tmcf, NULL); 787 788 } else { 789 nxt_router_conf_error(task, tmcf); 790 } 791 792 goto cleanup; 793 794 fail: 795 796 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 797 msg->port_msg.stream, 0, NULL); 798 799 if (tmcf != NULL) { 800 nxt_mp_release(tmcf->mem_pool); 801 } 802 803 cleanup: 804 805 if (p != MAP_FAILED) { 806 nxt_mem_munmap(p, size); 807 } 808 809 if (msg->fd[0] != -1) { 810 nxt_fd_close(msg->fd[0]); 811 msg->fd[0] = -1; 812 } 813 } 814 815 816 static void 817 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 818 { 819 nxt_app_t *app; 820 nxt_int_t ret; 821 nxt_str_t app_name; 822 nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; 823 nxt_port_msg_type_t reply; 824 825 reply_port = nxt_runtime_port_find(task->thread->runtime, 826 msg->port_msg.pid, 827 msg->port_msg.reply_port); 828 if (nxt_slow_path(reply_port == NULL)) 
{ 829 nxt_alert(task, "app_restart_handler: reply port not found"); 830 return; 831 } 832 833 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 834 app_name.start = msg->buf->mem.pos; 835 836 nxt_debug(task, "app_restart_handler: %V", &app_name); 837 838 app = nxt_router_app_find(&nxt_router->apps, &app_name); 839 840 if (nxt_fast_path(app != NULL)) { 841 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 842 NXT_PROCESS_APP); 843 if (nxt_slow_path(shared_port == NULL)) { 844 goto fail; 845 } 846 847 ret = nxt_port_socket_init(task, shared_port, 0); 848 if (nxt_slow_path(ret != NXT_OK)) { 849 nxt_port_use(task, shared_port, -1); 850 goto fail; 851 } 852 853 ret = nxt_router_app_queue_init(task, shared_port); 854 if (nxt_slow_path(ret != NXT_OK)) { 855 nxt_port_write_close(shared_port); 856 nxt_port_read_close(shared_port); 857 nxt_port_use(task, shared_port, -1); 858 goto fail; 859 } 860 861 nxt_port_write_enable(task, shared_port); 862 863 nxt_thread_mutex_lock(&app->mutex); 864 865 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 866 867 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 868 0, 0, NULL); 869 870 } nxt_queue_loop; 871 872 app->generation++; 873 874 shared_port->app = app; 875 876 old_shared_port = app->shared_port; 877 old_shared_port->app = NULL; 878 879 app->shared_port = shared_port; 880 881 nxt_thread_mutex_unlock(&app->mutex); 882 883 nxt_port_close(task, old_shared_port); 884 nxt_port_use(task, old_shared_port, -1); 885 886 reply = NXT_PORT_MSG_RPC_READY_LAST; 887 888 } else { 889 890 fail: 891 892 reply = NXT_PORT_MSG_RPC_ERROR; 893 } 894 895 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 896 0, NULL); 897 } 898 899 900 static void 901 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 902 void *data) 903 { 904 union { 905 nxt_pid_t removed_pid; 906 void *data; 907 } u; 908 909 u.data = data; 910 911 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 912 
} 913 914 915 static void 916 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 917 { 918 nxt_event_engine_t *engine; 919 920 nxt_port_remove_pid_handler(task, msg); 921 922 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 923 { 924 if (nxt_fast_path(engine->port != NULL)) { 925 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 926 msg->u.data); 927 } 928 } 929 nxt_queue_loop; 930 931 if (msg->port_msg.stream == 0) { 932 return; 933 } 934 935 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 936 937 nxt_port_rpc_handler(task, msg); 938 } 939 940 941 static nxt_router_temp_conf_t * 942 nxt_router_temp_conf(nxt_task_t *task) 943 { 944 nxt_mp_t *mp, *tmp; 945 nxt_router_conf_t *rtcf; 946 nxt_router_temp_conf_t *tmcf; 947 948 mp = nxt_mp_create(1024, 128, 256, 32); 949 if (nxt_slow_path(mp == NULL)) { 950 return NULL; 951 } 952 953 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 954 if (nxt_slow_path(rtcf == NULL)) { 955 goto fail; 956 } 957 958 rtcf->mem_pool = mp; 959 960 tmp = nxt_mp_create(1024, 128, 256, 32); 961 if (nxt_slow_path(tmp == NULL)) { 962 goto fail; 963 } 964 965 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 966 if (nxt_slow_path(tmcf == NULL)) { 967 goto temp_fail; 968 } 969 970 tmcf->mem_pool = tmp; 971 tmcf->router_conf = rtcf; 972 tmcf->count = 1; 973 tmcf->engine = task->thread->engine; 974 975 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 976 sizeof(nxt_router_engine_conf_t)); 977 if (nxt_slow_path(tmcf->engines == NULL)) { 978 goto temp_fail; 979 } 980 981 nxt_queue_init(&creating_sockets); 982 nxt_queue_init(&pending_sockets); 983 nxt_queue_init(&updating_sockets); 984 nxt_queue_init(&keeping_sockets); 985 nxt_queue_init(&deleting_sockets); 986 987 #if (NXT_TLS) 988 nxt_queue_init(&tmcf->tls); 989 #endif 990 991 nxt_queue_init(&tmcf->apps); 992 nxt_queue_init(&tmcf->previous); 993 994 return tmcf; 995 996 temp_fail: 997 998 nxt_mp_destroy(tmp); 999 1000 fail: 1001 
1002 nxt_mp_destroy(mp); 1003 1004 return NULL; 1005 } 1006 1007 1008 nxt_inline nxt_bool_t 1009 nxt_router_app_can_start(nxt_app_t *app) 1010 { 1011 return app->processes + app->pending_processes < app->max_processes 1012 && app->pending_processes < app->max_pending_processes; 1013 } 1014 1015 1016 nxt_inline nxt_bool_t 1017 nxt_router_app_need_start(nxt_app_t *app) 1018 { 1019 return (app->active_requests 1020 > app->port_hash_count + app->pending_processes) 1021 || (app->spare_processes 1022 > app->idle_processes + app->pending_processes); 1023 } 1024 1025 1026 static void 1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1028 { 1029 nxt_int_t ret; 1030 nxt_app_t *app; 1031 nxt_router_t *router; 1032 nxt_runtime_t *rt; 1033 nxt_queue_link_t *qlk; 1034 nxt_socket_conf_t *skcf; 1035 nxt_router_conf_t *rtcf; 1036 nxt_router_temp_conf_t *tmcf; 1037 const nxt_event_interface_t *interface; 1038 #if (NXT_TLS) 1039 nxt_router_tlssock_t *tls; 1040 #endif 1041 1042 tmcf = obj; 1043 1044 qlk = nxt_queue_first(&pending_sockets); 1045 1046 if (qlk != nxt_queue_tail(&pending_sockets)) { 1047 nxt_queue_remove(qlk); 1048 nxt_queue_insert_tail(&creating_sockets, qlk); 1049 1050 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1051 1052 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1053 1054 return; 1055 } 1056 1057 #if (NXT_TLS) 1058 qlk = nxt_queue_last(&tmcf->tls); 1059 1060 if (qlk != nxt_queue_head(&tmcf->tls)) { 1061 nxt_queue_remove(qlk); 1062 1063 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1064 1065 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1066 nxt_router_tls_rpc_handler, tls); 1067 return; 1068 } 1069 #endif 1070 1071 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1072 1073 if (nxt_router_app_need_start(app)) { 1074 nxt_router_app_rpc_create(task, tmcf, app); 1075 return; 1076 } 1077 1078 } nxt_queue_loop; 1079 1080 rtcf = tmcf->router_conf; 1081 1082 if (rtcf->access_log != NULL && 
rtcf->access_log->fd == -1) { 1083 nxt_router_access_log_open(task, tmcf); 1084 return; 1085 } 1086 1087 rt = task->thread->runtime; 1088 1089 interface = nxt_service_get(rt->services, "engine", NULL); 1090 1091 router = rtcf->router; 1092 1093 ret = nxt_router_engines_create(task, router, tmcf, interface); 1094 if (nxt_slow_path(ret != NXT_OK)) { 1095 goto fail; 1096 } 1097 1098 ret = nxt_router_threads_create(task, rt, tmcf); 1099 if (nxt_slow_path(ret != NXT_OK)) { 1100 goto fail; 1101 } 1102 1103 nxt_router_apps_sort(task, router, tmcf); 1104 1105 nxt_router_apps_hash_use(task, rtcf, 1); 1106 1107 nxt_router_engines_post(router, tmcf); 1108 1109 nxt_queue_add(&router->sockets, &updating_sockets); 1110 nxt_queue_add(&router->sockets, &creating_sockets); 1111 1112 router->access_log = rtcf->access_log; 1113 1114 nxt_router_conf_ready(task, tmcf); 1115 1116 return; 1117 1118 fail: 1119 1120 nxt_router_conf_error(task, tmcf); 1121 1122 return; 1123 } 1124 1125 1126 static void 1127 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1128 { 1129 nxt_joint_job_t *job; 1130 1131 job = obj; 1132 1133 nxt_router_conf_ready(task, job->tmcf); 1134 } 1135 1136 1137 static void 1138 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1139 { 1140 uint32_t count; 1141 nxt_router_conf_t *rtcf; 1142 nxt_thread_spinlock_t *lock; 1143 1144 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1145 1146 if (--tmcf->count > 0) { 1147 return; 1148 } 1149 1150 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1151 1152 rtcf = tmcf->router_conf; 1153 1154 lock = &rtcf->router->lock; 1155 1156 nxt_thread_spin_lock(lock); 1157 1158 count = rtcf->count; 1159 1160 nxt_thread_spin_unlock(lock); 1161 1162 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1163 1164 if (count == 0) { 1165 nxt_router_apps_hash_use(task, rtcf, -1); 1166 1167 nxt_router_access_log_release(task, lock, rtcf->access_log); 1168 1169 nxt_mp_destroy(rtcf->mem_pool); 1170 } 
1171 1172 nxt_mp_release(tmcf->mem_pool); 1173 } 1174 1175 1176 static void 1177 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1178 { 1179 nxt_app_t *app; 1180 nxt_queue_t new_socket_confs; 1181 nxt_socket_t s; 1182 nxt_router_t *router; 1183 nxt_queue_link_t *qlk; 1184 nxt_socket_conf_t *skcf; 1185 nxt_router_conf_t *rtcf; 1186 1187 nxt_alert(task, "failed to apply new conf"); 1188 1189 for (qlk = nxt_queue_first(&creating_sockets); 1190 qlk != nxt_queue_tail(&creating_sockets); 1191 qlk = nxt_queue_next(qlk)) 1192 { 1193 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1194 s = skcf->listen->socket; 1195 1196 if (s != -1) { 1197 nxt_socket_close(task, s); 1198 } 1199 1200 nxt_free(skcf->listen); 1201 } 1202 1203 nxt_queue_init(&new_socket_confs); 1204 nxt_queue_add(&new_socket_confs, &updating_sockets); 1205 nxt_queue_add(&new_socket_confs, &pending_sockets); 1206 nxt_queue_add(&new_socket_confs, &creating_sockets); 1207 1208 rtcf = tmcf->router_conf; 1209 1210 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1211 1212 nxt_router_app_unlink(task, app); 1213 1214 } nxt_queue_loop; 1215 1216 router = rtcf->router; 1217 1218 nxt_queue_add(&router->sockets, &keeping_sockets); 1219 nxt_queue_add(&router->sockets, &deleting_sockets); 1220 1221 nxt_queue_add(&router->apps, &tmcf->previous); 1222 1223 // TODO: new engines and threads 1224 1225 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1226 1227 nxt_mp_destroy(rtcf->mem_pool); 1228 1229 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1230 1231 nxt_mp_release(tmcf->mem_pool); 1232 } 1233 1234 1235 static void 1236 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1237 nxt_port_msg_type_t type) 1238 { 1239 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1240 1241 nxt_port_use(task, tmcf->port, -1); 1242 1243 tmcf->port = NULL; 1244 } 1245 1246 1247 static nxt_conf_map_t nxt_router_conf[] = { 1248 { 1249 
nxt_string("listeners_threads"), 1250 NXT_CONF_MAP_INT32, 1251 offsetof(nxt_router_conf_t, threads), 1252 }, 1253 }; 1254 1255 1256 static nxt_conf_map_t nxt_router_app_conf[] = { 1257 { 1258 nxt_string("type"), 1259 NXT_CONF_MAP_STR, 1260 offsetof(nxt_router_app_conf_t, type), 1261 }, 1262 1263 { 1264 nxt_string("limits"), 1265 NXT_CONF_MAP_PTR, 1266 offsetof(nxt_router_app_conf_t, limits_value), 1267 }, 1268 1269 { 1270 nxt_string("processes"), 1271 NXT_CONF_MAP_INT32, 1272 offsetof(nxt_router_app_conf_t, processes), 1273 }, 1274 1275 { 1276 nxt_string("processes"), 1277 NXT_CONF_MAP_PTR, 1278 offsetof(nxt_router_app_conf_t, processes_value), 1279 }, 1280 1281 { 1282 nxt_string("targets"), 1283 NXT_CONF_MAP_PTR, 1284 offsetof(nxt_router_app_conf_t, targets_value), 1285 }, 1286 }; 1287 1288 1289 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1290 { 1291 nxt_string("timeout"), 1292 NXT_CONF_MAP_MSEC, 1293 offsetof(nxt_router_app_conf_t, timeout), 1294 }, 1295 1296 { 1297 nxt_string("requests"), 1298 NXT_CONF_MAP_INT32, 1299 offsetof(nxt_router_app_conf_t, requests), 1300 }, 1301 }; 1302 1303 1304 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1305 { 1306 nxt_string("spare"), 1307 NXT_CONF_MAP_INT32, 1308 offsetof(nxt_router_app_conf_t, spare_processes), 1309 }, 1310 1311 { 1312 nxt_string("max"), 1313 NXT_CONF_MAP_INT32, 1314 offsetof(nxt_router_app_conf_t, max_processes), 1315 }, 1316 1317 { 1318 nxt_string("idle_timeout"), 1319 NXT_CONF_MAP_MSEC, 1320 offsetof(nxt_router_app_conf_t, idle_timeout), 1321 }, 1322 }; 1323 1324 1325 static nxt_conf_map_t nxt_router_listener_conf[] = { 1326 { 1327 nxt_string("pass"), 1328 NXT_CONF_MAP_STR_COPY, 1329 offsetof(nxt_router_listener_conf_t, pass), 1330 }, 1331 1332 { 1333 nxt_string("application"), 1334 NXT_CONF_MAP_STR_COPY, 1335 offsetof(nxt_router_listener_conf_t, application), 1336 }, 1337 }; 1338 1339 1340 static nxt_conf_map_t nxt_router_http_conf[] = { 1341 { 1342 nxt_string("header_buffer_size"), 
1343 NXT_CONF_MAP_SIZE, 1344 offsetof(nxt_socket_conf_t, header_buffer_size), 1345 }, 1346 1347 { 1348 nxt_string("large_header_buffer_size"), 1349 NXT_CONF_MAP_SIZE, 1350 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1351 }, 1352 1353 { 1354 nxt_string("large_header_buffers"), 1355 NXT_CONF_MAP_SIZE, 1356 offsetof(nxt_socket_conf_t, large_header_buffers), 1357 }, 1358 1359 { 1360 nxt_string("body_buffer_size"), 1361 NXT_CONF_MAP_SIZE, 1362 offsetof(nxt_socket_conf_t, body_buffer_size), 1363 }, 1364 1365 { 1366 nxt_string("max_body_size"), 1367 NXT_CONF_MAP_SIZE, 1368 offsetof(nxt_socket_conf_t, max_body_size), 1369 }, 1370 1371 { 1372 nxt_string("idle_timeout"), 1373 NXT_CONF_MAP_MSEC, 1374 offsetof(nxt_socket_conf_t, idle_timeout), 1375 }, 1376 1377 { 1378 nxt_string("header_read_timeout"), 1379 NXT_CONF_MAP_MSEC, 1380 offsetof(nxt_socket_conf_t, header_read_timeout), 1381 }, 1382 1383 { 1384 nxt_string("body_read_timeout"), 1385 NXT_CONF_MAP_MSEC, 1386 offsetof(nxt_socket_conf_t, body_read_timeout), 1387 }, 1388 1389 { 1390 nxt_string("send_timeout"), 1391 NXT_CONF_MAP_MSEC, 1392 offsetof(nxt_socket_conf_t, send_timeout), 1393 }, 1394 1395 { 1396 nxt_string("body_temp_path"), 1397 NXT_CONF_MAP_STR, 1398 offsetof(nxt_socket_conf_t, body_temp_path), 1399 }, 1400 1401 { 1402 nxt_string("discard_unsafe_fields"), 1403 NXT_CONF_MAP_INT8, 1404 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1405 }, 1406 }; 1407 1408 1409 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1410 { 1411 nxt_string("max_frame_size"), 1412 NXT_CONF_MAP_SIZE, 1413 offsetof(nxt_websocket_conf_t, max_frame_size), 1414 }, 1415 1416 { 1417 nxt_string("read_timeout"), 1418 NXT_CONF_MAP_MSEC, 1419 offsetof(nxt_websocket_conf_t, read_timeout), 1420 }, 1421 1422 { 1423 nxt_string("keepalive_interval"), 1424 NXT_CONF_MAP_MSEC, 1425 offsetof(nxt_websocket_conf_t, keepalive_interval), 1426 }, 1427 1428 }; 1429 1430 1431 static nxt_int_t 1432 nxt_router_conf_create(nxt_task_t *task, 
nxt_router_temp_conf_t *tmcf, 1433 u_char *start, u_char *end) 1434 { 1435 u_char *p; 1436 size_t size; 1437 nxt_mp_t *mp, *app_mp; 1438 uint32_t next, next_target; 1439 nxt_int_t ret; 1440 nxt_str_t name, path, target; 1441 nxt_app_t *app, *prev; 1442 nxt_str_t *t, *s, *targets; 1443 nxt_uint_t n, i; 1444 nxt_port_t *port; 1445 nxt_router_t *router; 1446 nxt_app_joint_t *app_joint; 1447 #if (NXT_TLS) 1448 nxt_tls_init_t *tls_init; 1449 nxt_conf_value_t *certificate; 1450 #endif 1451 nxt_conf_value_t *conf, *http, *value, *websocket; 1452 nxt_conf_value_t *applications, *application; 1453 nxt_conf_value_t *listeners, *listener; 1454 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf; 1455 nxt_socket_conf_t *skcf; 1456 nxt_http_routes_t *routes; 1457 nxt_event_engine_t *engine; 1458 nxt_app_lang_module_t *lang; 1459 nxt_router_app_conf_t apcf; 1460 nxt_router_access_log_t *access_log; 1461 nxt_router_listener_conf_t lscf; 1462 1463 static nxt_str_t http_path = nxt_string("/settings/http"); 1464 static nxt_str_t applications_path = nxt_string("/applications"); 1465 static nxt_str_t listeners_path = nxt_string("/listeners"); 1466 static nxt_str_t routes_path = nxt_string("/routes"); 1467 static nxt_str_t access_log_path = nxt_string("/access_log"); 1468 #if (NXT_TLS) 1469 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1470 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1471 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1472 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1473 #endif 1474 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1475 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1476 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1477 1478 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1479 if (conf == NULL) { 1480 nxt_alert(task, "configuration parsing error"); 1481 return 
NXT_ERROR; 1482 } 1483 1484 mp = tmcf->router_conf->mem_pool; 1485 1486 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1487 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1488 if (ret != NXT_OK) { 1489 nxt_alert(task, "root map error"); 1490 return NXT_ERROR; 1491 } 1492 1493 if (tmcf->router_conf->threads == 0) { 1494 tmcf->router_conf->threads = nxt_ncpu; 1495 } 1496 1497 static_conf = nxt_conf_get_path(conf, &static_path); 1498 1499 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1500 if (nxt_slow_path(ret != NXT_OK)) { 1501 return NXT_ERROR; 1502 } 1503 1504 router = tmcf->router_conf->router; 1505 1506 applications = nxt_conf_get_path(conf, &applications_path); 1507 1508 if (applications != NULL) { 1509 next = 0; 1510 1511 for ( ;; ) { 1512 application = nxt_conf_next_object_member(applications, 1513 &name, &next); 1514 if (application == NULL) { 1515 break; 1516 } 1517 1518 nxt_debug(task, "application \"%V\"", &name); 1519 1520 size = nxt_conf_json_length(application, NULL); 1521 1522 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1523 if (nxt_slow_path(app_mp == NULL)) { 1524 goto fail; 1525 } 1526 1527 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1528 if (app == NULL) { 1529 goto app_fail; 1530 } 1531 1532 nxt_memzero(app, sizeof(nxt_app_t)); 1533 1534 app->mem_pool = app_mp; 1535 1536 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1537 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1538 + name.length); 1539 1540 p = nxt_conf_json_print(app->conf.start, application, NULL); 1541 app->conf.length = p - app->conf.start; 1542 1543 nxt_assert(app->conf.length <= size); 1544 1545 nxt_debug(task, "application conf \"%V\"", &app->conf); 1546 1547 prev = nxt_router_app_find(&router->apps, &name); 1548 1549 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1550 nxt_mp_destroy(app_mp); 1551 1552 nxt_queue_remove(&prev->link); 1553 nxt_queue_insert_tail(&tmcf->previous, 
&prev->link); 1554 1555 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1556 if (nxt_slow_path(ret != NXT_OK)) { 1557 goto fail; 1558 } 1559 1560 continue; 1561 } 1562 1563 apcf.processes = 1; 1564 apcf.max_processes = 1; 1565 apcf.spare_processes = 0; 1566 apcf.timeout = 0; 1567 apcf.idle_timeout = 15000; 1568 apcf.requests = 0; 1569 apcf.limits_value = NULL; 1570 apcf.processes_value = NULL; 1571 apcf.targets_value = NULL; 1572 1573 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1574 if (nxt_slow_path(app_joint == NULL)) { 1575 goto app_fail; 1576 } 1577 1578 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1579 1580 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1581 nxt_nitems(nxt_router_app_conf), &apcf); 1582 if (ret != NXT_OK) { 1583 nxt_alert(task, "application map error"); 1584 goto app_fail; 1585 } 1586 1587 if (apcf.limits_value != NULL) { 1588 1589 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1590 nxt_alert(task, "application limits is not object"); 1591 goto app_fail; 1592 } 1593 1594 ret = nxt_conf_map_object(mp, apcf.limits_value, 1595 nxt_router_app_limits_conf, 1596 nxt_nitems(nxt_router_app_limits_conf), 1597 &apcf); 1598 if (ret != NXT_OK) { 1599 nxt_alert(task, "application limits map error"); 1600 goto app_fail; 1601 } 1602 } 1603 1604 if (apcf.processes_value != NULL 1605 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1606 { 1607 ret = nxt_conf_map_object(mp, apcf.processes_value, 1608 nxt_router_app_processes_conf, 1609 nxt_nitems(nxt_router_app_processes_conf), 1610 &apcf); 1611 if (ret != NXT_OK) { 1612 nxt_alert(task, "application processes map error"); 1613 goto app_fail; 1614 } 1615 1616 } else { 1617 apcf.max_processes = apcf.processes; 1618 apcf.spare_processes = apcf.processes; 1619 } 1620 1621 if (apcf.targets_value != NULL) { 1622 n = nxt_conf_object_members_count(apcf.targets_value); 1623 1624 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1625 if (nxt_slow_path(targets == 
NULL)) { 1626 goto app_fail; 1627 } 1628 1629 next_target = 0; 1630 1631 for (i = 0; i < n; i++) { 1632 (void) nxt_conf_next_object_member(apcf.targets_value, 1633 &target, &next_target); 1634 1635 s = nxt_str_dup(app_mp, &targets[i], &target); 1636 if (nxt_slow_path(s == NULL)) { 1637 goto app_fail; 1638 } 1639 } 1640 1641 } else { 1642 targets = NULL; 1643 } 1644 1645 nxt_debug(task, "application type: %V", &apcf.type); 1646 nxt_debug(task, "application processes: %D", apcf.processes); 1647 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1648 nxt_debug(task, "application requests: %D", apcf.requests); 1649 1650 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1651 1652 if (lang == NULL) { 1653 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1654 goto app_fail; 1655 } 1656 1657 nxt_debug(task, "application language module: \"%s\"", lang->file); 1658 1659 ret = nxt_thread_mutex_create(&app->mutex); 1660 if (ret != NXT_OK) { 1661 goto app_fail; 1662 } 1663 1664 nxt_queue_init(&app->ports); 1665 nxt_queue_init(&app->spare_ports); 1666 nxt_queue_init(&app->idle_ports); 1667 nxt_queue_init(&app->ack_waiting_req); 1668 1669 app->name.length = name.length; 1670 nxt_memcpy(app->name.start, name.start, name.length); 1671 1672 app->type = lang->type; 1673 app->max_processes = apcf.max_processes; 1674 app->spare_processes = apcf.spare_processes; 1675 app->max_pending_processes = apcf.spare_processes 1676 ? 
apcf.spare_processes : 1; 1677 app->timeout = apcf.timeout; 1678 app->idle_timeout = apcf.idle_timeout; 1679 app->max_requests = apcf.requests; 1680 1681 app->targets = targets; 1682 1683 engine = task->thread->engine; 1684 1685 app->engine = engine; 1686 1687 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1688 app->adjust_idle_work.task = &engine->task; 1689 app->adjust_idle_work.obj = app; 1690 1691 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1692 1693 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1694 if (nxt_slow_path(ret != NXT_OK)) { 1695 goto app_fail; 1696 } 1697 1698 nxt_router_app_use(task, app, 1); 1699 1700 app->joint = app_joint; 1701 1702 app_joint->use_count = 1; 1703 app_joint->app = app; 1704 1705 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1706 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1707 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1708 app_joint->idle_timer.task = &engine->task; 1709 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1710 1711 app_joint->free_app_work.handler = nxt_router_free_app; 1712 app_joint->free_app_work.task = &engine->task; 1713 app_joint->free_app_work.obj = app_joint; 1714 1715 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1716 NXT_PROCESS_APP); 1717 if (nxt_slow_path(port == NULL)) { 1718 return NXT_ERROR; 1719 } 1720 1721 ret = nxt_port_socket_init(task, port, 0); 1722 if (nxt_slow_path(ret != NXT_OK)) { 1723 nxt_port_use(task, port, -1); 1724 return NXT_ERROR; 1725 } 1726 1727 ret = nxt_router_app_queue_init(task, port); 1728 if (nxt_slow_path(ret != NXT_OK)) { 1729 nxt_port_write_close(port); 1730 nxt_port_read_close(port); 1731 nxt_port_use(task, port, -1); 1732 return NXT_ERROR; 1733 } 1734 1735 nxt_port_write_enable(task, port); 1736 port->app = app; 1737 1738 app->shared_port = port; 1739 1740 nxt_thread_mutex_create(&app->outgoing.mutex); 1741 } 1742 } 1743 1744 routes_conf = nxt_conf_get_path(conf, &routes_path); 
1745 if (nxt_fast_path(routes_conf != NULL)) { 1746 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1747 if (nxt_slow_path(routes == NULL)) { 1748 return NXT_ERROR; 1749 } 1750 tmcf->router_conf->routes = routes; 1751 } 1752 1753 ret = nxt_upstreams_create(task, tmcf, conf); 1754 if (nxt_slow_path(ret != NXT_OK)) { 1755 return ret; 1756 } 1757 1758 http = nxt_conf_get_path(conf, &http_path); 1759 #if 0 1760 if (http == NULL) { 1761 nxt_alert(task, "no \"http\" block"); 1762 return NXT_ERROR; 1763 } 1764 #endif 1765 1766 websocket = nxt_conf_get_path(conf, &websocket_path); 1767 1768 listeners = nxt_conf_get_path(conf, &listeners_path); 1769 1770 if (listeners != NULL) { 1771 next = 0; 1772 1773 for ( ;; ) { 1774 listener = nxt_conf_next_object_member(listeners, &name, &next); 1775 if (listener == NULL) { 1776 break; 1777 } 1778 1779 skcf = nxt_router_socket_conf(task, tmcf, &name); 1780 if (skcf == NULL) { 1781 goto fail; 1782 } 1783 1784 nxt_memzero(&lscf, sizeof(lscf)); 1785 1786 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1787 nxt_nitems(nxt_router_listener_conf), 1788 &lscf); 1789 if (ret != NXT_OK) { 1790 nxt_alert(task, "listener map error"); 1791 goto fail; 1792 } 1793 1794 nxt_debug(task, "application: %V", &lscf.application); 1795 1796 // STUB, default values if http block is not defined. 
1797 skcf->header_buffer_size = 2048; 1798 skcf->large_header_buffer_size = 8192; 1799 skcf->large_header_buffers = 4; 1800 skcf->discard_unsafe_fields = 1; 1801 skcf->body_buffer_size = 16 * 1024; 1802 skcf->max_body_size = 8 * 1024 * 1024; 1803 skcf->proxy_header_buffer_size = 64 * 1024; 1804 skcf->proxy_buffer_size = 4096; 1805 skcf->proxy_buffers = 256; 1806 skcf->idle_timeout = 180 * 1000; 1807 skcf->header_read_timeout = 30 * 1000; 1808 skcf->body_read_timeout = 30 * 1000; 1809 skcf->send_timeout = 30 * 1000; 1810 skcf->proxy_timeout = 60 * 1000; 1811 skcf->proxy_send_timeout = 30 * 1000; 1812 skcf->proxy_read_timeout = 30 * 1000; 1813 1814 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1815 skcf->websocket_conf.read_timeout = 60 * 1000; 1816 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1817 1818 nxt_str_null(&skcf->body_temp_path); 1819 1820 if (http != NULL) { 1821 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1822 nxt_nitems(nxt_router_http_conf), 1823 skcf); 1824 if (ret != NXT_OK) { 1825 nxt_alert(task, "http map error"); 1826 goto fail; 1827 } 1828 } 1829 1830 if (websocket != NULL) { 1831 ret = nxt_conf_map_object(mp, websocket, 1832 nxt_router_websocket_conf, 1833 nxt_nitems(nxt_router_websocket_conf), 1834 &skcf->websocket_conf); 1835 if (ret != NXT_OK) { 1836 nxt_alert(task, "websocket map error"); 1837 goto fail; 1838 } 1839 } 1840 1841 t = &skcf->body_temp_path; 1842 1843 if (t->length == 0) { 1844 t->start = (u_char *) task->thread->runtime->tmp; 1845 t->length = nxt_strlen(t->start); 1846 } 1847 1848 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path); 1849 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, 1850 client_ip_conf); 1851 if (nxt_slow_path(ret != NXT_OK)) { 1852 return NXT_ERROR; 1853 } 1854 1855 #if (NXT_TLS) 1856 certificate = nxt_conf_get_path(listener, &certificate_path); 1857 1858 if (certificate != NULL) { 1859 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1860 if 
(nxt_slow_path(tls_init == NULL)) { 1861 return NXT_ERROR; 1862 } 1863 1864 tls_init->cache_size = 0; 1865 tls_init->timeout = 300; 1866 1867 value = nxt_conf_get_path(listener, &conf_cache_path); 1868 if (value != NULL) { 1869 tls_init->cache_size = nxt_conf_get_number(value); 1870 } 1871 1872 value = nxt_conf_get_path(listener, &conf_timeout_path); 1873 if (value != NULL) { 1874 tls_init->timeout = nxt_conf_get_number(value); 1875 } 1876 1877 tls_init->conf_cmds = nxt_conf_get_path(listener, 1878 &conf_commands_path); 1879 1880 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1881 n = nxt_conf_array_elements_count(certificate); 1882 1883 for (i = 0; i < n; i++) { 1884 value = nxt_conf_get_array_element(certificate, i); 1885 1886 nxt_assert(value != NULL); 1887 1888 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1889 tls_init, i == 0); 1890 if (nxt_slow_path(ret != NXT_OK)) { 1891 goto fail; 1892 } 1893 } 1894 1895 } else { 1896 /* NXT_CONF_STRING */ 1897 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, 1898 tls_init, 1); 1899 if (nxt_slow_path(ret != NXT_OK)) { 1900 goto fail; 1901 } 1902 } 1903 } 1904 #endif 1905 1906 skcf->listen->handler = nxt_http_conn_init; 1907 skcf->router_conf = tmcf->router_conf; 1908 skcf->router_conf->count++; 1909 1910 if (lscf.pass.length != 0) { 1911 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1912 1913 /* COMPATIBILITY: listener application. 
*/ 1914 } else if (lscf.application.length > 0) { 1915 skcf->action = nxt_http_pass_application(task, 1916 tmcf->router_conf, 1917 &lscf.application); 1918 } 1919 1920 if (nxt_slow_path(skcf->action == NULL)) { 1921 goto fail; 1922 } 1923 } 1924 } 1925 1926 ret = nxt_http_routes_resolve(task, tmcf); 1927 if (nxt_slow_path(ret != NXT_OK)) { 1928 goto fail; 1929 } 1930 1931 value = nxt_conf_get_path(conf, &access_log_path); 1932 1933 if (value != NULL) { 1934 nxt_conf_get_string(value, &path); 1935 1936 access_log = router->access_log; 1937 1938 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1939 nxt_thread_spin_lock(&router->lock); 1940 access_log->count++; 1941 nxt_thread_spin_unlock(&router->lock); 1942 1943 } else { 1944 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1945 + path.length); 1946 if (access_log == NULL) { 1947 nxt_alert(task, "failed to allocate access log structure"); 1948 goto fail; 1949 } 1950 1951 access_log->fd = -1; 1952 access_log->handler = &nxt_router_access_log_writer; 1953 access_log->count = 1; 1954 1955 access_log->path.length = path.length; 1956 access_log->path.start = (u_char *) access_log 1957 + sizeof(nxt_router_access_log_t); 1958 1959 nxt_memcpy(access_log->path.start, path.start, path.length); 1960 } 1961 1962 tmcf->router_conf->access_log = access_log; 1963 } 1964 1965 nxt_queue_add(&deleting_sockets, &router->sockets); 1966 nxt_queue_init(&router->sockets); 1967 1968 return NXT_OK; 1969 1970 app_fail: 1971 1972 nxt_mp_destroy(app_mp); 1973 1974 fail: 1975 1976 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1977 1978 nxt_queue_remove(&app->link); 1979 nxt_thread_mutex_destroy(&app->mutex); 1980 nxt_mp_destroy(app->mem_pool); 1981 1982 } nxt_queue_loop; 1983 1984 return NXT_ERROR; 1985 } 1986 1987 1988 #if (NXT_TLS) 1989 1990 static nxt_int_t 1991 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 1992 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 1993 nxt_tls_init_t *tls_init, 
nxt_bool_t last) 1994 { 1995 nxt_router_tlssock_t *tls; 1996 1997 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 1998 if (nxt_slow_path(tls == NULL)) { 1999 return NXT_ERROR; 2000 } 2001 2002 tls->tls_init = tls_init; 2003 tls->socket_conf = skcf; 2004 tls->temp_conf = tmcf; 2005 tls->last = last; 2006 nxt_conf_get_string(value, &tls->name); 2007 2008 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2009 2010 return NXT_OK; 2011 } 2012 2013 #endif 2014 2015 2016 static nxt_int_t 2017 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2018 nxt_conf_value_t *conf) 2019 { 2020 uint32_t next, i; 2021 nxt_mp_t *mp; 2022 nxt_str_t *type, exten, str; 2023 nxt_int_t ret; 2024 nxt_uint_t exts; 2025 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2026 2027 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2028 2029 mp = rtcf->mem_pool; 2030 2031 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2032 if (nxt_slow_path(ret != NXT_OK)) { 2033 return NXT_ERROR; 2034 } 2035 2036 if (conf == NULL) { 2037 return NXT_OK; 2038 } 2039 2040 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2041 2042 if (mtypes_conf != NULL) { 2043 next = 0; 2044 2045 for ( ;; ) { 2046 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2047 2048 if (ext_conf == NULL) { 2049 break; 2050 } 2051 2052 type = nxt_str_dup(mp, NULL, &str); 2053 if (nxt_slow_path(type == NULL)) { 2054 return NXT_ERROR; 2055 } 2056 2057 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2058 nxt_conf_get_string(ext_conf, &str); 2059 2060 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2061 return NXT_ERROR; 2062 } 2063 2064 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2065 &exten, type); 2066 if (nxt_slow_path(ret != NXT_OK)) { 2067 return NXT_ERROR; 2068 } 2069 2070 continue; 2071 } 2072 2073 exts = nxt_conf_array_elements_count(ext_conf); 2074 2075 for (i = 0; i < exts; i++) { 2076 value = nxt_conf_get_array_element(ext_conf, 
i); 2077 2078 nxt_conf_get_string(value, &str); 2079 2080 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2081 return NXT_ERROR; 2082 } 2083 2084 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2085 &exten, type); 2086 if (nxt_slow_path(ret != NXT_OK)) { 2087 return NXT_ERROR; 2088 } 2089 } 2090 } 2091 } 2092 2093 return NXT_OK; 2094 } 2095 2096 2097 static nxt_int_t 2098 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2099 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) 2100 { 2101 char c; 2102 size_t i; 2103 nxt_mp_t *mp; 2104 uint32_t hash; 2105 nxt_str_t header; 2106 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; 2107 nxt_http_client_ip_t *client_ip; 2108 nxt_http_route_addr_rule_t *source; 2109 2110 static nxt_str_t header_path = nxt_string("/header"); 2111 static nxt_str_t source_path = nxt_string("/source"); 2112 static nxt_str_t recursive_path = nxt_string("/recursive"); 2113 2114 if (conf == NULL) { 2115 skcf->client_ip = NULL; 2116 2117 return NXT_OK; 2118 } 2119 2120 mp = tmcf->router_conf->mem_pool; 2121 2122 source_conf = nxt_conf_get_path(conf, &source_path); 2123 header_conf = nxt_conf_get_path(conf, &header_path); 2124 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2125 2126 if (source_conf == NULL || header_conf == NULL) { 2127 return NXT_ERROR; 2128 } 2129 2130 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); 2131 if (nxt_slow_path(client_ip == NULL)) { 2132 return NXT_ERROR; 2133 } 2134 2135 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2136 if (nxt_slow_path(source == NULL)) { 2137 return NXT_ERROR; 2138 } 2139 2140 client_ip->source = source; 2141 2142 nxt_conf_get_string(header_conf, &header); 2143 2144 if (recursive_conf != NULL) { 2145 client_ip->recursive = nxt_conf_get_boolean(recursive_conf); 2146 } 2147 2148 client_ip->header = nxt_str_dup(mp, NULL, &header); 2149 if (nxt_slow_path(client_ip->header == NULL)) { 2150 return 
NXT_ERROR; 2151 } 2152 2153 hash = NXT_HTTP_FIELD_HASH_INIT; 2154 2155 for (i = 0; i < client_ip->header->length; i++) { 2156 c = client_ip->header->start[i]; 2157 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2158 } 2159 2160 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2161 2162 client_ip->header_hash = hash; 2163 2164 skcf->client_ip = client_ip; 2165 2166 return NXT_OK; 2167 } 2168 2169 2170 static nxt_app_t * 2171 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2172 { 2173 nxt_app_t *app; 2174 2175 nxt_queue_each(app, queue, nxt_app_t, link) { 2176 2177 if (nxt_strstr_eq(name, &app->name)) { 2178 return app; 2179 } 2180 2181 } nxt_queue_loop; 2182 2183 return NULL; 2184 } 2185 2186 2187 static nxt_int_t 2188 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2189 { 2190 void *mem; 2191 nxt_int_t fd; 2192 2193 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2194 if (nxt_slow_path(fd == -1)) { 2195 return NXT_ERROR; 2196 } 2197 2198 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2199 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2200 if (nxt_slow_path(mem == MAP_FAILED)) { 2201 nxt_fd_close(fd); 2202 2203 return NXT_ERROR; 2204 } 2205 2206 nxt_app_queue_init(mem); 2207 2208 port->queue_fd = fd; 2209 port->queue = mem; 2210 2211 return NXT_OK; 2212 } 2213 2214 2215 static nxt_int_t 2216 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2217 { 2218 void *mem; 2219 nxt_int_t fd; 2220 2221 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2222 if (nxt_slow_path(fd == -1)) { 2223 return NXT_ERROR; 2224 } 2225 2226 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2227 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2228 if (nxt_slow_path(mem == MAP_FAILED)) { 2229 nxt_fd_close(fd); 2230 2231 return NXT_ERROR; 2232 } 2233 2234 nxt_port_queue_init(mem); 2235 2236 port->queue_fd = fd; 2237 port->queue = mem; 2238 2239 return NXT_OK; 2240 } 2241 2242 2243 static nxt_int_t 2244 nxt_router_port_queue_map(nxt_task_t *task, 
nxt_port_t *port, nxt_fd_t fd) 2245 { 2246 void *mem; 2247 2248 nxt_assert(fd != -1); 2249 2250 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2251 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2252 if (nxt_slow_path(mem == MAP_FAILED)) { 2253 2254 return NXT_ERROR; 2255 } 2256 2257 port->queue = mem; 2258 2259 return NXT_OK; 2260 } 2261 2262 2263 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2264 NXT_LVLHSH_DEFAULT, 2265 nxt_router_apps_hash_test, 2266 nxt_mp_lvlhsh_alloc, 2267 nxt_mp_lvlhsh_free, 2268 }; 2269 2270 2271 static nxt_int_t 2272 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2273 { 2274 nxt_app_t *app; 2275 2276 app = data; 2277 2278 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2279 } 2280 2281 2282 static nxt_int_t 2283 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2284 { 2285 nxt_lvlhsh_query_t lhq; 2286 2287 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2288 lhq.replace = 0; 2289 lhq.key = app->name; 2290 lhq.value = app; 2291 lhq.proto = &nxt_router_apps_hash_proto; 2292 lhq.pool = rtcf->mem_pool; 2293 2294 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2295 2296 case NXT_OK: 2297 return NXT_OK; 2298 2299 case NXT_DECLINED: 2300 nxt_thread_log_alert("router app hash adding failed: " 2301 "\"%V\" is already in hash", &lhq.key); 2302 /* Fall through. 
*/ 2303 default: 2304 return NXT_ERROR; 2305 } 2306 } 2307 2308 2309 static nxt_app_t * 2310 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2311 { 2312 nxt_lvlhsh_query_t lhq; 2313 2314 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2315 lhq.key = *name; 2316 lhq.proto = &nxt_router_apps_hash_proto; 2317 2318 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2319 return NULL; 2320 } 2321 2322 return lhq.value; 2323 } 2324 2325 2326 static void 2327 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2328 { 2329 nxt_app_t *app; 2330 nxt_lvlhsh_each_t lhe; 2331 2332 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2333 2334 for ( ;; ) { 2335 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2336 2337 if (app == NULL) { 2338 break; 2339 } 2340 2341 nxt_router_app_use(task, app, i); 2342 } 2343 } 2344 2345 2346 typedef struct { 2347 nxt_app_t *app; 2348 nxt_int_t target; 2349 } nxt_http_app_conf_t; 2350 2351 2352 nxt_int_t 2353 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2354 nxt_str_t *target, nxt_http_action_t *action) 2355 { 2356 nxt_app_t *app; 2357 nxt_str_t *targets; 2358 nxt_uint_t i; 2359 nxt_http_app_conf_t *conf; 2360 2361 app = nxt_router_apps_hash_get(rtcf, name); 2362 if (app == NULL) { 2363 return NXT_DECLINED; 2364 } 2365 2366 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2367 if (nxt_slow_path(conf == NULL)) { 2368 return NXT_ERROR; 2369 } 2370 2371 action->handler = nxt_http_application_handler; 2372 action->u.conf = conf; 2373 2374 conf->app = app; 2375 2376 if (target != NULL && target->length != 0) { 2377 targets = app->targets; 2378 2379 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2380 2381 conf->target = i; 2382 2383 } else { 2384 conf->target = 0; 2385 } 2386 2387 return NXT_OK; 2388 } 2389 2390 2391 static nxt_socket_conf_t * 2392 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2393 nxt_str_t *name) 2394 { 
2395 size_t size; 2396 nxt_int_t ret; 2397 nxt_bool_t wildcard; 2398 nxt_sockaddr_t *sa; 2399 nxt_socket_conf_t *skcf; 2400 nxt_listen_socket_t *ls; 2401 2402 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2403 if (nxt_slow_path(sa == NULL)) { 2404 nxt_alert(task, "invalid listener \"%V\"", name); 2405 return NULL; 2406 } 2407 2408 sa->type = SOCK_STREAM; 2409 2410 nxt_debug(task, "router listener: \"%*s\"", 2411 (size_t) sa->length, nxt_sockaddr_start(sa)); 2412 2413 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2414 if (nxt_slow_path(skcf == NULL)) { 2415 return NULL; 2416 } 2417 2418 size = nxt_sockaddr_size(sa); 2419 2420 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2421 2422 if (ret != NXT_OK) { 2423 2424 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2425 if (nxt_slow_path(ls == NULL)) { 2426 return NULL; 2427 } 2428 2429 skcf->listen = ls; 2430 2431 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2432 nxt_memcpy(ls->sockaddr, sa, size); 2433 2434 nxt_listen_socket_remote_size(ls); 2435 2436 ls->socket = -1; 2437 ls->backlog = NXT_LISTEN_BACKLOG; 2438 ls->flags = NXT_NONBLOCK; 2439 ls->read_after_accept = 1; 2440 } 2441 2442 switch (sa->u.sockaddr.sa_family) { 2443 #if (NXT_HAVE_UNIX_DOMAIN) 2444 case AF_UNIX: 2445 wildcard = 0; 2446 break; 2447 #endif 2448 #if (NXT_INET6) 2449 case AF_INET6: 2450 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2451 break; 2452 #endif 2453 case AF_INET: 2454 default: 2455 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2456 break; 2457 } 2458 2459 if (!wildcard) { 2460 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2461 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2462 return NULL; 2463 } 2464 2465 nxt_memcpy(skcf->sockaddr, sa, size); 2466 } 2467 2468 return skcf; 2469 } 2470 2471 2472 static nxt_int_t 2473 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2474 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 
2475 { 2476 nxt_router_t *router; 2477 nxt_queue_link_t *qlk; 2478 nxt_socket_conf_t *skcf; 2479 2480 router = tmcf->router_conf->router; 2481 2482 for (qlk = nxt_queue_first(&router->sockets); 2483 qlk != nxt_queue_tail(&router->sockets); 2484 qlk = nxt_queue_next(qlk)) 2485 { 2486 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2487 2488 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2489 nskcf->listen = skcf->listen; 2490 2491 nxt_queue_remove(qlk); 2492 nxt_queue_insert_tail(&keeping_sockets, qlk); 2493 2494 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2495 2496 return NXT_OK; 2497 } 2498 } 2499 2500 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2501 2502 return NXT_DECLINED; 2503 } 2504 2505 2506 static void 2507 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2508 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2509 { 2510 size_t size; 2511 uint32_t stream; 2512 nxt_int_t ret; 2513 nxt_buf_t *b; 2514 nxt_port_t *main_port, *router_port; 2515 nxt_runtime_t *rt; 2516 nxt_socket_rpc_t *rpc; 2517 2518 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2519 if (rpc == NULL) { 2520 goto fail; 2521 } 2522 2523 rpc->socket_conf = skcf; 2524 rpc->temp_conf = tmcf; 2525 2526 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2527 2528 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2529 if (b == NULL) { 2530 goto fail; 2531 } 2532 2533 b->completion_handler = nxt_buf_dummy_completion; 2534 2535 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2536 2537 rt = task->thread->runtime; 2538 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2539 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2540 2541 stream = nxt_port_rpc_register_handler(task, router_port, 2542 nxt_router_listen_socket_ready, 2543 nxt_router_listen_socket_error, 2544 main_port->pid, rpc); 2545 if (nxt_slow_path(stream == 0)) { 2546 goto fail; 2547 } 2548 2549 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 
2550 stream, router_port->id, b); 2551 2552 if (nxt_slow_path(ret != NXT_OK)) { 2553 nxt_port_rpc_cancel(task, router_port, stream); 2554 goto fail; 2555 } 2556 2557 return; 2558 2559 fail: 2560 2561 nxt_router_conf_error(task, tmcf); 2562 } 2563 2564 2565 static void 2566 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2567 void *data) 2568 { 2569 nxt_int_t ret; 2570 nxt_socket_t s; 2571 nxt_socket_rpc_t *rpc; 2572 2573 rpc = data; 2574 2575 s = msg->fd[0]; 2576 2577 ret = nxt_socket_nonblocking(task, s); 2578 if (nxt_slow_path(ret != NXT_OK)) { 2579 goto fail; 2580 } 2581 2582 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2583 2584 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2585 if (nxt_slow_path(ret != NXT_OK)) { 2586 goto fail; 2587 } 2588 2589 rpc->socket_conf->listen->socket = s; 2590 2591 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2592 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2593 2594 return; 2595 2596 fail: 2597 2598 nxt_socket_close(task, s); 2599 2600 nxt_router_conf_error(task, rpc->temp_conf); 2601 } 2602 2603 2604 static void 2605 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2606 void *data) 2607 { 2608 nxt_socket_rpc_t *rpc; 2609 nxt_router_temp_conf_t *tmcf; 2610 2611 rpc = data; 2612 tmcf = rpc->temp_conf; 2613 2614 #if 0 2615 u_char *p; 2616 size_t size; 2617 uint8_t error; 2618 nxt_buf_t *in, *out; 2619 nxt_sockaddr_t *sa; 2620 2621 static nxt_str_t socket_errors[] = { 2622 nxt_string("ListenerSystem"), 2623 nxt_string("ListenerNoIPv6"), 2624 nxt_string("ListenerPort"), 2625 nxt_string("ListenerInUse"), 2626 nxt_string("ListenerNoAddress"), 2627 nxt_string("ListenerNoAccess"), 2628 nxt_string("ListenerPath"), 2629 }; 2630 2631 sa = rpc->socket_conf->listen->sockaddr; 2632 2633 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2634 2635 if (nxt_slow_path(in == NULL)) { 2636 return; 2637 } 2638 2639 p = in->mem.pos; 
2640 2641 error = *p++; 2642 2643 size = nxt_length("listen socket error: ") 2644 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2645 + sa->length + socket_errors[error].length + (in->mem.free - p); 2646 2647 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2648 if (nxt_slow_path(out == NULL)) { 2649 return; 2650 } 2651 2652 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2653 "listen socket error: " 2654 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2655 (size_t) sa->length, nxt_sockaddr_start(sa), 2656 &socket_errors[error], in->mem.free - p, p); 2657 2658 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2659 #endif 2660 2661 nxt_router_conf_error(task, tmcf); 2662 } 2663 2664 2665 #if (NXT_TLS) 2666 2667 static void 2668 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2669 void *data) 2670 { 2671 nxt_mp_t *mp; 2672 nxt_int_t ret; 2673 nxt_tls_conf_t *tlscf; 2674 nxt_router_tlssock_t *tls; 2675 nxt_tls_bundle_conf_t *bundle; 2676 nxt_router_temp_conf_t *tmcf; 2677 2678 nxt_debug(task, "tls rpc handler"); 2679 2680 tls = data; 2681 tmcf = tls->temp_conf; 2682 2683 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2684 goto fail; 2685 } 2686 2687 mp = tmcf->router_conf->mem_pool; 2688 2689 if (tls->socket_conf->tls == NULL){ 2690 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2691 if (nxt_slow_path(tlscf == NULL)) { 2692 goto fail; 2693 } 2694 2695 tlscf->no_wait_shutdown = 1; 2696 tls->socket_conf->tls = tlscf; 2697 2698 } else { 2699 tlscf = tls->socket_conf->tls; 2700 } 2701 2702 tls->tls_init->conf = tlscf; 2703 2704 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2705 if (nxt_slow_path(bundle == NULL)) { 2706 goto fail; 2707 } 2708 2709 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2710 goto fail; 2711 } 2712 2713 bundle->chain_file = msg->fd[0]; 2714 bundle->next = tlscf->bundle; 2715 tlscf->bundle = bundle; 2716 2717 ret = 
task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2718 tls->last); 2719 if (nxt_slow_path(ret != NXT_OK)) { 2720 goto fail; 2721 } 2722 2723 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2724 nxt_router_conf_apply, task, tmcf, NULL); 2725 return; 2726 2727 fail: 2728 2729 nxt_router_conf_error(task, tmcf); 2730 } 2731 2732 #endif 2733 2734 2735 static void 2736 nxt_router_app_rpc_create(nxt_task_t *task, 2737 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2738 { 2739 size_t size; 2740 uint32_t stream; 2741 nxt_int_t ret; 2742 nxt_buf_t *b; 2743 nxt_port_t *main_port, *router_port; 2744 nxt_runtime_t *rt; 2745 nxt_app_rpc_t *rpc; 2746 2747 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2748 if (rpc == NULL) { 2749 goto fail; 2750 } 2751 2752 rpc->app = app; 2753 rpc->temp_conf = tmcf; 2754 2755 nxt_debug(task, "app '%V' prefork", &app->name); 2756 2757 size = app->name.length + 1 + app->conf.length; 2758 2759 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2760 if (nxt_slow_path(b == NULL)) { 2761 goto fail; 2762 } 2763 2764 b->completion_handler = nxt_buf_dummy_completion; 2765 2766 nxt_buf_cpystr(b, &app->name); 2767 *b->mem.free++ = '\0'; 2768 nxt_buf_cpystr(b, &app->conf); 2769 2770 rt = task->thread->runtime; 2771 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2772 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2773 2774 stream = nxt_port_rpc_register_handler(task, router_port, 2775 nxt_router_app_prefork_ready, 2776 nxt_router_app_prefork_error, 2777 -1, rpc); 2778 if (nxt_slow_path(stream == 0)) { 2779 goto fail; 2780 } 2781 2782 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2783 -1, stream, router_port->id, b); 2784 2785 if (nxt_slow_path(ret != NXT_OK)) { 2786 nxt_port_rpc_cancel(task, router_port, stream); 2787 goto fail; 2788 } 2789 2790 app->pending_processes++; 2791 2792 return; 2793 2794 fail: 2795 2796 nxt_router_conf_error(task, tmcf); 2797 } 2798 2799 2800 static void 2801 
nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2802 void *data) 2803 { 2804 nxt_app_t *app; 2805 nxt_port_t *port; 2806 nxt_app_rpc_t *rpc; 2807 nxt_event_engine_t *engine; 2808 2809 rpc = data; 2810 app = rpc->app; 2811 2812 port = msg->u.new_port; 2813 2814 nxt_assert(port != NULL); 2815 nxt_assert(port->type == NXT_PROCESS_APP); 2816 nxt_assert(port->id == 0); 2817 2818 port->app = app; 2819 port->main_app_port = port; 2820 2821 app->pending_processes--; 2822 app->processes++; 2823 app->idle_processes++; 2824 2825 engine = task->thread->engine; 2826 2827 nxt_queue_insert_tail(&app->ports, &port->app_link); 2828 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2829 2830 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2831 &app->name, port->pid, port->id); 2832 2833 nxt_port_hash_add(&app->port_hash, port); 2834 app->port_hash_count++; 2835 2836 port->idle_start = 0; 2837 2838 nxt_port_inc_use(port); 2839 2840 nxt_router_app_shared_port_send(task, port); 2841 2842 nxt_work_queue_add(&engine->fast_work_queue, 2843 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2844 } 2845 2846 2847 static void 2848 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2849 void *data) 2850 { 2851 nxt_app_t *app; 2852 nxt_app_rpc_t *rpc; 2853 nxt_router_temp_conf_t *tmcf; 2854 2855 rpc = data; 2856 app = rpc->app; 2857 tmcf = rpc->temp_conf; 2858 2859 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2860 &app->name); 2861 2862 app->pending_processes--; 2863 2864 nxt_router_conf_error(task, tmcf); 2865 } 2866 2867 2868 static nxt_int_t 2869 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2870 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2871 { 2872 nxt_int_t ret; 2873 nxt_uint_t n, threads; 2874 nxt_queue_link_t *qlk; 2875 nxt_router_engine_conf_t *recf; 2876 2877 threads = tmcf->router_conf->threads; 2878 2879 tmcf->engines = 
nxt_array_create(tmcf->mem_pool, threads, 2880 sizeof(nxt_router_engine_conf_t)); 2881 if (nxt_slow_path(tmcf->engines == NULL)) { 2882 return NXT_ERROR; 2883 } 2884 2885 n = 0; 2886 2887 for (qlk = nxt_queue_first(&router->engines); 2888 qlk != nxt_queue_tail(&router->engines); 2889 qlk = nxt_queue_next(qlk)) 2890 { 2891 recf = nxt_array_zero_add(tmcf->engines); 2892 if (nxt_slow_path(recf == NULL)) { 2893 return NXT_ERROR; 2894 } 2895 2896 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2897 2898 if (n < threads) { 2899 recf->action = NXT_ROUTER_ENGINE_KEEP; 2900 ret = nxt_router_engine_conf_update(tmcf, recf); 2901 2902 } else { 2903 recf->action = NXT_ROUTER_ENGINE_DELETE; 2904 ret = nxt_router_engine_conf_delete(tmcf, recf); 2905 } 2906 2907 if (nxt_slow_path(ret != NXT_OK)) { 2908 return ret; 2909 } 2910 2911 n++; 2912 } 2913 2914 tmcf->new_threads = n; 2915 2916 while (n < threads) { 2917 recf = nxt_array_zero_add(tmcf->engines); 2918 if (nxt_slow_path(recf == NULL)) { 2919 return NXT_ERROR; 2920 } 2921 2922 recf->action = NXT_ROUTER_ENGINE_ADD; 2923 2924 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2925 if (nxt_slow_path(recf->engine == NULL)) { 2926 return NXT_ERROR; 2927 } 2928 2929 ret = nxt_router_engine_conf_create(tmcf, recf); 2930 if (nxt_slow_path(ret != NXT_OK)) { 2931 return ret; 2932 } 2933 2934 n++; 2935 } 2936 2937 return NXT_OK; 2938 } 2939 2940 2941 static nxt_int_t 2942 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2943 nxt_router_engine_conf_t *recf) 2944 { 2945 nxt_int_t ret; 2946 2947 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2948 nxt_router_listen_socket_create); 2949 if (nxt_slow_path(ret != NXT_OK)) { 2950 return ret; 2951 } 2952 2953 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2954 nxt_router_listen_socket_create); 2955 if (nxt_slow_path(ret != NXT_OK)) { 2956 return ret; 2957 } 2958 2959 return ret; 2960 } 2961 2962 2963 
static nxt_int_t 2964 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2965 nxt_router_engine_conf_t *recf) 2966 { 2967 nxt_int_t ret; 2968 2969 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2970 nxt_router_listen_socket_create); 2971 if (nxt_slow_path(ret != NXT_OK)) { 2972 return ret; 2973 } 2974 2975 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2976 nxt_router_listen_socket_update); 2977 if (nxt_slow_path(ret != NXT_OK)) { 2978 return ret; 2979 } 2980 2981 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2982 if (nxt_slow_path(ret != NXT_OK)) { 2983 return ret; 2984 } 2985 2986 return ret; 2987 } 2988 2989 2990 static nxt_int_t 2991 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2992 nxt_router_engine_conf_t *recf) 2993 { 2994 nxt_int_t ret; 2995 2996 ret = nxt_router_engine_quit(tmcf, recf); 2997 if (nxt_slow_path(ret != NXT_OK)) { 2998 return ret; 2999 } 3000 3001 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 3002 if (nxt_slow_path(ret != NXT_OK)) { 3003 return ret; 3004 } 3005 3006 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3007 } 3008 3009 3010 static nxt_int_t 3011 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3012 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3013 nxt_work_handler_t handler) 3014 { 3015 nxt_int_t ret; 3016 nxt_joint_job_t *job; 3017 nxt_queue_link_t *qlk; 3018 nxt_socket_conf_t *skcf; 3019 nxt_socket_conf_joint_t *joint; 3020 3021 for (qlk = nxt_queue_first(sockets); 3022 qlk != nxt_queue_tail(sockets); 3023 qlk = nxt_queue_next(qlk)) 3024 { 3025 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3026 if (nxt_slow_path(job == NULL)) { 3027 return NXT_ERROR; 3028 } 3029 3030 job->work.next = recf->jobs; 3031 recf->jobs = &job->work; 3032 3033 job->task = tmcf->engine->task; 3034 job->work.handler = handler; 3035 job->work.task = &job->task; 3036 job->work.obj = job; 
3037 job->tmcf = tmcf; 3038 3039 tmcf->count++; 3040 3041 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3042 sizeof(nxt_socket_conf_joint_t)); 3043 if (nxt_slow_path(joint == NULL)) { 3044 return NXT_ERROR; 3045 } 3046 3047 job->work.data = joint; 3048 3049 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3050 if (nxt_slow_path(ret != NXT_OK)) { 3051 return ret; 3052 } 3053 3054 joint->count = 1; 3055 3056 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3057 skcf->count++; 3058 joint->socket_conf = skcf; 3059 3060 joint->engine = recf->engine; 3061 } 3062 3063 return NXT_OK; 3064 } 3065 3066 3067 static nxt_int_t 3068 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3069 nxt_router_engine_conf_t *recf) 3070 { 3071 nxt_joint_job_t *job; 3072 3073 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3074 if (nxt_slow_path(job == NULL)) { 3075 return NXT_ERROR; 3076 } 3077 3078 job->work.next = recf->jobs; 3079 recf->jobs = &job->work; 3080 3081 job->task = tmcf->engine->task; 3082 job->work.handler = nxt_router_worker_thread_quit; 3083 job->work.task = &job->task; 3084 job->work.obj = NULL; 3085 job->work.data = NULL; 3086 job->tmcf = NULL; 3087 3088 return NXT_OK; 3089 } 3090 3091 3092 static nxt_int_t 3093 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 3094 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 3095 { 3096 nxt_joint_job_t *job; 3097 nxt_queue_link_t *qlk; 3098 3099 for (qlk = nxt_queue_first(sockets); 3100 qlk != nxt_queue_tail(sockets); 3101 qlk = nxt_queue_next(qlk)) 3102 { 3103 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3104 if (nxt_slow_path(job == NULL)) { 3105 return NXT_ERROR; 3106 } 3107 3108 job->work.next = recf->jobs; 3109 recf->jobs = &job->work; 3110 3111 job->task = tmcf->engine->task; 3112 job->work.handler = nxt_router_listen_socket_delete; 3113 job->work.task = &job->task; 3114 job->work.obj = job; 3115 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, 
link); 3116 job->tmcf = tmcf; 3117 3118 tmcf->count++; 3119 } 3120 3121 return NXT_OK; 3122 } 3123 3124 3125 static nxt_int_t 3126 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 3127 nxt_router_temp_conf_t *tmcf) 3128 { 3129 nxt_int_t ret; 3130 nxt_uint_t i, threads; 3131 nxt_router_engine_conf_t *recf; 3132 3133 recf = tmcf->engines->elts; 3134 threads = tmcf->router_conf->threads; 3135 3136 for (i = tmcf->new_threads; i < threads; i++) { 3137 ret = nxt_router_thread_create(task, rt, recf[i].engine); 3138 if (nxt_slow_path(ret != NXT_OK)) { 3139 return ret; 3140 } 3141 } 3142 3143 return NXT_OK; 3144 } 3145 3146 3147 static nxt_int_t 3148 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 3149 nxt_event_engine_t *engine) 3150 { 3151 nxt_int_t ret; 3152 nxt_thread_link_t *link; 3153 nxt_thread_handle_t handle; 3154 3155 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 3156 3157 if (nxt_slow_path(link == NULL)) { 3158 return NXT_ERROR; 3159 } 3160 3161 link->start = nxt_router_thread_start; 3162 link->engine = engine; 3163 link->work.handler = nxt_router_thread_exit_handler; 3164 link->work.task = task; 3165 link->work.data = link; 3166 3167 nxt_queue_insert_tail(&rt->engines, &engine->link); 3168 3169 ret = nxt_thread_create(&handle, link); 3170 3171 if (nxt_slow_path(ret != NXT_OK)) { 3172 nxt_queue_remove(&engine->link); 3173 } 3174 3175 return ret; 3176 } 3177 3178 3179 static void 3180 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 3181 nxt_router_temp_conf_t *tmcf) 3182 { 3183 nxt_app_t *app; 3184 3185 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 3186 3187 nxt_router_app_unlink(task, app); 3188 3189 } nxt_queue_loop; 3190 3191 nxt_queue_add(&router->apps, &tmcf->previous); 3192 nxt_queue_add(&router->apps, &tmcf->apps); 3193 } 3194 3195 3196 static void 3197 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 3198 { 3199 nxt_uint_t n; 3200 nxt_event_engine_t *engine; 3201 
nxt_router_engine_conf_t *recf; 3202 3203 recf = tmcf->engines->elts; 3204 3205 for (n = tmcf->engines->nelts; n != 0; n--) { 3206 engine = recf->engine; 3207 3208 switch (recf->action) { 3209 3210 case NXT_ROUTER_ENGINE_KEEP: 3211 break; 3212 3213 case NXT_ROUTER_ENGINE_ADD: 3214 nxt_queue_insert_tail(&router->engines, &engine->link0); 3215 break; 3216 3217 case NXT_ROUTER_ENGINE_DELETE: 3218 nxt_queue_remove(&engine->link0); 3219 break; 3220 } 3221 3222 nxt_router_engine_post(engine, recf->jobs); 3223 3224 recf++; 3225 } 3226 } 3227 3228 3229 static void 3230 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3231 { 3232 nxt_work_t *work, *next; 3233 3234 for (work = jobs; work != NULL; work = next) { 3235 next = work->next; 3236 work->next = NULL; 3237 3238 nxt_event_engine_post(engine, work); 3239 } 3240 } 3241 3242 3243 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3244 .rpc_error = nxt_port_rpc_handler, 3245 .mmap = nxt_port_mmap_handler, 3246 .data = nxt_port_rpc_handler, 3247 .oosm = nxt_router_oosm_handler, 3248 .req_headers_ack = nxt_port_rpc_handler, 3249 }; 3250 3251 3252 static void 3253 nxt_router_thread_start(void *data) 3254 { 3255 nxt_int_t ret; 3256 nxt_port_t *port; 3257 nxt_task_t *task; 3258 nxt_work_t *work; 3259 nxt_thread_t *thread; 3260 nxt_thread_link_t *link; 3261 nxt_event_engine_t *engine; 3262 3263 link = data; 3264 engine = link->engine; 3265 task = &engine->task; 3266 3267 thread = nxt_thread(); 3268 3269 nxt_event_engine_thread_adopt(engine); 3270 3271 /* STUB */ 3272 thread->runtime = engine->task.thread->runtime; 3273 3274 engine->task.thread = thread; 3275 engine->task.log = thread->log; 3276 thread->engine = engine; 3277 thread->task = &engine->task; 3278 #if 0 3279 thread->fiber = &engine->fibers->fiber; 3280 #endif 3281 3282 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3283 if (nxt_slow_path(engine->mem_pool == NULL)) { 3284 return; 3285 } 3286 3287 port = nxt_port_new(task, 
nxt_port_get_next_id(), nxt_pid, 3288 NXT_PROCESS_ROUTER); 3289 if (nxt_slow_path(port == NULL)) { 3290 return; 3291 } 3292 3293 ret = nxt_port_socket_init(task, port, 0); 3294 if (nxt_slow_path(ret != NXT_OK)) { 3295 nxt_port_use(task, port, -1); 3296 return; 3297 } 3298 3299 ret = nxt_router_port_queue_init(task, port); 3300 if (nxt_slow_path(ret != NXT_OK)) { 3301 nxt_port_use(task, port, -1); 3302 return; 3303 } 3304 3305 engine->port = port; 3306 3307 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3308 3309 work = nxt_zalloc(sizeof(nxt_work_t)); 3310 if (nxt_slow_path(work == NULL)) { 3311 return; 3312 } 3313 3314 work->handler = nxt_router_rt_add_port; 3315 work->task = link->work.task; 3316 work->obj = work; 3317 work->data = port; 3318 3319 nxt_event_engine_post(link->work.task->thread->engine, work); 3320 3321 nxt_event_engine_start(engine); 3322 } 3323 3324 3325 static void 3326 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3327 { 3328 nxt_int_t res; 3329 nxt_port_t *port; 3330 nxt_runtime_t *rt; 3331 3332 rt = task->thread->runtime; 3333 port = data; 3334 3335 nxt_free(obj); 3336 3337 res = nxt_port_hash_add(&rt->ports, port); 3338 3339 if (nxt_fast_path(res == NXT_OK)) { 3340 nxt_port_use(task, port, 1); 3341 } 3342 } 3343 3344 3345 static void 3346 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3347 { 3348 nxt_joint_job_t *job; 3349 nxt_socket_conf_t *skcf; 3350 nxt_listen_event_t *lev; 3351 nxt_listen_socket_t *ls; 3352 nxt_thread_spinlock_t *lock; 3353 nxt_socket_conf_joint_t *joint; 3354 3355 job = obj; 3356 joint = data; 3357 3358 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3359 3360 skcf = joint->socket_conf; 3361 ls = skcf->listen; 3362 3363 lev = nxt_listen_event(task, ls); 3364 if (nxt_slow_path(lev == NULL)) { 3365 nxt_router_listen_socket_release(task, skcf); 3366 return; 3367 } 3368 3369 lev->socket.data = joint; 3370 3371 lock = 
&skcf->router_conf->router->lock; 3372 3373 nxt_thread_spin_lock(lock); 3374 ls->count++; 3375 nxt_thread_spin_unlock(lock); 3376 3377 job->work.next = NULL; 3378 job->work.handler = nxt_router_conf_wait; 3379 3380 nxt_event_engine_post(job->tmcf->engine, &job->work); 3381 } 3382 3383 3384 nxt_inline nxt_listen_event_t * 3385 nxt_router_listen_event(nxt_queue_t *listen_connections, 3386 nxt_socket_conf_t *skcf) 3387 { 3388 nxt_socket_t fd; 3389 nxt_queue_link_t *qlk; 3390 nxt_listen_event_t *lev; 3391 3392 fd = skcf->listen->socket; 3393 3394 for (qlk = nxt_queue_first(listen_connections); 3395 qlk != nxt_queue_tail(listen_connections); 3396 qlk = nxt_queue_next(qlk)) 3397 { 3398 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3399 3400 if (fd == lev->socket.fd) { 3401 return lev; 3402 } 3403 } 3404 3405 return NULL; 3406 } 3407 3408 3409 static void 3410 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3411 { 3412 nxt_joint_job_t *job; 3413 nxt_event_engine_t *engine; 3414 nxt_listen_event_t *lev; 3415 nxt_socket_conf_joint_t *joint, *old; 3416 3417 job = obj; 3418 joint = data; 3419 3420 engine = task->thread->engine; 3421 3422 nxt_queue_insert_tail(&engine->joints, &joint->link); 3423 3424 lev = nxt_router_listen_event(&engine->listen_connections, 3425 joint->socket_conf); 3426 3427 old = lev->socket.data; 3428 lev->socket.data = joint; 3429 lev->listen = joint->socket_conf->listen; 3430 3431 job->work.next = NULL; 3432 job->work.handler = nxt_router_conf_wait; 3433 3434 nxt_event_engine_post(job->tmcf->engine, &job->work); 3435 3436 /* 3437 * The task is allocated from configuration temporary 3438 * memory pool so it can be freed after engine post operation. 
3439 */ 3440 3441 nxt_router_conf_release(&engine->task, old); 3442 } 3443 3444 3445 static void 3446 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3447 { 3448 nxt_socket_conf_t *skcf; 3449 nxt_listen_event_t *lev; 3450 nxt_event_engine_t *engine; 3451 nxt_socket_conf_joint_t *joint; 3452 3453 skcf = data; 3454 3455 engine = task->thread->engine; 3456 3457 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3458 3459 nxt_fd_event_delete(engine, &lev->socket); 3460 3461 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3462 lev->socket.fd); 3463 3464 joint = lev->socket.data; 3465 joint->close_job = obj; 3466 3467 lev->timer.handler = nxt_router_listen_socket_close; 3468 lev->timer.work_queue = &engine->fast_work_queue; 3469 3470 nxt_timer_add(engine, &lev->timer, 0); 3471 } 3472 3473 3474 static void 3475 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3476 { 3477 nxt_event_engine_t *engine; 3478 3479 nxt_debug(task, "router worker thread quit"); 3480 3481 engine = task->thread->engine; 3482 3483 engine->shutdown = 1; 3484 3485 if (nxt_queue_is_empty(&engine->joints)) { 3486 nxt_thread_exit(task->thread); 3487 } 3488 } 3489 3490 3491 static void 3492 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3493 { 3494 nxt_timer_t *timer; 3495 nxt_joint_job_t *job; 3496 nxt_listen_event_t *lev; 3497 nxt_socket_conf_joint_t *joint; 3498 3499 timer = obj; 3500 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3501 3502 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3503 lev->socket.fd); 3504 3505 nxt_queue_remove(&lev->link); 3506 3507 joint = lev->socket.data; 3508 lev->socket.data = NULL; 3509 3510 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3511 task = &task->thread->engine->task; 3512 3513 nxt_router_listen_socket_release(task, joint->socket_conf); 3514 3515 job = joint->close_job; 3516 job->work.next = NULL; 3517 
job->work.handler = nxt_router_conf_wait; 3518 3519 nxt_event_engine_post(job->tmcf->engine, &job->work); 3520 3521 nxt_router_listen_event_release(task, lev, joint); 3522 } 3523 3524 3525 static void 3526 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3527 { 3528 nxt_listen_socket_t *ls; 3529 nxt_thread_spinlock_t *lock; 3530 3531 ls = skcf->listen; 3532 lock = &skcf->router_conf->router->lock; 3533 3534 nxt_thread_spin_lock(lock); 3535 3536 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3537 task->thread->engine, ls->count); 3538 3539 if (--ls->count != 0) { 3540 ls = NULL; 3541 } 3542 3543 nxt_thread_spin_unlock(lock); 3544 3545 if (ls != NULL) { 3546 nxt_socket_close(task, ls->socket); 3547 nxt_free(ls); 3548 } 3549 } 3550 3551 3552 void 3553 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3554 nxt_socket_conf_joint_t *joint) 3555 { 3556 nxt_event_engine_t *engine; 3557 3558 nxt_debug(task, "listen event count: %D", lev->count); 3559 3560 engine = task->thread->engine; 3561 3562 if (--lev->count == 0) { 3563 if (lev->next != NULL) { 3564 nxt_sockaddr_cache_free(engine, lev->next); 3565 3566 nxt_conn_free(task, lev->next); 3567 } 3568 3569 nxt_free(lev); 3570 } 3571 3572 if (joint != NULL) { 3573 nxt_router_conf_release(task, joint); 3574 } 3575 3576 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3577 nxt_thread_exit(task->thread); 3578 } 3579 } 3580 3581 3582 void 3583 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3584 { 3585 nxt_socket_conf_t *skcf; 3586 nxt_router_conf_t *rtcf; 3587 nxt_thread_spinlock_t *lock; 3588 3589 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3590 3591 if (--joint->count != 0) { 3592 return; 3593 } 3594 3595 nxt_queue_remove(&joint->link); 3596 3597 /* 3598 * The joint content can not be safely used after the critical 3599 * section protected by the spinlock because its memory pool may 3600 * be 
already destroyed by another thread. 3601 */ 3602 skcf = joint->socket_conf; 3603 rtcf = skcf->router_conf; 3604 lock = &rtcf->router->lock; 3605 3606 nxt_thread_spin_lock(lock); 3607 3608 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3609 rtcf, rtcf->count); 3610 3611 if (--skcf->count != 0) { 3612 skcf = NULL; 3613 rtcf = NULL; 3614 3615 } else { 3616 nxt_queue_remove(&skcf->link); 3617 3618 if (--rtcf->count != 0) { 3619 rtcf = NULL; 3620 } 3621 } 3622 3623 nxt_thread_spin_unlock(lock); 3624 3625 #if (NXT_TLS) 3626 if (skcf != NULL && skcf->tls != NULL) { 3627 task->thread->runtime->tls->server_free(task, skcf->tls); 3628 } 3629 #endif 3630 3631 /* TODO remove engine->port */ 3632 3633 if (rtcf != NULL) { 3634 nxt_debug(task, "old router conf is destroyed"); 3635 3636 nxt_router_apps_hash_use(task, rtcf, -1); 3637 3638 nxt_router_access_log_release(task, lock, rtcf->access_log); 3639 3640 nxt_mp_thread_adopt(rtcf->mem_pool); 3641 3642 nxt_mp_destroy(rtcf->mem_pool); 3643 } 3644 } 3645 3646 3647 static void 3648 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3649 nxt_router_access_log_t *access_log) 3650 { 3651 size_t size; 3652 u_char *buf, *p; 3653 nxt_off_t bytes; 3654 3655 static nxt_time_string_t date_cache = { 3656 (nxt_atomic_uint_t) -1, 3657 nxt_router_access_log_date, 3658 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3659 nxt_length("31/Dec/1986:19:40:00 +0300"), 3660 NXT_THREAD_TIME_LOCAL, 3661 NXT_THREAD_TIME_SEC, 3662 }; 3663 3664 size = r->remote->address_length 3665 + 6 /* ' - - [' */ 3666 + date_cache.size 3667 + 3 /* '] "' */ 3668 + r->method->length 3669 + 1 /* space */ 3670 + r->target.length 3671 + 1 /* space */ 3672 + r->version.length 3673 + 2 /* '" ' */ 3674 + 3 /* status */ 3675 + 1 /* space */ 3676 + NXT_OFF_T_LEN 3677 + 2 /* ' "' */ 3678 + (r->referer != NULL ? r->referer->value_length : 1) 3679 + 3 /* '" "' */ 3680 + (r->user_agent != NULL ? 
r->user_agent->value_length : 1) 3681 + 2 /* '"\n' */ 3682 ; 3683 3684 buf = nxt_mp_nget(r->mem_pool, size); 3685 if (nxt_slow_path(buf == NULL)) { 3686 return; 3687 } 3688 3689 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3690 r->remote->address_length); 3691 3692 p = nxt_cpymem(p, " - - [", 6); 3693 3694 p = nxt_thread_time_string(task->thread, &date_cache, p); 3695 3696 p = nxt_cpymem(p, "] \"", 3); 3697 3698 if (r->method->length != 0) { 3699 p = nxt_cpymem(p, r->method->start, r->method->length); 3700 3701 if (r->target.length != 0) { 3702 *p++ = ' '; 3703 p = nxt_cpymem(p, r->target.start, r->target.length); 3704 3705 if (r->version.length != 0) { 3706 *p++ = ' '; 3707 p = nxt_cpymem(p, r->version.start, r->version.length); 3708 } 3709 } 3710 3711 } else { 3712 *p++ = '-'; 3713 } 3714 3715 p = nxt_cpymem(p, "\" ", 2); 3716 3717 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3718 3719 *p++ = ' '; 3720 3721 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3722 3723 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3724 3725 p = nxt_cpymem(p, " \"", 2); 3726 3727 if (r->referer != NULL) { 3728 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3729 3730 } else { 3731 *p++ = '-'; 3732 } 3733 3734 p = nxt_cpymem(p, "\" \"", 3); 3735 3736 if (r->user_agent != NULL) { 3737 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3738 3739 } else { 3740 *p++ = '-'; 3741 } 3742 3743 p = nxt_cpymem(p, "\"\n", 2); 3744 3745 nxt_fd_write(access_log->fd, buf, p - buf); 3746 } 3747 3748 3749 static u_char * 3750 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3751 size_t size, const char *format) 3752 { 3753 u_char sign; 3754 time_t gmtoff; 3755 3756 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3757 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3758 3759 gmtoff = nxt_timezone(tm) / 60; 3760 3761 if (gmtoff < 0) { 3762 gmtoff = -gmtoff; 3763 sign = '-'; 3764 3765 } 
else { 3766 sign = '+'; 3767 } 3768 3769 return nxt_sprintf(buf, buf + size, format, 3770 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3771 tm->tm_hour, tm->tm_min, tm->tm_sec, 3772 sign, gmtoff / 60, gmtoff % 60); 3773 } 3774 3775 3776 static void 3777 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3778 { 3779 uint32_t stream; 3780 nxt_int_t ret; 3781 nxt_buf_t *b; 3782 nxt_port_t *main_port, *router_port; 3783 nxt_runtime_t *rt; 3784 nxt_router_access_log_t *access_log; 3785 3786 access_log = tmcf->router_conf->access_log; 3787 3788 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3789 if (nxt_slow_path(b == NULL)) { 3790 goto fail; 3791 } 3792 3793 b->completion_handler = nxt_buf_dummy_completion; 3794 3795 nxt_buf_cpystr(b, &access_log->path); 3796 *b->mem.free++ = '\0'; 3797 3798 rt = task->thread->runtime; 3799 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3800 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3801 3802 stream = nxt_port_rpc_register_handler(task, router_port, 3803 nxt_router_access_log_ready, 3804 nxt_router_access_log_error, 3805 -1, tmcf); 3806 if (nxt_slow_path(stream == 0)) { 3807 goto fail; 3808 } 3809 3810 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3811 stream, router_port->id, b); 3812 3813 if (nxt_slow_path(ret != NXT_OK)) { 3814 nxt_port_rpc_cancel(task, router_port, stream); 3815 goto fail; 3816 } 3817 3818 return; 3819 3820 fail: 3821 3822 nxt_router_conf_error(task, tmcf); 3823 } 3824 3825 3826 static void 3827 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3828 void *data) 3829 { 3830 nxt_router_temp_conf_t *tmcf; 3831 nxt_router_access_log_t *access_log; 3832 3833 tmcf = data; 3834 3835 access_log = tmcf->router_conf->access_log; 3836 3837 access_log->fd = msg->fd[0]; 3838 3839 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3840 nxt_router_conf_apply, task, tmcf, NULL); 3841 } 3842 3843 3844 static 
void 3845 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3846 void *data) 3847 { 3848 nxt_router_temp_conf_t *tmcf; 3849 3850 tmcf = data; 3851 3852 nxt_router_conf_error(task, tmcf); 3853 } 3854 3855 3856 static void 3857 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3858 nxt_router_access_log_t *access_log) 3859 { 3860 if (access_log == NULL) { 3861 return; 3862 } 3863 3864 nxt_thread_spin_lock(lock); 3865 3866 if (--access_log->count != 0) { 3867 access_log = NULL; 3868 } 3869 3870 nxt_thread_spin_unlock(lock); 3871 3872 if (access_log != NULL) { 3873 3874 if (access_log->fd != -1) { 3875 nxt_fd_close(access_log->fd); 3876 } 3877 3878 nxt_free(access_log); 3879 } 3880 } 3881 3882 3883 typedef struct { 3884 nxt_mp_t *mem_pool; 3885 nxt_router_access_log_t *access_log; 3886 } nxt_router_access_log_reopen_t; 3887 3888 3889 static void 3890 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3891 { 3892 nxt_mp_t *mp; 3893 uint32_t stream; 3894 nxt_int_t ret; 3895 nxt_buf_t *b; 3896 nxt_port_t *main_port, *router_port; 3897 nxt_runtime_t *rt; 3898 nxt_router_access_log_t *access_log; 3899 nxt_router_access_log_reopen_t *reopen; 3900 3901 access_log = nxt_router->access_log; 3902 3903 if (access_log == NULL) { 3904 return; 3905 } 3906 3907 mp = nxt_mp_create(1024, 128, 256, 32); 3908 if (nxt_slow_path(mp == NULL)) { 3909 return; 3910 } 3911 3912 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3913 if (nxt_slow_path(reopen == NULL)) { 3914 goto fail; 3915 } 3916 3917 reopen->mem_pool = mp; 3918 reopen->access_log = access_log; 3919 3920 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3921 if (nxt_slow_path(b == NULL)) { 3922 goto fail; 3923 } 3924 3925 b->completion_handler = nxt_router_access_log_reopen_completion; 3926 3927 nxt_buf_cpystr(b, &access_log->path); 3928 *b->mem.free++ = '\0'; 3929 3930 rt = task->thread->runtime; 3931 main_port = 
rt->port_by_type[NXT_PROCESS_MAIN]; 3932 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3933 3934 stream = nxt_port_rpc_register_handler(task, router_port, 3935 nxt_router_access_log_reopen_ready, 3936 nxt_router_access_log_reopen_error, 3937 -1, reopen); 3938 if (nxt_slow_path(stream == 0)) { 3939 goto fail; 3940 } 3941 3942 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3943 stream, router_port->id, b); 3944 3945 if (nxt_slow_path(ret != NXT_OK)) { 3946 nxt_port_rpc_cancel(task, router_port, stream); 3947 goto fail; 3948 } 3949 3950 nxt_mp_retain(mp); 3951 3952 return; 3953 3954 fail: 3955 3956 nxt_mp_destroy(mp); 3957 } 3958 3959 3960 static void 3961 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3962 { 3963 nxt_mp_t *mp; 3964 nxt_buf_t *b; 3965 3966 b = obj; 3967 mp = b->data; 3968 3969 nxt_mp_release(mp); 3970 } 3971 3972 3973 static void 3974 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3975 void *data) 3976 { 3977 nxt_router_access_log_t *access_log; 3978 nxt_router_access_log_reopen_t *reopen; 3979 3980 reopen = data; 3981 3982 access_log = reopen->access_log; 3983 3984 if (access_log == nxt_router->access_log) { 3985 3986 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 3987 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3988 msg->fd[0], access_log->fd, nxt_errno); 3989 } 3990 } 3991 3992 nxt_fd_close(msg->fd[0]); 3993 nxt_mp_release(reopen->mem_pool); 3994 } 3995 3996 3997 static void 3998 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3999 void *data) 4000 { 4001 nxt_router_access_log_reopen_t *reopen; 4002 4003 reopen = data; 4004 4005 nxt_mp_release(reopen->mem_pool); 4006 } 4007 4008 4009 static void 4010 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 4011 { 4012 nxt_port_t *port; 4013 nxt_thread_link_t *link; 4014 nxt_event_engine_t *engine; 4015 nxt_thread_handle_t handle; 4016 4017 
/*
 * RPC handler for messages an application sends back for a request stream.
 *
 *   task  current task
 *   msg   received port message
 *   data  the nxt_request_rpc_data_t registered for the stream
 *
 * Headers-ack messages are delegated to nxt_router_req_headers_ack_handler().
 * For any other message, the first buffer received before the response
 * header was sent is parsed as an nxt_unit_response_t (status, header fields,
 * optional piggybacked body); later buffers are appended to the request
 * output chain as body data.  Any parsing failure answers the client with
 * NXT_HTTP_SERVICE_UNAVAILABLE and unlinks the RPC data.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    /* No request attached to the RPC data any more: nothing to do. */
    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    /* The request is already in error state: just detach the RPC data. */
    if (r->error) {
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    /* A zero-sized message carries no payload buffer. */
    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Terminate the chain with the request's "last" buffer. */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /* The stream completed, so a pending "failed" action is upgraded. */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* More data expected: (re)arm the application timeout timer. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        /* Header already sent: everything received now is body data. */
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        /* Non-memory buffers are treated as empty and rejected below. */
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;

        /* Upper bound of field records that fit after the fixed header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        /* Copy every non-skipped field into the HTTP response field list. */
        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /* Processing marked the field as skipped: drop the entry. */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        /*
         * Narrow the buffer to the piggybacked body, or make it empty
         * if the message carried the header only.
         */
        if (resp->piggyback_content_length != 0) {
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            b->mem.pos = b->mem.free;
        }

        /* Empty head buffer: complete it now and continue with the rest. */
        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            /* Websocket counter is updated under the application mutex. */
            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}
*msg, nxt_request_rpc_data_t *req_rpc_data) 4235 { 4236 int res; 4237 nxt_app_t *app; 4238 nxt_buf_t *b; 4239 nxt_bool_t start_process, unlinked; 4240 nxt_port_t *app_port, *main_app_port, *idle_port; 4241 nxt_queue_link_t *idle_lnk; 4242 nxt_http_request_t *r; 4243 4244 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4245 req_rpc_data->stream, 4246 msg->port_msg.pid, msg->port_msg.reply_port); 4247 4248 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4249 msg->port_msg.pid); 4250 4251 app = req_rpc_data->app; 4252 r = req_rpc_data->request; 4253 4254 start_process = 0; 4255 unlinked = 0; 4256 4257 nxt_thread_mutex_lock(&app->mutex); 4258 4259 if (r->app_link.next != NULL) { 4260 nxt_queue_remove(&r->app_link); 4261 r->app_link.next = NULL; 4262 4263 unlinked = 1; 4264 } 4265 4266 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4267 msg->port_msg.reply_port); 4268 if (nxt_slow_path(app_port == NULL)) { 4269 nxt_thread_mutex_unlock(&app->mutex); 4270 4271 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4272 4273 if (unlinked) { 4274 nxt_mp_release(r->mem_pool); 4275 } 4276 4277 return; 4278 } 4279 4280 main_app_port = app_port->main_app_port; 4281 4282 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4283 app->idle_processes--; 4284 4285 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4286 &app->name, main_app_port->pid, main_app_port->id, 4287 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4288 4289 /* Check port was in 'spare_ports' using idle_start field. */ 4290 if (main_app_port->idle_start == 0 4291 && app->idle_processes >= app->spare_processes) 4292 { 4293 /* 4294 * If there is a vacant space in spare ports, 4295 * move the last idle to spare_ports. 
4296 */ 4297 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4298 4299 idle_lnk = nxt_queue_last(&app->idle_ports); 4300 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4301 nxt_queue_remove(idle_lnk); 4302 4303 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4304 4305 idle_port->idle_start = 0; 4306 4307 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4308 "to spare_ports", 4309 &app->name, idle_port->pid, idle_port->id); 4310 } 4311 4312 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4313 app->pending_processes++; 4314 start_process = 1; 4315 } 4316 } 4317 4318 main_app_port->active_requests++; 4319 4320 nxt_port_inc_use(app_port); 4321 4322 nxt_thread_mutex_unlock(&app->mutex); 4323 4324 if (unlinked) { 4325 nxt_mp_release(r->mem_pool); 4326 } 4327 4328 if (start_process) { 4329 nxt_router_start_app_process(task, app); 4330 } 4331 4332 nxt_port_use(task, req_rpc_data->app_port, -1); 4333 4334 req_rpc_data->app_port = app_port; 4335 4336 b = req_rpc_data->msg_info.buf; 4337 4338 if (b != NULL) { 4339 /* First buffer is already sent. Start from second. 
*/ 4340 b = b->next; 4341 4342 req_rpc_data->msg_info.buf->next = NULL; 4343 } 4344 4345 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4346 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4347 req_rpc_data->msg_info.body_fd); 4348 4349 if (req_rpc_data->msg_info.body_fd != -1) { 4350 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4351 } 4352 4353 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4354 req_rpc_data->msg_info.body_fd, 4355 req_rpc_data->stream, 4356 task->thread->engine->port->id, b); 4357 4358 if (nxt_slow_path(res != NXT_OK)) { 4359 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4360 } 4361 } 4362 4363 if (app->timeout != 0) { 4364 r->timer.handler = nxt_router_app_timeout; 4365 r->timer_data = req_rpc_data; 4366 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4367 } 4368 } 4369 4370 4371 static const nxt_http_request_state_t nxt_http_request_send_state 4372 nxt_aligned(64) = 4373 { 4374 .error_handler = nxt_http_request_error_handler, 4375 }; 4376 4377 4378 static void 4379 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4380 { 4381 nxt_buf_t *out; 4382 nxt_http_request_t *r; 4383 4384 r = obj; 4385 4386 out = r->out; 4387 4388 if (out != NULL) { 4389 r->out = NULL; 4390 nxt_http_request_send(task, r, out); 4391 } 4392 } 4393 4394 4395 static void 4396 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4397 void *data) 4398 { 4399 nxt_request_rpc_data_t *req_rpc_data; 4400 4401 req_rpc_data = data; 4402 4403 req_rpc_data->rpc_cancel = 0; 4404 4405 /* TODO cancel message and return if cancelled. 
*/ 4406 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4407 4408 if (req_rpc_data->request != NULL) { 4409 nxt_http_request_error(task, req_rpc_data->request, 4410 NXT_HTTP_SERVICE_UNAVAILABLE); 4411 } 4412 4413 nxt_request_rpc_data_unlink(task, req_rpc_data); 4414 } 4415 4416 4417 static void 4418 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4419 void *data) 4420 { 4421 nxt_app_t *app; 4422 nxt_bool_t start_process; 4423 nxt_port_t *port; 4424 nxt_app_joint_t *app_joint; 4425 nxt_app_joint_rpc_t *app_joint_rpc; 4426 4427 nxt_assert(data != NULL); 4428 4429 app_joint_rpc = data; 4430 app_joint = app_joint_rpc->app_joint; 4431 port = msg->u.new_port; 4432 4433 nxt_assert(app_joint != NULL); 4434 nxt_assert(port != NULL); 4435 nxt_assert(port->type == NXT_PROCESS_APP); 4436 nxt_assert(port->id == 0); 4437 4438 app = app_joint->app; 4439 4440 nxt_router_app_joint_use(task, app_joint, -1); 4441 4442 if (nxt_slow_path(app == NULL)) { 4443 nxt_debug(task, "new port ready for released app, send QUIT"); 4444 4445 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4446 4447 return; 4448 } 4449 4450 nxt_thread_mutex_lock(&app->mutex); 4451 4452 nxt_assert(app->pending_processes != 0); 4453 4454 app->pending_processes--; 4455 4456 if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { 4457 nxt_debug(task, "new port ready for restarted app, send QUIT"); 4458 4459 start_process = !task->thread->engine->shutdown 4460 && nxt_router_app_can_start(app) 4461 && nxt_router_app_need_start(app); 4462 4463 if (start_process) { 4464 app->pending_processes++; 4465 } 4466 4467 nxt_thread_mutex_unlock(&app->mutex); 4468 4469 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4470 4471 if (start_process) { 4472 nxt_router_start_app_process(task, app); 4473 } 4474 4475 return; 4476 } 4477 4478 port->app = app; 4479 port->main_app_port = port; 4480 4481 app->processes++; 4482 
nxt_port_hash_add(&app->port_hash, port); 4483 app->port_hash_count++; 4484 4485 nxt_thread_mutex_unlock(&app->mutex); 4486 4487 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4488 &app->name, port->pid, app->processes, app->pending_processes); 4489 4490 nxt_router_app_shared_port_send(task, port); 4491 4492 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); 4493 } 4494 4495 4496 static nxt_int_t 4497 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) 4498 { 4499 nxt_buf_t *b; 4500 nxt_port_t *port; 4501 nxt_port_msg_new_port_t *msg; 4502 4503 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 4504 sizeof(nxt_port_data_t)); 4505 if (nxt_slow_path(b == NULL)) { 4506 return NXT_ERROR; 4507 } 4508 4509 port = app_port->app->shared_port; 4510 4511 nxt_debug(task, "send port %FD to process %PI", 4512 port->pair[0], app_port->pid); 4513 4514 b->mem.free += sizeof(nxt_port_msg_new_port_t); 4515 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 4516 4517 msg->id = port->id; 4518 msg->pid = port->pid; 4519 msg->max_size = port->max_size; 4520 msg->max_share = port->max_share; 4521 msg->type = port->type; 4522 4523 return nxt_port_socket_write2(task, app_port, 4524 NXT_PORT_MSG_NEW_PORT, 4525 port->pair[0], port->queue_fd, 4526 0, 0, b); 4527 } 4528 4529 4530 static void 4531 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4532 void *data) 4533 { 4534 nxt_app_t *app; 4535 nxt_app_joint_t *app_joint; 4536 nxt_queue_link_t *link; 4537 nxt_http_request_t *r; 4538 nxt_app_joint_rpc_t *app_joint_rpc; 4539 4540 nxt_assert(data != NULL); 4541 4542 app_joint_rpc = data; 4543 app_joint = app_joint_rpc->app_joint; 4544 4545 nxt_assert(app_joint != NULL); 4546 4547 app = app_joint->app; 4548 4549 nxt_router_app_joint_use(task, app_joint, -1); 4550 4551 if (nxt_slow_path(app == NULL)) { 4552 nxt_debug(task, "start error for released app"); 4553 4554 return; 4555 } 4556 4557 nxt_debug(task, "app '%V' %p start error", 
/*
 * Detaches one port from the application's "ports" queue so that the caller
 * can send QUIT to it.  Under the application mutex the port is removed from
 * the ports queue, from the idle/spare queue (if linked there) and from the
 * port hash, and the process counters are adjusted.
 *
 * Returns the detached port; the initial NULL is returned if the loop body
 * never runs (presumably when the ports queue is empty).
 */
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
    nxt_port_t  *port;

    port = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {

        /* Caller is responsible to decrease port use count. */
        nxt_queue_chk_remove(&port->app_link);

        if (nxt_queue_chk_remove(&port->idle_link)) {
            app->idle_processes--;

            nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
                      &app->name, port->pid, port->id,
                      (port->idle_start ? "idle_ports" : "spare_ports"));
        }

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        port->app = NULL;
        app->processes--;

        /* Only the first port is taken per call. */
        break;

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&app->mutex);

    return port;
}


/*
 * Adds i to the application use counter.  When a negative i matches the
 * value returned by nxt_atomic_fetch_add() (presumably the counter before
 * the addition, i.e. the counter has just dropped to zero), the application
 * is freed: directly if the current thread runs the application's engine,
 * otherwise by posting free_app_work to that engine.
 */
static void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
    int  c;

    c = nxt_atomic_fetch_add(&app->use_count, i);

    if (i < 0 && c == -i) {

        if (task->thread->engine != app->engine) {
            nxt_event_engine_post(app->engine, &app->joint->free_app_work);

        } else {
            nxt_router_free_app(task, app->joint, NULL);
        }
    }
}


/*
 * Removes the application from the queue it is linked into via app->link
 * and drops one use count.
 */
static void
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
    nxt_debug(task, "app '%V' %p unlink", &app->name, app);

    nxt_queue_remove(&app->link);

    nxt_router_app_use(task, app, -1);
}


/*
 * Releases an application port after a request-related event.
 *
 *   port    the port being released; port->app must be set
 *   action  what happened; selects how the request/response counters and
 *           the port use count are adjusted (see the switch below)
 *
 * For the shared port only the application-wide active request counter is
 * updated.  For a process port the per-process counters are updated, the
 * main port is put back into the application's ports queue when still
 * usable, a process that reached max_requests is detached and told to QUIT,
 * and a port without active requests or websockets is moved to the spare or
 * idle list.  Finally inc_use is applied to the use count of port.
 */
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action)
{
    int         inc_use;
    uint32_t    got_response, dec_requests;
    nxt_app_t   *app;
    nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
    nxt_port_t  *main_app_port;

    nxt_assert(port != NULL);
    nxt_assert(port->app != NULL);

    app = port->app;

    inc_use = 0;
    got_response = 0;
    dec_requests = 0;

    /*
     * got_response and dec_requests both decrement the active request
     * counters; only got_response also counts as a served response.
     */
    switch (action) {
    case NXT_APR_NEW_PORT:
        break;
    case NXT_APR_REQUEST_FAILED:
        dec_requests = 1;
        inc_use = -1;
        break;
    case NXT_APR_GOT_RESPONSE:
        got_response = 1;
        inc_use = -1;
        break;
    case NXT_APR_UPGRADE:
        got_response = 1;
        break;
    case NXT_APR_CLOSE:
        inc_use = -1;
        break;
    }

    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
              port->pid, port->id,
              (int) inc_use, (int) got_response);

    /* The shared port has no per-process state to maintain. */
    if (port->id == NXT_SHARED_PORT_ID) {
        nxt_thread_mutex_lock(&app->mutex);

        app->active_requests -= got_response + dec_requests;

        nxt_thread_mutex_unlock(&app->mutex);

        goto adjust_use;
    }

    main_app_port = port->main_app_port;

    nxt_thread_mutex_lock(&app->mutex);

    main_app_port->app_responses += got_response;
    main_app_port->active_requests -= got_response + dec_requests;
    app->active_requests -= got_response + dec_requests;

    /*
     * Port still open (pair[1] != -1) and below the request limit:
     * make sure it is linked into the application's ports queue.
     */
    if (main_app_port->pair[1] != -1
        && (app->max_requests == 0
            || main_app_port->app_responses < app->max_requests))
    {
        if (main_app_port->app_link.next == NULL) {
            nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);

            nxt_port_inc_use(main_app_port);
        }
    }

    /* The process has served its configured maximum number of requests. */
    send_quit = (app->max_requests > 0
                 && main_app_port->app_responses >= app->max_requests);

    if (send_quit) {
        port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);

        nxt_port_hash_remove(&app->port_hash, main_app_port);
        app->port_hash_count--;

        main_app_port->app = NULL;
        app->processes--;

    } else {
        port_unchained = 0;
    }

    adjust_idle_timer = 0;

    /* Open, not quitting, nothing in flight and not yet in an idle list. */
    if (main_app_port->pair[1] != -1 && !send_quit
        && main_app_port->active_requests == 0
        && main_app_port->active_websockets == 0
        && main_app_port->idle_link.next == NULL)
    {
        /*
         * Spare slots are exactly full, so this port goes to idle_ports;
         * schedule an idle timer adjustment unless one is already marked
         * as pending (adjust_idle_work.data != NULL).
         */
        if (app->idle_processes == app->spare_processes
            && app->adjust_idle_work.data == NULL)
        {
            adjust_idle_timer = 1;
            app->adjust_idle_work.data = app;
            app->adjust_idle_work.next = NULL;
        }

        if (app->idle_processes < app->spare_processes) {
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);

            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        } else {
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);

            /* A non-zero idle_start marks membership in idle_ports. */
            main_app_port->idle_start = task->thread->engine->timers.now;

            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        }

        app->idle_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (adjust_idle_timer) {
        /* The use count taken here is dropped by the posted work. */
        nxt_router_app_use(task, app, 1);
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
    }

    /* ? */
    if (main_app_port->pair[1] == -1) {
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
                  &app->name, app, main_app_port, main_app_port->pid);

        goto adjust_use;
    }

    if (send_quit) {
        nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);

        nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
                              NULL);

        /* Drop the use count the ports queue membership was holding. */
        if (port_unchained) {
            nxt_port_use(task, main_app_port, -1);
        }

        goto adjust_use;
    }

    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
              &app->name, app);

adjust_use:

    nxt_port_use(task, port, inc_use);
}
port->id, 4857 (port->idle_start ? "idle_ports" : "spare_ports")); 4858 4859 if (port->idle_start == 0 4860 && app->idle_processes >= app->spare_processes) 4861 { 4862 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4863 4864 idle_lnk = nxt_queue_last(&app->idle_ports); 4865 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4866 nxt_queue_remove(idle_lnk); 4867 4868 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4869 4870 idle_port->idle_start = 0; 4871 4872 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4873 "to spare_ports", 4874 &app->name, idle_port->pid, idle_port->id); 4875 } 4876 } 4877 4878 app->processes--; 4879 4880 start_process = !task->thread->engine->shutdown 4881 && nxt_router_app_can_start(app) 4882 && nxt_router_app_need_start(app); 4883 4884 if (start_process) { 4885 app->pending_processes++; 4886 } 4887 4888 nxt_thread_mutex_unlock(&app->mutex); 4889 4890 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4891 4892 if (unchain) { 4893 nxt_port_use(task, port, -1); 4894 } 4895 4896 if (start_process) { 4897 nxt_router_start_app_process(task, app); 4898 } 4899 } 4900 4901 4902 static void 4903 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4904 { 4905 nxt_app_t *app; 4906 nxt_bool_t queued; 4907 nxt_port_t *port; 4908 nxt_msec_t timeout, threshold; 4909 nxt_queue_link_t *lnk; 4910 nxt_event_engine_t *engine; 4911 4912 app = obj; 4913 queued = (data == app); 4914 4915 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4916 &app->name, queued); 4917 4918 engine = task->thread->engine; 4919 4920 nxt_assert(app->engine == engine); 4921 4922 threshold = engine->timers.now + app->joint->idle_timer.bias; 4923 timeout = 0; 4924 4925 nxt_thread_mutex_lock(&app->mutex); 4926 4927 if (queued) { 4928 app->adjust_idle_work.data = NULL; 4929 } 4930 4931 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 4932 &app->name, 4933 (int) app->idle_processes, 
(int) app->spare_processes); 4934 4935 while (app->idle_processes > app->spare_processes) { 4936 4937 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4938 4939 lnk = nxt_queue_first(&app->idle_ports); 4940 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4941 4942 timeout = port->idle_start + app->idle_timeout; 4943 4944 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 4945 &app->name, port->pid, 4946 port->idle_start, timeout, threshold); 4947 4948 if (timeout > threshold) { 4949 break; 4950 } 4951 4952 nxt_queue_remove(lnk); 4953 lnk->next = NULL; 4954 4955 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 4956 &app->name, port->pid, port->id); 4957 4958 nxt_queue_chk_remove(&port->app_link); 4959 4960 nxt_port_hash_remove(&app->port_hash, port); 4961 app->port_hash_count--; 4962 4963 app->idle_processes--; 4964 app->processes--; 4965 port->app = NULL; 4966 4967 nxt_thread_mutex_unlock(&app->mutex); 4968 4969 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4970 &app->name, port->pid); 4971 4972 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4973 4974 nxt_port_use(task, port, -1); 4975 4976 nxt_thread_mutex_lock(&app->mutex); 4977 } 4978 4979 nxt_thread_mutex_unlock(&app->mutex); 4980 4981 if (timeout > threshold) { 4982 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4983 4984 } else { 4985 nxt_timer_disable(engine, &app->joint->idle_timer); 4986 } 4987 4988 if (queued) { 4989 nxt_router_app_use(task, app, -1); 4990 } 4991 } 4992 4993 4994 static void 4995 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4996 { 4997 nxt_timer_t *timer; 4998 nxt_app_joint_t *app_joint; 4999 5000 timer = obj; 5001 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 5002 5003 if (nxt_fast_path(app_joint->app != NULL)) { 5004 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 5005 } 5006 } 5007 5008 5009 static void 5010 
/*
 * Destroys the application attached to the joint passed in obj.
 *
 * Every port in the application's ports queue is sent QUIT, every port left
 * in the port hash is closed, the shared port is closed, and the mutexes and
 * memory pool are destroyed.  The joint is then detached from the
 * application and its reference is dropped: immediately, or from a
 * zero-delay timer handler when nxt_timer_delete() returns non-zero for the
 * idle timer.
 */
static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t           *app;
    nxt_port_t          *port;
    nxt_app_joint_t     *joint;
    nxt_event_engine_t  *engine;

    joint = obj;
    app = joint->app;

    while ((port = nxt_router_app_get_port_for_quit(task, app)) != NULL) {
        nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* Close whatever ports are still registered in the hash. */
    while ((port = nxt_port_hash_retrieve(&app->port_hash)) != NULL) {
        app->port_hash_count--;

        port->app = NULL;

        nxt_port_close(task, port);

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_assert(app->processes == 0);
    nxt_assert(app->active_requests == 0);
    nxt_assert(app->port_hash_count == 0);
    nxt_assert(app->idle_processes == 0);
    nxt_assert(nxt_queue_is_empty(&app->ports));
    nxt_assert(nxt_queue_is_empty(&app->spare_ports));
    nxt_assert(nxt_queue_is_empty(&app->idle_ports));

    nxt_port_mmaps_destroy(&app->outgoing, 1);

    nxt_thread_mutex_destroy(&app->outgoing.mutex);

    if (app->shared_port != NULL) {
        app->shared_port->app = NULL;
        nxt_port_close(task, app->shared_port);
        nxt_port_use(task, app->shared_port, -1);

        app->shared_port = NULL;
    }

    nxt_thread_mutex_destroy(&app->mutex);
    nxt_mp_destroy(app->mem_pool);

    joint->app = NULL;

    engine = task->thread->engine;

    if (!nxt_timer_delete(engine, &joint->idle_timer)) {
        nxt_router_app_joint_use(task, joint, -1);
        return;
    }

    /* Drop the joint reference from the timer handler instead. */
    joint->idle_timer.handler = nxt_router_app_joint_release_handler;
    nxt_timer_add(engine, &joint->idle_timer, 0);
}
5134 */ 5135 nxt_mp_retain(r->mem_pool); 5136 5137 req_rpc_data->app_port = port; 5138 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 5139 5140 if (start_process) { 5141 nxt_router_start_app_process(task, app); 5142 } 5143 } 5144 5145 5146 void 5147 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 5148 nxt_http_action_t *action) 5149 { 5150 nxt_event_engine_t *engine; 5151 nxt_http_app_conf_t *conf; 5152 nxt_request_rpc_data_t *req_rpc_data; 5153 5154 conf = action->u.conf; 5155 engine = task->thread->engine; 5156 5157 r->app_target = conf->target; 5158 5159 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 5160 nxt_router_response_ready_handler, 5161 nxt_router_response_error_handler, 5162 sizeof(nxt_request_rpc_data_t)); 5163 if (nxt_slow_path(req_rpc_data == NULL)) { 5164 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 5165 return; 5166 } 5167 5168 /* 5169 * At this point we have request req_rpc_data allocated and registered 5170 * in port handlers. Need to fixup request memory pool. 
Counterpart 5171 * release will be called via following call chain: 5172 * nxt_request_rpc_data_unlink() -> 5173 * nxt_router_http_request_release_post() -> 5174 * nxt_router_http_request_release() 5175 */ 5176 nxt_mp_retain(r->mem_pool); 5177 5178 r->timer.task = &engine->task; 5179 r->timer.work_queue = &engine->fast_work_queue; 5180 r->timer.log = engine->task.log; 5181 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 5182 5183 r->engine = engine; 5184 r->err_work.handler = nxt_router_http_request_error; 5185 r->err_work.task = task; 5186 r->err_work.obj = r; 5187 5188 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 5189 req_rpc_data->app = conf->app; 5190 req_rpc_data->msg_info.body_fd = -1; 5191 req_rpc_data->rpc_cancel = 1; 5192 5193 nxt_router_app_use(task, conf->app, 1); 5194 5195 req_rpc_data->request = r; 5196 r->req_rpc_data = req_rpc_data; 5197 5198 if (r->last != NULL) { 5199 r->last->completion_handler = nxt_router_http_request_done; 5200 } 5201 5202 nxt_router_app_port_get(task, conf->app, req_rpc_data); 5203 nxt_router_app_prepare_request(task, req_rpc_data); 5204 } 5205 5206 5207 static void 5208 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 5209 { 5210 nxt_http_request_t *r; 5211 5212 r = obj; 5213 5214 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 5215 5216 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5217 5218 if (r->req_rpc_data != NULL) { 5219 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5220 } 5221 5222 nxt_mp_release(r->mem_pool); 5223 } 5224 5225 5226 static void 5227 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 5228 { 5229 nxt_http_request_t *r; 5230 5231 r = data; 5232 5233 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 5234 5235 if (r->req_rpc_data != NULL) { 5236 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5237 } 5238 5239 nxt_http_request_close_handler(task, r, r->proto.any); 5240 } 5241 
5242 5243 static void 5244 nxt_router_app_prepare_request(nxt_task_t *task, 5245 nxt_request_rpc_data_t *req_rpc_data) 5246 { 5247 nxt_app_t *app; 5248 nxt_buf_t *buf, *body; 5249 nxt_int_t res; 5250 nxt_port_t *port, *reply_port; 5251 5252 int notify; 5253 struct { 5254 nxt_port_msg_t pm; 5255 nxt_port_mmap_msg_t mm; 5256 } msg; 5257 5258 5259 app = req_rpc_data->app; 5260 5261 nxt_assert(app != NULL); 5262 5263 port = req_rpc_data->app_port; 5264 5265 nxt_assert(port != NULL); 5266 nxt_assert(port->queue != NULL); 5267 5268 reply_port = task->thread->engine->port; 5269 5270 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5271 nxt_app_msg_prefix[app->type]); 5272 if (nxt_slow_path(buf == NULL)) { 5273 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5274 req_rpc_data->stream, &app->name); 5275 5276 nxt_http_request_error(task, req_rpc_data->request, 5277 NXT_HTTP_INTERNAL_SERVER_ERROR); 5278 5279 return; 5280 } 5281 5282 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5283 nxt_buf_used_size(buf), 5284 port->socket.fd); 5285 5286 req_rpc_data->msg_info.buf = buf; 5287 5288 body = req_rpc_data->request->body; 5289 5290 if (body != NULL && nxt_buf_is_file(body)) { 5291 req_rpc_data->msg_info.body_fd = body->file->fd; 5292 5293 body->file->fd = -1; 5294 5295 } else { 5296 req_rpc_data->msg_info.body_fd = -1; 5297 } 5298 5299 msg.pm.stream = req_rpc_data->stream; 5300 msg.pm.pid = reply_port->pid; 5301 msg.pm.reply_port = reply_port->id; 5302 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5303 msg.pm.last = 0; 5304 msg.pm.mmap = 1; 5305 msg.pm.nf = 0; 5306 msg.pm.mf = 0; 5307 msg.pm.tracking = 0; 5308 5309 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5310 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5311 5312 msg.mm.mmap_id = hdr->id; 5313 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5314 msg.mm.size = nxt_buf_used_size(buf); 5315 5316 res = nxt_app_queue_send(port->queue, &msg, 
sizeof(msg), 5317 req_rpc_data->stream, ¬ify, 5318 &req_rpc_data->msg_info.tracking_cookie); 5319 if (nxt_fast_path(res == NXT_OK)) { 5320 if (notify != 0) { 5321 (void) nxt_port_socket_write(task, port, 5322 NXT_PORT_MSG_READ_QUEUE, 5323 -1, req_rpc_data->stream, 5324 reply_port->id, NULL); 5325 5326 } else { 5327 nxt_debug(task, "queue is not empty"); 5328 } 5329 5330 buf->is_port_mmap_sent = 1; 5331 buf->mem.pos = buf->mem.free; 5332 5333 } else { 5334 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5335 req_rpc_data->stream, &app->name); 5336 5337 nxt_http_request_error(task, req_rpc_data->request, 5338 NXT_HTTP_INTERNAL_SERVER_ERROR); 5339 } 5340 } 5341 5342 5343 struct nxt_fields_iter_s { 5344 nxt_list_part_t *part; 5345 nxt_http_field_t *field; 5346 }; 5347 5348 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5349 5350 5351 static nxt_http_field_t * 5352 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5353 { 5354 if (part == NULL) { 5355 return NULL; 5356 } 5357 5358 while (part->nelts == 0) { 5359 part = part->next; 5360 if (part == NULL) { 5361 return NULL; 5362 } 5363 } 5364 5365 i->part = part; 5366 i->field = nxt_list_data(i->part); 5367 5368 return i->field; 5369 } 5370 5371 5372 static nxt_http_field_t * 5373 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5374 { 5375 return nxt_fields_part_first(nxt_list_part(fields), i); 5376 } 5377 5378 5379 static nxt_http_field_t * 5380 nxt_fields_next(nxt_fields_iter_t *i) 5381 { 5382 nxt_http_field_t *end = nxt_list_data(i->part); 5383 5384 end += i->part->nelts; 5385 i->field++; 5386 5387 if (i->field < end) { 5388 return i->field; 5389 } 5390 5391 return nxt_fields_part_first(i->part->next, i); 5392 } 5393 5394 5395 static nxt_buf_t * 5396 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5397 nxt_app_t *app, const nxt_str_t *prefix) 5398 { 5399 void *target_pos, *query_pos; 5400 u_char *pos, *end, *p, c; 5401 size_t fields_count, req_size, 
size, free_size; 5402 size_t copy_size; 5403 nxt_off_t content_length; 5404 nxt_buf_t *b, *buf, *out, **tail; 5405 nxt_http_field_t *field, *dup; 5406 nxt_unit_field_t *dst_field; 5407 nxt_fields_iter_t iter, dup_iter; 5408 nxt_unit_request_t *req; 5409 5410 req_size = sizeof(nxt_unit_request_t) 5411 + r->method->length + 1 5412 + r->version.length + 1 5413 + r->remote->length + 1 5414 + r->local->length + 1 5415 + r->server_name.length + 1 5416 + r->target.length + 1 5417 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5418 5419 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5420 fields_count = 0; 5421 5422 nxt_list_each(field, r->fields) { 5423 fields_count++; 5424 5425 req_size += field->name_length + prefix->length + 1 5426 + field->value_length + 1; 5427 } nxt_list_loop; 5428 5429 req_size += fields_count * sizeof(nxt_unit_field_t); 5430 5431 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5432 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5433 (int) req_size); 5434 5435 return NULL; 5436 } 5437 5438 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5439 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5440 if (nxt_slow_path(out == NULL)) { 5441 return NULL; 5442 } 5443 5444 req = (nxt_unit_request_t *) out->mem.free; 5445 out->mem.free += req_size; 5446 5447 req->app_target = r->app_target; 5448 5449 req->content_length = content_length; 5450 5451 p = (u_char *) (req->fields + fields_count); 5452 5453 nxt_debug(task, "fields_count=%d", (int) fields_count); 5454 5455 req->method_length = r->method->length; 5456 nxt_unit_sptr_set(&req->method, p); 5457 p = nxt_cpymem(p, r->method->start, r->method->length); 5458 *p++ = '\0'; 5459 5460 req->version_length = r->version.length; 5461 nxt_unit_sptr_set(&req->version, p); 5462 p = nxt_cpymem(p, r->version.start, r->version.length); 5463 *p++ = '\0'; 5464 5465 req->remote_length = r->remote->address_length; 5466 nxt_unit_sptr_set(&req->remote, p); 
5467 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5468 r->remote->address_length); 5469 *p++ = '\0'; 5470 5471 req->local_length = r->local->address_length; 5472 nxt_unit_sptr_set(&req->local, p); 5473 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5474 *p++ = '\0'; 5475 5476 req->tls = (r->tls != NULL); 5477 req->websocket_handshake = r->websocket_handshake; 5478 5479 req->server_name_length = r->server_name.length; 5480 nxt_unit_sptr_set(&req->server_name, p); 5481 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5482 *p++ = '\0'; 5483 5484 target_pos = p; 5485 req->target_length = (uint32_t) r->target.length; 5486 nxt_unit_sptr_set(&req->target, p); 5487 p = nxt_cpymem(p, r->target.start, r->target.length); 5488 *p++ = '\0'; 5489 5490 req->path_length = (uint32_t) r->path->length; 5491 if (r->path->start == r->target.start) { 5492 nxt_unit_sptr_set(&req->path, target_pos); 5493 5494 } else { 5495 nxt_unit_sptr_set(&req->path, p); 5496 p = nxt_cpymem(p, r->path->start, r->path->length); 5497 *p++ = '\0'; 5498 } 5499 5500 req->query_length = r->args != NULL ? 
(uint32_t) r->args->length : 0; 5501 if (r->args != NULL && r->args->start != NULL) { 5502 query_pos = nxt_pointer_to(target_pos, 5503 r->args->start - r->target.start); 5504 5505 nxt_unit_sptr_set(&req->query, query_pos); 5506 5507 } else { 5508 req->query.offset = 0; 5509 } 5510 5511 req->content_length_field = NXT_UNIT_NONE_FIELD; 5512 req->content_type_field = NXT_UNIT_NONE_FIELD; 5513 req->cookie_field = NXT_UNIT_NONE_FIELD; 5514 req->authorization_field = NXT_UNIT_NONE_FIELD; 5515 5516 dst_field = req->fields; 5517 5518 for (field = nxt_fields_first(r->fields, &iter); 5519 field != NULL; 5520 field = nxt_fields_next(&iter)) 5521 { 5522 if (field->skip) { 5523 continue; 5524 } 5525 5526 dst_field->hash = field->hash; 5527 dst_field->skip = 0; 5528 dst_field->name_length = field->name_length + prefix->length; 5529 dst_field->value_length = field->value_length; 5530 5531 if (field == r->content_length) { 5532 req->content_length_field = dst_field - req->fields; 5533 5534 } else if (field == r->content_type) { 5535 req->content_type_field = dst_field - req->fields; 5536 5537 } else if (field == r->cookie) { 5538 req->cookie_field = dst_field - req->fields; 5539 5540 } else if (field == r->authorization) { 5541 req->authorization_field = dst_field - req->fields; 5542 } 5543 5544 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5545 (int) field->hash, (int) field->skip, 5546 (int) field->name_length, field->name, 5547 (int) field->value_length, field->value); 5548 5549 if (prefix->length != 0) { 5550 nxt_unit_sptr_set(&dst_field->name, p); 5551 p = nxt_cpymem(p, prefix->start, prefix->length); 5552 5553 end = field->name + field->name_length; 5554 for (pos = field->name; pos < end; pos++) { 5555 c = *pos; 5556 5557 if (c >= 'a' && c <= 'z') { 5558 *p++ = (c & ~0x20); 5559 continue; 5560 } 5561 5562 if (c == '-') { 5563 *p++ = '_'; 5564 continue; 5565 } 5566 5567 *p++ = c; 5568 } 5569 5570 } else { 5571 nxt_unit_sptr_set(&dst_field->name, p); 5572 p = 
nxt_cpymem(p, field->name, field->name_length); 5573 } 5574 5575 *p++ = '\0'; 5576 5577 nxt_unit_sptr_set(&dst_field->value, p); 5578 p = nxt_cpymem(p, field->value, field->value_length); 5579 5580 if (prefix->length != 0) { 5581 dup_iter = iter; 5582 5583 for (dup = nxt_fields_next(&dup_iter); 5584 dup != NULL; 5585 dup = nxt_fields_next(&dup_iter)) 5586 { 5587 if (dup->name_length != field->name_length 5588 || dup->skip 5589 || dup->hash != field->hash 5590 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5591 { 5592 continue; 5593 } 5594 5595 p = nxt_cpymem(p, ", ", 2); 5596 p = nxt_cpymem(p, dup->value, dup->value_length); 5597 5598 dst_field->value_length += 2 + dup->value_length; 5599 5600 dup->skip = 1; 5601 } 5602 } 5603 5604 *p++ = '\0'; 5605 5606 dst_field++; 5607 } 5608 5609 req->fields_count = (uint32_t) (dst_field - req->fields); 5610 5611 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5612 5613 buf = out; 5614 tail = &buf->next; 5615 5616 for (b = r->body; b != NULL; b = b->next) { 5617 size = nxt_buf_mem_used_size(&b->mem); 5618 pos = b->mem.pos; 5619 5620 while (size > 0) { 5621 if (buf == NULL) { 5622 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5623 5624 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5625 if (nxt_slow_path(buf == NULL)) { 5626 while (out != NULL) { 5627 buf = out->next; 5628 out->next = NULL; 5629 out->completion_handler(task, out, out->parent); 5630 out = buf; 5631 } 5632 return NULL; 5633 } 5634 5635 *tail = buf; 5636 tail = &buf->next; 5637 5638 } else { 5639 free_size = nxt_buf_mem_free_size(&buf->mem); 5640 if (free_size < size 5641 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5642 == NXT_OK) 5643 { 5644 free_size = nxt_buf_mem_free_size(&buf->mem); 5645 } 5646 } 5647 5648 if (free_size > 0) { 5649 copy_size = nxt_min(free_size, size); 5650 5651 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5652 5653 size -= copy_size; 5654 pos += copy_size; 5655 5656 if (size == 0) { 
5657 break; 5658 } 5659 } 5660 5661 buf = NULL; 5662 } 5663 } 5664 5665 return out; 5666 } 5667 5668 5669 static void 5670 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5671 { 5672 nxt_timer_t *timer; 5673 nxt_http_request_t *r; 5674 nxt_request_rpc_data_t *req_rpc_data; 5675 5676 timer = obj; 5677 5678 nxt_debug(task, "router app timeout"); 5679 5680 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5681 req_rpc_data = r->timer_data; 5682 5683 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5684 5685 nxt_request_rpc_data_unlink(task, req_rpc_data); 5686 } 5687 5688 5689 static void 5690 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5691 { 5692 r->timer.handler = nxt_router_http_request_release; 5693 nxt_timer_add(task->thread->engine, &r->timer, 0); 5694 } 5695 5696 5697 static void 5698 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5699 { 5700 nxt_http_request_t *r; 5701 5702 nxt_debug(task, "http request pool release"); 5703 5704 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5705 5706 nxt_mp_release(r->mem_pool); 5707 } 5708 5709 5710 static void 5711 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5712 { 5713 size_t mi; 5714 uint32_t i; 5715 nxt_bool_t ack; 5716 nxt_process_t *process; 5717 nxt_free_map_t *m; 5718 nxt_port_mmap_handler_t *mmap_handler; 5719 5720 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5721 5722 process = nxt_runtime_process_find(task->thread->runtime, 5723 msg->port_msg.pid); 5724 if (nxt_slow_path(process == NULL)) { 5725 return; 5726 } 5727 5728 ack = 0; 5729 5730 /* 5731 * To mitigate possible racing condition (when OOSM message received 5732 * after some of the memory was already freed), need to try to find 5733 * first free segment in shared memory and send ACK if found. 
5734 */ 5735 5736 nxt_thread_mutex_lock(&process->incoming.mutex); 5737 5738 for (i = 0; i < process->incoming.size; i++) { 5739 mmap_handler = process->incoming.elts[i].mmap_handler; 5740 5741 if (nxt_slow_path(mmap_handler == NULL)) { 5742 continue; 5743 } 5744 5745 m = mmap_handler->hdr->free_map; 5746 5747 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5748 if (m[mi] != 0) { 5749 ack = 1; 5750 5751 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5752 i, mi, m[mi]); 5753 5754 break; 5755 } 5756 } 5757 } 5758 5759 nxt_thread_mutex_unlock(&process->incoming.mutex); 5760 5761 if (ack) { 5762 nxt_process_broadcast_shm_ack(task, process); 5763 } 5764 } 5765 5766 5767 static void 5768 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5769 { 5770 nxt_fd_t fd; 5771 nxt_port_t *port; 5772 nxt_runtime_t *rt; 5773 nxt_port_mmaps_t *mmaps; 5774 nxt_port_msg_get_mmap_t *get_mmap_msg; 5775 nxt_port_mmap_handler_t *mmap_handler; 5776 5777 rt = task->thread->runtime; 5778 5779 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5780 msg->port_msg.reply_port); 5781 if (nxt_slow_path(port == NULL)) { 5782 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5783 msg->port_msg.pid, msg->port_msg.reply_port); 5784 5785 return; 5786 } 5787 5788 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5789 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5790 { 5791 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5792 (int) nxt_buf_used_size(msg->buf)); 5793 5794 return; 5795 } 5796 5797 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5798 5799 nxt_assert(port->type == NXT_PROCESS_APP); 5800 5801 if (nxt_slow_path(port->app == NULL)) { 5802 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5803 port->pid, port->id); 5804 5805 // FIXME 5806 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5807 -1, msg->port_msg.stream, 0, NULL); 5808 5809 return; 5810 } 5811 5812 mmaps = &port->app->outgoing; 5813 
nxt_thread_mutex_lock(&mmaps->mutex); 5814 5815 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5816 nxt_thread_mutex_unlock(&mmaps->mutex); 5817 5818 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5819 (int) get_mmap_msg->id); 5820 5821 // FIXME 5822 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5823 -1, msg->port_msg.stream, 0, NULL); 5824 return; 5825 } 5826 5827 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5828 5829 fd = mmap_handler->fd; 5830 5831 nxt_thread_mutex_unlock(&mmaps->mutex); 5832 5833 nxt_debug(task, "get mmap %PI:%d found", 5834 msg->port_msg.pid, (int) get_mmap_msg->id); 5835 5836 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5837 } 5838 5839 5840 static void 5841 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5842 { 5843 nxt_port_t *port, *reply_port; 5844 nxt_runtime_t *rt; 5845 nxt_port_msg_get_port_t *get_port_msg; 5846 5847 rt = task->thread->runtime; 5848 5849 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5850 msg->port_msg.reply_port); 5851 if (nxt_slow_path(reply_port == NULL)) { 5852 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5853 msg->port_msg.pid, msg->port_msg.reply_port); 5854 5855 return; 5856 } 5857 5858 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5859 < (int) sizeof(nxt_port_msg_get_port_t))) 5860 { 5861 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5862 (int) nxt_buf_used_size(msg->buf)); 5863 5864 return; 5865 } 5866 5867 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5868 5869 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5870 if (nxt_slow_path(port == NULL)) { 5871 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5872 get_port_msg->pid, get_port_msg->id); 5873 5874 return; 5875 } 5876 5877 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5878 get_port_msg->id); 5879 5880 (void) nxt_port_send_port(task, 
reply_port, port, msg->port_msg.stream); 5881 } 5882