1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 nxt_conf_value_t *limits_value; 31 nxt_conf_value_t *processes_value; 32 nxt_conf_value_t *targets_value; 33 } nxt_router_app_conf_t; 34 35 36 typedef struct { 37 nxt_str_t pass; 38 nxt_str_t application; 39 } nxt_router_listener_conf_t; 40 41 42 #if (NXT_TLS) 43 44 typedef struct { 45 nxt_str_t name; 46 nxt_socket_conf_t *socket_conf; 47 nxt_router_temp_conf_t *temp_conf; 48 nxt_tls_init_t *tls_init; 49 nxt_bool_t last; 50 51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 52 } nxt_router_tlssock_t; 53 54 #endif 55 56 57 typedef struct { 58 nxt_str_t *name; 59 nxt_socket_conf_t *socket_conf; 60 nxt_router_temp_conf_t *temp_conf; 61 nxt_bool_t last; 62 } nxt_socket_rpc_t; 63 64 65 typedef struct { 66 nxt_app_t *app; 67 nxt_router_temp_conf_t *temp_conf; 68 uint8_t proto; /* 1 bit */ 69 } nxt_app_rpc_t; 70 71 72 typedef struct { 73 nxt_app_joint_t *app_joint; 74 uint32_t generation; 75 uint8_t proto; /* 1 bit */ 76 } nxt_app_joint_rpc_t; 77 78 79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 80 nxt_mp_t *mp); 81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 82 static void nxt_router_greet_controller(nxt_task_t *task, 83 nxt_port_t *controller_port); 84 85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 86 87 static void nxt_router_new_port_handler(nxt_task_t *task, 88 nxt_port_recv_msg_t *msg); 89 static void nxt_router_conf_data_handler(nxt_task_t *task, 90 nxt_port_recv_msg_t *msg); 91 static void nxt_router_app_restart_handler(nxt_task_t *task, 92 nxt_port_recv_msg_t *msg); 93 static void nxt_router_remove_pid_handler(nxt_task_t *task, 94 nxt_port_recv_msg_t *msg); 95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 96 nxt_port_recv_msg_t *msg); 97 98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 100 static void nxt_router_conf_ready(nxt_task_t *task, 101 nxt_router_temp_conf_t *tmcf); 102 static void nxt_router_conf_error(nxt_task_t *task, 103 nxt_router_temp_conf_t *tmcf); 104 static void nxt_router_conf_send(nxt_task_t *task, 105 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 106 107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 108 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 110 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, 112 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, 113 nxt_conf_value_t *conf); 114 115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 118 nxt_app_t *app); 119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 120 nxt_str_t *name); 121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 122 int i); 123 124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 125 nxt_port_t *port); 126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 127 nxt_port_t *port); 128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 129 nxt_port_t *port, nxt_fd_t fd); 130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 131 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 132 static void nxt_router_listen_socket_ready(nxt_task_t *task, 133 nxt_port_recv_msg_t *msg, void *data); 134 static void nxt_router_listen_socket_error(nxt_task_t *task, 135 nxt_port_recv_msg_t *msg, void *data); 136 #if (NXT_TLS) 137 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 138 nxt_port_recv_msg_t *msg, void *data); 139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 140 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 141 nxt_bool_t last); 142 #endif 143 static void nxt_router_app_rpc_create(nxt_task_t *task, 144 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 145 static void nxt_router_app_prefork_ready(nxt_task_t *task, 146 nxt_port_recv_msg_t *msg, void *data); 147 static void nxt_router_app_prefork_error(nxt_task_t *task, 148 nxt_port_recv_msg_t *msg, void *data); 149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 150 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 152 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 153 154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 155 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 156 const nxt_event_interface_t *interface); 157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 158 nxt_router_engine_conf_t *recf); 159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 160 nxt_router_engine_conf_t *recf); 161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 162 nxt_router_engine_conf_t *recf); 163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 164 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 165 nxt_work_handler_t handler); 166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 167 nxt_router_engine_conf_t *recf); 168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 169 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 170 171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 172 nxt_router_temp_conf_t *tmcf); 173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 174 nxt_event_engine_t *engine); 175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 176 nxt_router_temp_conf_t *tmcf); 177 178 static void nxt_router_engines_post(nxt_router_t *router, 179 nxt_router_temp_conf_t *tmcf); 180 static void nxt_router_engine_post(nxt_event_engine_t *engine, 181 nxt_work_t *jobs); 182 183 static void nxt_router_thread_start(void *data); 184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 185 void *data); 186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 187 void *data); 188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 189 void *data); 190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 191 void *data); 192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 193 void *data); 194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 195 void *data); 196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 197 void *data); 198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 199 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 200 static void nxt_router_listen_socket_release(nxt_task_t *task, 201 nxt_socket_conf_t *skcf); 202 203 static void nxt_router_access_log_writer(nxt_task_t *task, 204 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 206 struct tm *tm, size_t size, const char *format); 207 static void nxt_router_access_log_open(nxt_task_t *task, 208 nxt_router_temp_conf_t *tmcf); 209 static void nxt_router_access_log_ready(nxt_task_t *task, 210 nxt_port_recv_msg_t *msg, void *data); 211 static void nxt_router_access_log_error(nxt_task_t *task, 212 nxt_port_recv_msg_t *msg, void *data); 213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 214 nxt_router_access_log_t *access_log); 215 static void nxt_router_access_log_release(nxt_task_t *task, 216 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 218 void *data); 219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 220 nxt_port_recv_msg_t *msg, void *data); 221 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 222 nxt_port_recv_msg_t *msg, void *data); 223 224 static void nxt_router_app_port_ready(nxt_task_t *task, 225 nxt_port_recv_msg_t *msg, void *data); 226 static void nxt_router_app_port_error(nxt_task_t *task, 227 nxt_port_recv_msg_t *msg, void *data); 228 229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 231 232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, 233 nxt_port_t *port, nxt_apr_action_t action); 234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 235 nxt_request_rpc_data_t *req_rpc_data); 236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 237 void *data); 238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 239 void *data); 240 241 static void nxt_router_app_prepare_request(nxt_task_t *task, 242 nxt_request_rpc_data_t *req_rpc_data); 243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 244 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 245 246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 248 void *data); 249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 250 void *data); 251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 252 void *data); 253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 254 255 static const nxt_http_request_state_t nxt_http_request_send_state; 256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 257 258 static void nxt_router_app_joint_use(nxt_task_t *task, 259 nxt_app_joint_t *app_joint, int i); 260 261 static void nxt_router_http_request_release_post(nxt_task_t *task, 262 nxt_http_request_t *r); 263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 264 void *data); 265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 266 static void nxt_router_get_port_handler(nxt_task_t *task, 267 nxt_port_recv_msg_t *msg); 268 static void nxt_router_get_mmap_handler(nxt_task_t *task, 269 nxt_port_recv_msg_t *msg); 270 271 extern const nxt_http_request_state_t nxt_http_websocket; 272 273 static nxt_router_t *nxt_router; 274 275 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 276 static const nxt_str_t empty_prefix = nxt_string(""); 277 278 static const nxt_str_t *nxt_app_msg_prefix[] = { 279 &empty_prefix, 280 &empty_prefix, 281 &http_prefix, 282 &http_prefix, 283 &http_prefix, 284 &empty_prefix, 285 }; 286 287 288 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 289 .quit = nxt_signal_quit_handler, 290 .new_port = nxt_router_new_port_handler, 291 .get_port = nxt_router_get_port_handler, 292 .change_file = nxt_port_change_log_file_handler, 293 .mmap = nxt_port_mmap_handler, 294 .get_mmap = nxt_router_get_mmap_handler, 295 .data = nxt_router_conf_data_handler, 296 .app_restart = nxt_router_app_restart_handler, 297 .remove_pid = nxt_router_remove_pid_handler, 298 .access_log = nxt_router_access_log_reopen_handler, 299 .rpc_ready = nxt_port_rpc_handler, 300 .rpc_error = nxt_port_rpc_handler, 301 .oosm = nxt_router_oosm_handler, 302 }; 303 304 305 const nxt_process_init_t nxt_router_process = { 306 .name = "router", 307 .type = NXT_PROCESS_ROUTER, 308 .prefork = nxt_router_prefork, 309 .restart = 1, 310 .setup = nxt_process_core_setup, 311 .start = nxt_router_start, 312 .port_handlers = &nxt_router_process_port_handlers, 313 .signals = nxt_process_signals, 314 }; 315 316 317 /* Queues of nxt_socket_conf_t */ 318 nxt_queue_t creating_sockets; 319 nxt_queue_t pending_sockets; 320 nxt_queue_t updating_sockets; 321 nxt_queue_t keeping_sockets; 322 nxt_queue_t deleting_sockets; 323 324 325 static nxt_int_t 326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 327 { 328 nxt_runtime_stop_app_processes(task, task->thread->runtime); 329 330 return NXT_OK; 331 } 332 333 334 static nxt_int_t 335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 336 { 337 nxt_int_t ret; 338 nxt_port_t *controller_port; 339 nxt_router_t *router; 340 nxt_runtime_t *rt; 341 342 rt = task->thread->runtime; 343 344 nxt_log(task, NXT_LOG_INFO, "router started"); 345 346 #if (NXT_TLS) 347 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 348 if (nxt_slow_path(rt->tls == NULL)) { 349 return NXT_ERROR; 350 } 351 352 ret = rt->tls->library_init(task); 353 if (nxt_slow_path(ret != NXT_OK)) { 354 return ret; 355 } 356 #endif 357 358 ret = nxt_http_init(task); 359 if (nxt_slow_path(ret != NXT_OK)) { 360 return ret; 361 } 362 363 router = nxt_zalloc(sizeof(nxt_router_t)); 364 if (nxt_slow_path(router == NULL)) { 365 return NXT_ERROR; 366 } 367 368 nxt_queue_init(&router->engines); 369 nxt_queue_init(&router->sockets); 370 nxt_queue_init(&router->apps); 371 372 nxt_router = router; 373 374 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 375 if (controller_port != NULL) { 376 nxt_router_greet_controller(task, controller_port); 377 } 378 379 return NXT_OK; 380 } 381 382 383 static void 384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 385 { 386 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 387 -1, 0, 0, NULL); 388 } 389 390 391 static void 392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 393 void *data) 394 { 395 size_t size; 396 uint32_t stream; 397 nxt_fd_t port_fd, queue_fd; 398 nxt_int_t ret; 399 nxt_app_t *app; 400 nxt_buf_t *b; 401 nxt_port_t *dport; 402 nxt_runtime_t *rt; 403 nxt_app_joint_rpc_t *app_joint_rpc; 404 405 app = data; 406 407 nxt_thread_mutex_lock(&app->mutex); 408 409 dport = app->proto_port; 410 411 nxt_thread_mutex_unlock(&app->mutex); 412 413 if (dport != NULL) { 414 nxt_debug(task, "app '%V' %p start process", &app->name, app); 415 416 b = NULL; 417 port_fd = -1; 418 queue_fd = -1; 419 420 } else { 421 if (app->proto_port_requests > 0) { 422 nxt_debug(task, "app '%V' %p wait for prototype process", 423 &app->name, app); 424 425 app->proto_port_requests++; 426 427 goto skip; 428 } 429 430 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); 431 432 rt = task->thread->runtime; 433 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 434 435 size = app->name.length + 1 + app->conf.length; 436 437 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); 438 if (nxt_slow_path(b == NULL)) { 439 goto failed; 440 } 441 442 nxt_buf_cpystr(b, &app->name); 443 *b->mem.free++ = '\0'; 444 nxt_buf_cpystr(b, &app->conf); 445 446 port_fd = app->shared_port->pair[0]; 447 queue_fd = app->shared_port->queue_fd; 448 } 449 450 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 451 nxt_router_app_port_ready, 452 nxt_router_app_port_error, 453 sizeof(nxt_app_joint_rpc_t)); 454 if (nxt_slow_path(app_joint_rpc == NULL)) { 455 goto failed; 456 } 457 458 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 459 460 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 461 port_fd, queue_fd, stream, port->id, b); 462 if (nxt_slow_path(ret != NXT_OK)) { 463 nxt_port_rpc_cancel(task, port, stream); 464 465 goto failed; 466 } 467 468 app_joint_rpc->app_joint = app->joint; 469 app_joint_rpc->generation = app->generation; 470 app_joint_rpc->proto = (b != NULL); 471 472 if (b != NULL) { 473 app->proto_port_requests++; 474 475 b = NULL; 476 } 477 478 nxt_router_app_joint_use(task, app->joint, 1); 479 480 failed: 481 482 if (b != NULL) { 483 nxt_mp_free(b->data, b); 484 } 485 486 skip: 487 488 nxt_router_app_use(task, app, -1); 489 } 490 491 492 static void 493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 494 { 495 app_joint->use_count += i; 496 497 if (app_joint->use_count == 0) { 498 nxt_assert(app_joint->app == NULL); 499 500 nxt_free(app_joint); 501 } 502 } 503 504 505 static nxt_int_t 506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 507 { 508 nxt_int_t res; 509 nxt_port_t *router_port; 510 nxt_runtime_t *rt; 511 512 nxt_debug(task, "app '%V' start process", &app->name); 513 514 rt = task->thread->runtime; 515 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 516 517 nxt_router_app_use(task, app, 1); 518 519 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 520 app); 521 522 if (res == NXT_OK) { 523 return res; 524 } 525 526 nxt_thread_mutex_lock(&app->mutex); 527 528 app->pending_processes--; 529 530 nxt_thread_mutex_unlock(&app->mutex); 531 532 nxt_router_app_use(task, app, -1); 533 534 return NXT_ERROR; 535 } 536 537 538 nxt_inline nxt_bool_t 539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 540 { 541 nxt_buf_t *b, *next; 542 nxt_bool_t cancelled; 543 nxt_port_t *app_port; 544 nxt_msg_info_t *msg_info; 545 546 msg_info = &req_rpc_data->msg_info; 547 548 if (msg_info->buf == NULL) { 549 return 0; 550 } 551 552 app_port = req_rpc_data->app_port; 553 554 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 555 cancelled = nxt_app_queue_cancel(app_port->queue, 556 msg_info->tracking_cookie, 557 req_rpc_data->stream); 558 559 if (cancelled) { 560 nxt_debug(task, "stream #%uD: cancelled by router", 561 req_rpc_data->stream); 562 } 563 564 } else { 565 cancelled = 0; 566 } 567 568 for (b = msg_info->buf; b != NULL; b = next) { 569 next = b->next; 570 b->next = NULL; 571 572 if (b->is_port_mmap_sent) { 573 b->is_port_mmap_sent = cancelled == 0; 574 } 575 576 b->completion_handler(task, b, b->parent); 577 } 578 579 msg_info->buf = NULL; 580 581 return cancelled; 582 } 583 584 585 nxt_inline nxt_bool_t 586 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 587 { 588 if (lnk->next != NULL) { 589 nxt_queue_remove(lnk); 590 591 lnk->next = NULL; 592 593 return 1; 594 } 595 596 return 0; 597 } 598 599 600 nxt_inline void 601 nxt_request_rpc_data_unlink(nxt_task_t *task, 602 nxt_request_rpc_data_t *req_rpc_data) 603 { 604 nxt_app_t *app; 605 nxt_bool_t unlinked; 606 nxt_http_request_t *r; 607 608 nxt_router_msg_cancel(task, req_rpc_data); 609 610 app = req_rpc_data->app; 611 612 if (req_rpc_data->app_port != NULL) { 613 nxt_router_app_port_release(task, app, req_rpc_data->app_port, 614 req_rpc_data->apr_action); 615 616 req_rpc_data->app_port = NULL; 617 } 618 619 r = req_rpc_data->request; 620 621 if (r != NULL) { 622 r->timer_data = NULL; 623 624 nxt_router_http_request_release_post(task, r); 625 626 r->req_rpc_data = NULL; 627 req_rpc_data->request = NULL; 628 629 if (app != NULL) { 630 unlinked = 0; 631 632 nxt_thread_mutex_lock(&app->mutex); 633 634 if (r->app_link.next != NULL) { 635 nxt_queue_remove(&r->app_link); 636 r->app_link.next = NULL; 637 638 unlinked = 1; 639 } 640 641 nxt_thread_mutex_unlock(&app->mutex); 642 643 if (unlinked) { 644 nxt_mp_release(r->mem_pool); 645 } 646 } 647 } 648 649 if (app != NULL) { 650 nxt_router_app_use(task, app, -1); 651 652 req_rpc_data->app = NULL; 653 } 654 655 if (req_rpc_data->msg_info.body_fd != -1) { 656 nxt_fd_close(req_rpc_data->msg_info.body_fd); 657 658 req_rpc_data->msg_info.body_fd = -1; 659 } 660 661 if (req_rpc_data->rpc_cancel) { 662 req_rpc_data->rpc_cancel = 0; 663 664 nxt_port_rpc_cancel(task, task->thread->engine->port, 665 req_rpc_data->stream); 666 } 667 } 668 669 670 static void 671 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 672 { 673 nxt_int_t res; 674 nxt_app_t *app; 675 nxt_port_t *port, *main_app_port; 676 nxt_runtime_t *rt; 677 678 nxt_port_new_port_handler(task, msg); 679 680 port = msg->u.new_port; 681 682 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 683 nxt_router_greet_controller(task, msg->u.new_port); 684 } 685 686 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { 687 nxt_port_rpc_handler(task, msg); 688 689 return; 690 } 691 692 if (port == NULL || port->type != NXT_PROCESS_APP) { 693 694 if (msg->port_msg.stream == 0) { 695 return; 696 } 697 698 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 699 700 } else { 701 if (msg->fd[1] != -1) { 702 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 703 if (nxt_slow_path(res != NXT_OK)) { 704 return; 705 } 706 707 nxt_fd_close(msg->fd[1]); 708 msg->fd[1] = -1; 709 } 710 } 711 712 if (msg->port_msg.stream != 0) { 713 nxt_port_rpc_handler(task, msg); 714 return; 715 } 716 717 nxt_debug(task, "new port id %d (%d)", port->id, port->type); 718 719 /* 720 * Port with "id == 0" is application 'main' port and it always 721 * should come with non-zero stream. 722 */ 723 nxt_assert(port->id != 0); 724 725 /* Find 'main' app port and get app reference. */ 726 rt = task->thread->runtime; 727 728 /* 729 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 730 * sent to main port (with id == 0) and processed in main thread. 731 */ 732 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 733 nxt_assert(main_app_port != NULL); 734 735 app = main_app_port->app; 736 737 if (nxt_fast_path(app != NULL)) { 738 nxt_thread_mutex_lock(&app->mutex); 739 740 /* TODO here should be find-and-add code because there can be 741 port waiters in port_hash */ 742 nxt_port_hash_add(&app->port_hash, port); 743 app->port_hash_count++; 744 745 nxt_thread_mutex_unlock(&app->mutex); 746 747 port->app = app; 748 } 749 750 port->main_app_port = main_app_port; 751 752 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 753 } 754 755 756 static void 757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 758 { 759 void *p; 760 size_t size; 761 nxt_int_t ret; 762 nxt_port_t *port; 763 nxt_router_temp_conf_t *tmcf; 764 765 port = nxt_runtime_port_find(task->thread->runtime, 766 msg->port_msg.pid, 767 msg->port_msg.reply_port); 768 if (nxt_slow_path(port == NULL)) { 769 nxt_alert(task, "conf_data_handler: reply port not found"); 770 return; 771 } 772 773 p = MAP_FAILED; 774 775 /* 776 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 777 * initialized in 'cleanup' section. 778 */ 779 size = 0; 780 781 tmcf = nxt_router_temp_conf(task); 782 if (nxt_slow_path(tmcf == NULL)) { 783 goto fail; 784 } 785 786 if (nxt_slow_path(msg->fd[0] == -1)) { 787 nxt_alert(task, "conf_data_handler: invalid shm fd"); 788 goto fail; 789 } 790 791 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 792 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 793 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 794 goto fail; 795 } 796 797 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 798 799 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 800 801 nxt_fd_close(msg->fd[0]); 802 msg->fd[0] = -1; 803 804 if (nxt_slow_path(p == MAP_FAILED)) { 805 goto fail; 806 } 807 808 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 809 810 tmcf->router_conf->router = nxt_router; 811 tmcf->stream = msg->port_msg.stream; 812 tmcf->port = port; 813 814 nxt_port_use(task, tmcf->port, 1); 815 816 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 817 818 if (nxt_fast_path(ret == NXT_OK)) { 819 nxt_router_conf_apply(task, tmcf, NULL); 820 821 } else { 822 nxt_router_conf_error(task, tmcf); 823 } 824 825 goto cleanup; 826 827 fail: 828 829 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 830 msg->port_msg.stream, 0, NULL); 831 832 if (tmcf != NULL) { 833 nxt_mp_release(tmcf->mem_pool); 834 } 835 836 cleanup: 837 838 if (p != MAP_FAILED) { 839 nxt_mem_munmap(p, size); 840 } 841 842 if (msg->fd[0] != -1) { 843 nxt_fd_close(msg->fd[0]); 844 msg->fd[0] = -1; 845 } 846 } 847 848 849 static void 850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 851 { 852 nxt_app_t *app; 853 nxt_int_t ret; 854 nxt_str_t app_name; 855 nxt_port_t *reply_port, *shared_port, *old_shared_port; 856 nxt_port_t *proto_port; 857 nxt_port_msg_type_t reply; 858 859 reply_port = nxt_runtime_port_find(task->thread->runtime, 860 msg->port_msg.pid, 861 msg->port_msg.reply_port); 862 if (nxt_slow_path(reply_port == NULL)) { 863 nxt_alert(task, "app_restart_handler: reply port not found"); 864 return; 865 } 866 867 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 868 app_name.start = msg->buf->mem.pos; 869 870 nxt_debug(task, "app_restart_handler: %V", &app_name); 871 872 app = nxt_router_app_find(&nxt_router->apps, &app_name); 873 874 if (nxt_fast_path(app != NULL)) { 875 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 876 NXT_PROCESS_APP); 877 if (nxt_slow_path(shared_port == NULL)) { 878 goto fail; 879 } 880 881 ret = nxt_port_socket_init(task, shared_port, 0); 882 if (nxt_slow_path(ret != NXT_OK)) { 883 nxt_port_use(task, shared_port, -1); 884 goto fail; 885 } 886 887 ret = nxt_router_app_queue_init(task, shared_port); 888 if (nxt_slow_path(ret != NXT_OK)) { 889 nxt_port_write_close(shared_port); 890 nxt_port_read_close(shared_port); 891 nxt_port_use(task, shared_port, -1); 892 goto fail; 893 } 894 895 nxt_port_write_enable(task, shared_port); 896 897 nxt_thread_mutex_lock(&app->mutex); 898 899 proto_port = app->proto_port; 900 901 if (proto_port != NULL) { 902 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 903 proto_port->pid); 904 905 app->proto_port = NULL; 906 proto_port->app = NULL; 907 } 908 909 app->generation++; 910 911 shared_port->app = app; 912 913 old_shared_port = app->shared_port; 914 old_shared_port->app = NULL; 915 916 app->shared_port = shared_port; 917 918 nxt_thread_mutex_unlock(&app->mutex); 919 920 nxt_port_close(task, old_shared_port); 921 nxt_port_use(task, old_shared_port, -1); 922 923 if (proto_port != NULL) { 924 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 925 -1, 0, 0, NULL); 926 927 nxt_port_close(task, proto_port); 928 929 nxt_port_use(task, proto_port, -1); 930 } 931 932 reply = NXT_PORT_MSG_RPC_READY_LAST; 933 934 } else { 935 936 fail: 937 938 reply = NXT_PORT_MSG_RPC_ERROR; 939 } 940 941 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 942 0, NULL); 943 } 944 945 946 static void 947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 948 void *data) 949 { 950 union { 951 nxt_pid_t removed_pid; 952 void *data; 953 } u; 954 955 u.data = data; 956 957 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 958 } 959 960 961 static void 962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 963 { 964 nxt_event_engine_t *engine; 965 966 nxt_port_remove_pid_handler(task, msg); 967 968 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 969 { 970 if (nxt_fast_path(engine->port != NULL)) { 971 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 972 msg->u.data); 973 } 974 } 975 nxt_queue_loop; 976 977 if (msg->port_msg.stream == 0) { 978 return; 979 } 980 981 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 982 983 nxt_port_rpc_handler(task, msg); 984 } 985 986 987 static nxt_router_temp_conf_t * 988 nxt_router_temp_conf(nxt_task_t *task) 989 { 990 nxt_mp_t *mp, *tmp; 991 nxt_router_conf_t *rtcf; 992 nxt_router_temp_conf_t *tmcf; 993 994 mp = nxt_mp_create(1024, 128, 256, 32); 995 if (nxt_slow_path(mp == NULL)) { 996 return NULL; 997 } 998 999 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 1000 if (nxt_slow_path(rtcf == NULL)) { 1001 goto fail; 1002 } 1003 1004 rtcf->mem_pool = mp; 1005 1006 tmp = nxt_mp_create(1024, 128, 256, 32); 1007 if (nxt_slow_path(tmp == NULL)) { 1008 goto fail; 1009 } 1010 1011 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 1012 if (nxt_slow_path(tmcf == NULL)) { 1013 goto temp_fail; 1014 } 1015 1016 tmcf->mem_pool = tmp; 1017 tmcf->router_conf = rtcf; 1018 tmcf->count = 1; 1019 tmcf->engine = task->thread->engine; 1020 1021 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 1022 sizeof(nxt_router_engine_conf_t)); 1023 if (nxt_slow_path(tmcf->engines == NULL)) { 1024 goto temp_fail; 1025 } 1026 1027 nxt_queue_init(&creating_sockets); 1028 nxt_queue_init(&pending_sockets); 1029 nxt_queue_init(&updating_sockets); 1030 nxt_queue_init(&keeping_sockets); 1031 nxt_queue_init(&deleting_sockets); 1032 1033 #if (NXT_TLS) 1034 nxt_queue_init(&tmcf->tls); 1035 #endif 1036 1037 nxt_queue_init(&tmcf->apps); 1038 nxt_queue_init(&tmcf->previous); 1039 1040 return tmcf; 1041 1042 temp_fail: 1043 1044 nxt_mp_destroy(tmp); 1045 1046 fail: 1047 1048 nxt_mp_destroy(mp); 1049 1050 return NULL; 1051 } 1052 1053 1054 nxt_inline nxt_bool_t 1055 nxt_router_app_can_start(nxt_app_t *app) 1056 { 1057 return app->processes + app->pending_processes < app->max_processes 1058 && app->pending_processes < app->max_pending_processes; 1059 } 1060 1061 1062 nxt_inline nxt_bool_t 1063 nxt_router_app_need_start(nxt_app_t *app) 1064 { 1065 return (app->active_requests 1066 > app->port_hash_count + app->pending_processes) 1067 || (app->spare_processes 1068 > app->idle_processes + app->pending_processes); 1069 } 1070 1071 1072 static void 1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1074 { 1075 nxt_int_t ret; 1076 nxt_app_t *app; 1077 nxt_router_t *router; 1078 nxt_runtime_t *rt; 1079 nxt_queue_link_t *qlk; 1080 nxt_socket_conf_t *skcf; 1081 nxt_router_conf_t *rtcf; 1082 nxt_router_temp_conf_t *tmcf; 1083 const nxt_event_interface_t *interface; 1084 #if (NXT_TLS) 1085 nxt_router_tlssock_t *tls; 1086 #endif 1087 1088 tmcf = obj; 1089 1090 qlk = nxt_queue_first(&pending_sockets); 1091 1092 if (qlk != nxt_queue_tail(&pending_sockets)) { 1093 nxt_queue_remove(qlk); 1094 nxt_queue_insert_tail(&creating_sockets, qlk); 1095 1096 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1097 1098 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1099 1100 return; 1101 } 1102 1103 #if (NXT_TLS) 1104 qlk = nxt_queue_last(&tmcf->tls); 1105 1106 if (qlk != nxt_queue_head(&tmcf->tls)) { 1107 nxt_queue_remove(qlk); 1108 1109 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1110 1111 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1112 nxt_router_tls_rpc_handler, tls); 1113 return; 1114 } 1115 #endif 1116 1117 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1118 1119 if (nxt_router_app_need_start(app)) { 1120 nxt_router_app_rpc_create(task, tmcf, app); 1121 return; 1122 } 1123 1124 } nxt_queue_loop; 1125 1126 rtcf = tmcf->router_conf; 1127 1128 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1129 nxt_router_access_log_open(task, tmcf); 1130 return; 1131 } 1132 1133 rt = task->thread->runtime; 1134 1135 interface = nxt_service_get(rt->services, "engine", NULL); 1136 1137 router = rtcf->router; 1138 1139 ret = nxt_router_engines_create(task, router, tmcf, interface); 1140 if (nxt_slow_path(ret != NXT_OK)) { 1141 goto fail; 1142 } 1143 1144 ret = nxt_router_threads_create(task, rt, tmcf); 1145 if (nxt_slow_path(ret != NXT_OK)) { 1146 goto fail; 1147 } 1148 1149 nxt_router_apps_sort(task, router, tmcf); 1150 1151 nxt_router_apps_hash_use(task, rtcf, 1); 1152 1153 nxt_router_engines_post(router, tmcf); 1154 1155 nxt_queue_add(&router->sockets, &updating_sockets); 1156 nxt_queue_add(&router->sockets, &creating_sockets); 1157 1158 if (router->access_log != rtcf->access_log) { 1159 nxt_router_access_log_use(&router->lock, rtcf->access_log); 1160 1161 nxt_router_access_log_release(task, &router->lock, router->access_log); 1162 1163 router->access_log = rtcf->access_log; 1164 } 1165 1166 nxt_router_conf_ready(task, tmcf); 1167 1168 return; 1169 1170 fail: 1171 1172 nxt_router_conf_error(task, tmcf); 1173 1174 return; 1175 } 1176 1177 1178 static void 1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1180 { 1181 nxt_joint_job_t *job; 1182 1183 job = obj; 1184 1185 nxt_router_conf_ready(task, job->tmcf); 1186 } 1187 1188 1189 static void 1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1191 { 1192 uint32_t count; 1193 nxt_router_conf_t *rtcf; 1194 nxt_thread_spinlock_t *lock; 1195 1196 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1197 1198 if (--tmcf->count > 0) { 1199 return; 1200 } 1201 1202 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1203 1204 rtcf = tmcf->router_conf; 1205 1206 lock = &rtcf->router->lock; 1207 1208 nxt_thread_spin_lock(lock); 1209 1210 count = rtcf->count; 1211 1212 nxt_thread_spin_unlock(lock); 1213 1214 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1215 1216 if (count == 0) { 1217 nxt_router_apps_hash_use(task, rtcf, -1); 1218 1219 nxt_router_access_log_release(task, lock, rtcf->access_log); 1220 1221 nxt_mp_destroy(rtcf->mem_pool); 1222 } 1223 1224 nxt_mp_release(tmcf->mem_pool); 1225 } 1226 1227 1228 static void 1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1230 { 1231 nxt_app_t *app; 1232 nxt_queue_t new_socket_confs; 1233 nxt_socket_t s; 1234 nxt_router_t *router; 1235 nxt_queue_link_t *qlk; 1236 nxt_socket_conf_t *skcf; 1237 nxt_router_conf_t *rtcf; 1238 1239 nxt_alert(task, "failed to apply new conf"); 1240 1241 for (qlk = nxt_queue_first(&creating_sockets); 1242 qlk != nxt_queue_tail(&creating_sockets); 1243 qlk = nxt_queue_next(qlk)) 1244 { 1245 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1246 s = skcf->listen->socket; 1247 1248 if (s != -1) { 1249 nxt_socket_close(task, s); 1250 } 1251 1252 nxt_free(skcf->listen); 1253 } 1254 1255 nxt_queue_init(&new_socket_confs); 1256 nxt_queue_add(&new_socket_confs, &updating_sockets); 1257 nxt_queue_add(&new_socket_confs, &pending_sockets); 1258 nxt_queue_add(&new_socket_confs, &creating_sockets); 1259 1260 rtcf = tmcf->router_conf; 1261 1262 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1263 1264 nxt_router_app_unlink(task, app); 1265 1266 } nxt_queue_loop; 1267 1268 router = rtcf->router; 1269 1270 nxt_queue_add(&router->sockets, &keeping_sockets); 1271 nxt_queue_add(&router->sockets, &deleting_sockets); 1272 1273 nxt_queue_add(&router->apps, &tmcf->previous); 1274 1275 // TODO: new engines and threads 1276 1277 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1278 1279 nxt_mp_destroy(rtcf->mem_pool); 1280 1281 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1282 1283 nxt_mp_release(tmcf->mem_pool); 1284 } 1285 1286 1287 static void 1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1289 nxt_port_msg_type_t type) 1290 { 1291 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1292 1293 nxt_port_use(task, tmcf->port, -1); 1294 1295 tmcf->port = NULL; 1296 } 1297 1298 1299 static nxt_conf_map_t nxt_router_conf[] = { 1300 { 1301 nxt_string("listeners_threads"), 1302 NXT_CONF_MAP_INT32, 1303 offsetof(nxt_router_conf_t, threads), 1304 }, 1305 }; 1306 1307 1308 static nxt_conf_map_t nxt_router_app_conf[] = { 1309 { 1310 nxt_string("type"), 1311 NXT_CONF_MAP_STR, 1312 offsetof(nxt_router_app_conf_t, type), 1313 }, 1314 1315 { 1316 nxt_string("limits"), 1317 NXT_CONF_MAP_PTR, 1318 offsetof(nxt_router_app_conf_t, limits_value), 1319 }, 1320 1321 { 1322 nxt_string("processes"), 1323 NXT_CONF_MAP_INT32, 1324 offsetof(nxt_router_app_conf_t, processes), 1325 }, 1326 1327 { 1328 nxt_string("processes"), 1329 NXT_CONF_MAP_PTR, 1330 offsetof(nxt_router_app_conf_t, processes_value), 1331 }, 1332 1333 { 1334 nxt_string("targets"), 1335 NXT_CONF_MAP_PTR, 1336 offsetof(nxt_router_app_conf_t, targets_value), 1337 }, 1338 }; 1339 1340 1341 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1342 { 1343 nxt_string("timeout"), 1344 NXT_CONF_MAP_MSEC, 1345 offsetof(nxt_router_app_conf_t, timeout), 1346 }, 1347 }; 1348 1349 1350 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1351 { 1352 nxt_string("spare"), 1353 NXT_CONF_MAP_INT32, 1354 offsetof(nxt_router_app_conf_t, spare_processes), 1355 }, 1356 1357 { 1358 nxt_string("max"), 1359 NXT_CONF_MAP_INT32, 1360 offsetof(nxt_router_app_conf_t, max_processes), 1361 }, 1362 1363 { 1364 nxt_string("idle_timeout"), 1365 NXT_CONF_MAP_MSEC, 1366 offsetof(nxt_router_app_conf_t, idle_timeout), 1367 }, 1368 }; 1369 1370 1371 static nxt_conf_map_t nxt_router_listener_conf[] = { 1372 { 1373 nxt_string("pass"), 1374 NXT_CONF_MAP_STR_COPY, 1375 offsetof(nxt_router_listener_conf_t, pass), 1376 }, 1377 1378 { 1379 nxt_string("application"), 1380 NXT_CONF_MAP_STR_COPY, 1381 offsetof(nxt_router_listener_conf_t, application), 1382 }, 1383 }; 1384 1385 1386 static nxt_conf_map_t nxt_router_http_conf[] = { 1387 { 1388 nxt_string("header_buffer_size"), 1389 NXT_CONF_MAP_SIZE, 1390 offsetof(nxt_socket_conf_t, header_buffer_size), 1391 }, 1392 1393 { 1394 nxt_string("large_header_buffer_size"), 1395 NXT_CONF_MAP_SIZE, 1396 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1397 }, 1398 1399 { 1400 nxt_string("large_header_buffers"), 1401 NXT_CONF_MAP_SIZE, 1402 offsetof(nxt_socket_conf_t, large_header_buffers), 1403 }, 1404 1405 { 1406 nxt_string("body_buffer_size"), 1407 NXT_CONF_MAP_SIZE, 1408 offsetof(nxt_socket_conf_t, body_buffer_size), 1409 }, 1410 1411 { 1412 nxt_string("max_body_size"), 1413 NXT_CONF_MAP_SIZE, 1414 offsetof(nxt_socket_conf_t, max_body_size), 1415 }, 1416 1417 { 1418 nxt_string("idle_timeout"), 1419 NXT_CONF_MAP_MSEC, 1420 offsetof(nxt_socket_conf_t, idle_timeout), 1421 }, 1422 1423 { 1424 nxt_string("header_read_timeout"), 1425 NXT_CONF_MAP_MSEC, 1426 offsetof(nxt_socket_conf_t, header_read_timeout), 1427 }, 1428 1429 { 1430 nxt_string("body_read_timeout"), 1431 NXT_CONF_MAP_MSEC, 1432 offsetof(nxt_socket_conf_t, body_read_timeout), 1433 }, 1434 1435 { 1436 nxt_string("send_timeout"), 1437 NXT_CONF_MAP_MSEC, 1438 offsetof(nxt_socket_conf_t, send_timeout), 1439 }, 1440 1441 { 1442 nxt_string("body_temp_path"), 1443 NXT_CONF_MAP_STR, 1444 offsetof(nxt_socket_conf_t, body_temp_path), 1445 }, 1446 1447 { 1448 nxt_string("discard_unsafe_fields"), 1449 NXT_CONF_MAP_INT8, 1450 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1451 }, 1452 }; 1453 1454 1455 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1456 { 1457 nxt_string("max_frame_size"), 1458 NXT_CONF_MAP_SIZE, 1459 offsetof(nxt_websocket_conf_t, max_frame_size), 1460 }, 1461 1462 { 1463 nxt_string("read_timeout"), 1464 NXT_CONF_MAP_MSEC, 1465 offsetof(nxt_websocket_conf_t, read_timeout), 1466 }, 1467 1468 { 1469 nxt_string("keepalive_interval"), 1470 NXT_CONF_MAP_MSEC, 1471 offsetof(nxt_websocket_conf_t, keepalive_interval), 1472 }, 1473 1474 }; 1475 1476 1477 static nxt_int_t 1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1479 u_char *start, u_char *end) 1480 { 1481 u_char *p; 1482 size_t size; 1483 nxt_mp_t *mp, *app_mp; 1484 uint32_t next, next_target; 1485 nxt_int_t ret; 1486 nxt_str_t name, path, target; 1487 nxt_app_t *app, *prev; 1488 nxt_str_t *t, *s, *targets; 1489 nxt_uint_t n, i; 1490 nxt_port_t *port; 1491 nxt_router_t *router; 1492 nxt_app_joint_t *app_joint; 1493 #if (NXT_TLS) 1494 nxt_tls_init_t *tls_init; 1495 nxt_conf_value_t *certificate; 1496 #endif 1497 nxt_conf_value_t *conf, *http, *value, *websocket; 1498 nxt_conf_value_t *applications, *application; 1499 nxt_conf_value_t *listeners, *listener; 1500 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf; 1501 nxt_socket_conf_t *skcf; 1502 nxt_http_routes_t *routes; 1503 nxt_event_engine_t *engine; 1504 nxt_app_lang_module_t *lang; 1505 nxt_router_app_conf_t apcf; 1506 nxt_router_access_log_t *access_log; 1507 nxt_router_listener_conf_t lscf; 1508 1509 static nxt_str_t http_path = nxt_string("/settings/http"); 1510 static nxt_str_t applications_path = nxt_string("/applications"); 1511 static nxt_str_t listeners_path = nxt_string("/listeners"); 1512 static nxt_str_t routes_path = nxt_string("/routes"); 1513 static nxt_str_t access_log_path = nxt_string("/access_log"); 1514 #if (NXT_TLS) 1515 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1516 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1517 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1518 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1519 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); 1520 #endif 1521 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1522 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1523 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1524 1525 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1526 if (conf == NULL) { 1527 nxt_alert(task, "configuration parsing error"); 1528 return NXT_ERROR; 1529 } 1530 1531 mp = tmcf->router_conf->mem_pool; 1532 1533 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1534 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1535 if (ret != NXT_OK) { 1536 nxt_alert(task, "root map error"); 1537 return NXT_ERROR; 1538 } 1539 1540 if (tmcf->router_conf->threads == 0) { 1541 tmcf->router_conf->threads = nxt_ncpu; 1542 } 1543 1544 static_conf = nxt_conf_get_path(conf, &static_path); 1545 1546 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1547 if (nxt_slow_path(ret != NXT_OK)) { 1548 return NXT_ERROR; 1549 } 1550 1551 router = tmcf->router_conf->router; 1552 1553 applications = nxt_conf_get_path(conf, &applications_path); 1554 1555 if (applications != NULL) { 1556 next = 0; 1557 1558 for ( ;; ) { 1559 application = nxt_conf_next_object_member(applications, 1560 &name, &next); 1561 if (application == NULL) { 1562 break; 1563 } 1564 1565 nxt_debug(task, "application \"%V\"", &name); 1566 1567 size = nxt_conf_json_length(application, NULL); 1568 1569 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1570 if (nxt_slow_path(app_mp == NULL)) { 1571 goto fail; 1572 } 1573 1574 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1575 if (app == NULL) { 1576 goto app_fail; 1577 } 1578 1579 nxt_memzero(app, sizeof(nxt_app_t)); 1580 1581 app->mem_pool = app_mp; 1582 1583 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1584 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1585 + name.length); 1586 1587 p = nxt_conf_json_print(app->conf.start, application, NULL); 1588 app->conf.length = p - app->conf.start; 1589 1590 nxt_assert(app->conf.length <= size); 1591 1592 nxt_debug(task, "application conf \"%V\"", &app->conf); 1593 1594 prev = nxt_router_app_find(&router->apps, &name); 1595 1596 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1597 nxt_mp_destroy(app_mp); 1598 1599 nxt_queue_remove(&prev->link); 1600 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1601 1602 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1603 if (nxt_slow_path(ret != NXT_OK)) { 1604 goto fail; 1605 } 1606 1607 continue; 1608 } 1609 1610 apcf.processes = 1; 1611 apcf.max_processes = 1; 1612 apcf.spare_processes = 0; 1613 apcf.timeout = 0; 1614 apcf.idle_timeout = 15000; 1615 apcf.limits_value = NULL; 1616 apcf.processes_value = NULL; 1617 apcf.targets_value = NULL; 1618 1619 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1620 if (nxt_slow_path(app_joint == NULL)) { 1621 goto app_fail; 1622 } 1623 1624 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1625 1626 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1627 nxt_nitems(nxt_router_app_conf), &apcf); 1628 if (ret != NXT_OK) { 1629 nxt_alert(task, "application map error"); 1630 goto app_fail; 1631 } 1632 1633 if (apcf.limits_value != NULL) { 1634 1635 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1636 nxt_alert(task, "application limits is not object"); 1637 goto app_fail; 1638 } 1639 1640 ret = nxt_conf_map_object(mp, apcf.limits_value, 1641 nxt_router_app_limits_conf, 1642 nxt_nitems(nxt_router_app_limits_conf), 1643 &apcf); 1644 if (ret != NXT_OK) { 1645 nxt_alert(task, "application limits map error"); 1646 goto app_fail; 1647 } 1648 } 1649 1650 if (apcf.processes_value != NULL 1651 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1652 { 1653 ret = nxt_conf_map_object(mp, apcf.processes_value, 1654 nxt_router_app_processes_conf, 1655 nxt_nitems(nxt_router_app_processes_conf), 1656 &apcf); 1657 if (ret != NXT_OK) { 1658 nxt_alert(task, "application processes map error"); 1659 goto app_fail; 1660 } 1661 1662 } else { 1663 apcf.max_processes = apcf.processes; 1664 apcf.spare_processes = apcf.processes; 1665 } 1666 1667 if (apcf.targets_value != NULL) { 1668 n = nxt_conf_object_members_count(apcf.targets_value); 1669 1670 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1671 if (nxt_slow_path(targets == NULL)) { 1672 goto app_fail; 1673 } 1674 1675 next_target = 0; 1676 1677 for (i = 0; i < n; i++) { 1678 (void) nxt_conf_next_object_member(apcf.targets_value, 1679 &target, &next_target); 1680 1681 s = nxt_str_dup(app_mp, &targets[i], &target); 1682 if (nxt_slow_path(s == NULL)) { 1683 goto app_fail; 1684 } 1685 } 1686 1687 } else { 1688 targets = NULL; 1689 } 1690 1691 nxt_debug(task, "application type: %V", &apcf.type); 1692 nxt_debug(task, "application processes: %D", apcf.processes); 1693 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1694 1695 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1696 1697 if (lang == NULL) { 1698 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1699 goto app_fail; 1700 } 1701 1702 nxt_debug(task, "application language module: \"%s\"", lang->file); 1703 1704 ret = nxt_thread_mutex_create(&app->mutex); 1705 if (ret != NXT_OK) { 1706 goto app_fail; 1707 } 1708 1709 nxt_queue_init(&app->ports); 1710 nxt_queue_init(&app->spare_ports); 1711 nxt_queue_init(&app->idle_ports); 1712 nxt_queue_init(&app->ack_waiting_req); 1713 1714 app->name.length = name.length; 1715 nxt_memcpy(app->name.start, name.start, name.length); 1716 1717 app->type = lang->type; 1718 app->max_processes = apcf.max_processes; 1719 app->spare_processes = apcf.spare_processes; 1720 app->max_pending_processes = apcf.spare_processes 1721 ? apcf.spare_processes : 1; 1722 app->timeout = apcf.timeout; 1723 app->idle_timeout = apcf.idle_timeout; 1724 1725 app->targets = targets; 1726 1727 engine = task->thread->engine; 1728 1729 app->engine = engine; 1730 1731 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1732 app->adjust_idle_work.task = &engine->task; 1733 app->adjust_idle_work.obj = app; 1734 1735 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1736 1737 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1738 if (nxt_slow_path(ret != NXT_OK)) { 1739 goto app_fail; 1740 } 1741 1742 nxt_router_app_use(task, app, 1); 1743 1744 app->joint = app_joint; 1745 1746 app_joint->use_count = 1; 1747 app_joint->app = app; 1748 1749 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1750 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1751 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1752 app_joint->idle_timer.task = &engine->task; 1753 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1754 1755 app_joint->free_app_work.handler = nxt_router_free_app; 1756 app_joint->free_app_work.task = &engine->task; 1757 app_joint->free_app_work.obj = app_joint; 1758 1759 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1760 NXT_PROCESS_APP); 1761 if (nxt_slow_path(port == NULL)) { 1762 return NXT_ERROR; 1763 } 1764 1765 ret = nxt_port_socket_init(task, port, 0); 1766 if (nxt_slow_path(ret != NXT_OK)) { 1767 nxt_port_use(task, port, -1); 1768 return NXT_ERROR; 1769 } 1770 1771 ret = nxt_router_app_queue_init(task, port); 1772 if (nxt_slow_path(ret != NXT_OK)) { 1773 nxt_port_write_close(port); 1774 nxt_port_read_close(port); 1775 nxt_port_use(task, port, -1); 1776 return NXT_ERROR; 1777 } 1778 1779 nxt_port_write_enable(task, port); 1780 port->app = app; 1781 1782 app->shared_port = port; 1783 1784 nxt_thread_mutex_create(&app->outgoing.mutex); 1785 } 1786 } 1787 1788 routes_conf = nxt_conf_get_path(conf, &routes_path); 1789 if (nxt_fast_path(routes_conf != NULL)) { 1790 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1791 if (nxt_slow_path(routes == NULL)) { 1792 return NXT_ERROR; 1793 } 1794 tmcf->router_conf->routes = routes; 1795 } 1796 1797 ret = nxt_upstreams_create(task, tmcf, conf); 1798 if (nxt_slow_path(ret != NXT_OK)) { 1799 return ret; 1800 } 1801 1802 http = nxt_conf_get_path(conf, &http_path); 1803 #if 0 1804 if (http == NULL) { 1805 nxt_alert(task, "no \"http\" block"); 1806 return NXT_ERROR; 1807 } 1808 #endif 1809 1810 websocket = nxt_conf_get_path(conf, &websocket_path); 1811 1812 listeners = nxt_conf_get_path(conf, &listeners_path); 1813 1814 if (listeners != NULL) { 1815 next = 0; 1816 1817 for ( ;; ) { 1818 listener = nxt_conf_next_object_member(listeners, &name, &next); 1819 if (listener == NULL) { 1820 break; 1821 } 1822 1823 skcf = nxt_router_socket_conf(task, tmcf, &name); 1824 if (skcf == NULL) { 1825 goto fail; 1826 } 1827 1828 nxt_memzero(&lscf, sizeof(lscf)); 1829 1830 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1831 nxt_nitems(nxt_router_listener_conf), 1832 &lscf); 1833 if (ret != NXT_OK) { 1834 nxt_alert(task, "listener map error"); 1835 goto fail; 1836 } 1837 1838 nxt_debug(task, "application: %V", &lscf.application); 1839 1840 // STUB, default values if http block is not defined. 1841 skcf->header_buffer_size = 2048; 1842 skcf->large_header_buffer_size = 8192; 1843 skcf->large_header_buffers = 4; 1844 skcf->discard_unsafe_fields = 1; 1845 skcf->body_buffer_size = 16 * 1024; 1846 skcf->max_body_size = 8 * 1024 * 1024; 1847 skcf->proxy_header_buffer_size = 64 * 1024; 1848 skcf->proxy_buffer_size = 4096; 1849 skcf->proxy_buffers = 256; 1850 skcf->idle_timeout = 180 * 1000; 1851 skcf->header_read_timeout = 30 * 1000; 1852 skcf->body_read_timeout = 30 * 1000; 1853 skcf->send_timeout = 30 * 1000; 1854 skcf->proxy_timeout = 60 * 1000; 1855 skcf->proxy_send_timeout = 30 * 1000; 1856 skcf->proxy_read_timeout = 30 * 1000; 1857 1858 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1859 skcf->websocket_conf.read_timeout = 60 * 1000; 1860 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1861 1862 nxt_str_null(&skcf->body_temp_path); 1863 1864 if (http != NULL) { 1865 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1866 nxt_nitems(nxt_router_http_conf), 1867 skcf); 1868 if (ret != NXT_OK) { 1869 nxt_alert(task, "http map error"); 1870 goto fail; 1871 } 1872 } 1873 1874 if (websocket != NULL) { 1875 ret = nxt_conf_map_object(mp, websocket, 1876 nxt_router_websocket_conf, 1877 nxt_nitems(nxt_router_websocket_conf), 1878 &skcf->websocket_conf); 1879 if (ret != NXT_OK) { 1880 nxt_alert(task, "websocket map error"); 1881 goto fail; 1882 } 1883 } 1884 1885 t = &skcf->body_temp_path; 1886 1887 if (t->length == 0) { 1888 t->start = (u_char *) task->thread->runtime->tmp; 1889 t->length = nxt_strlen(t->start); 1890 } 1891 1892 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path); 1893 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, 1894 client_ip_conf); 1895 if (nxt_slow_path(ret != NXT_OK)) { 1896 return NXT_ERROR; 1897 } 1898 1899 #if (NXT_TLS) 1900 certificate = nxt_conf_get_path(listener, &certificate_path); 1901 1902 if (certificate != NULL) { 1903 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1904 if (nxt_slow_path(tls_init == NULL)) { 1905 return NXT_ERROR; 1906 } 1907 1908 tls_init->cache_size = 0; 1909 tls_init->timeout = 300; 1910 1911 value = nxt_conf_get_path(listener, &conf_cache_path); 1912 if (value != NULL) { 1913 tls_init->cache_size = nxt_conf_get_number(value); 1914 } 1915 1916 value = nxt_conf_get_path(listener, &conf_timeout_path); 1917 if (value != NULL) { 1918 tls_init->timeout = nxt_conf_get_number(value); 1919 } 1920 1921 tls_init->conf_cmds = nxt_conf_get_path(listener, 1922 &conf_commands_path); 1923 1924 tls_init->tickets_conf = nxt_conf_get_path(listener, 1925 &conf_tickets); 1926 1927 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1928 n = nxt_conf_array_elements_count(certificate); 1929 1930 for (i = 0; i < n; i++) { 1931 value = nxt_conf_get_array_element(certificate, i); 1932 1933 nxt_assert(value != NULL); 1934 1935 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1936 tls_init, i == 0); 1937 if (nxt_slow_path(ret != NXT_OK)) { 1938 goto fail; 1939 } 1940 } 1941 1942 } else { 1943 /* NXT_CONF_STRING */ 1944 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, 1945 tls_init, 1); 1946 if (nxt_slow_path(ret != NXT_OK)) { 1947 goto fail; 1948 } 1949 } 1950 } 1951 #endif 1952 1953 skcf->listen->handler = nxt_http_conn_init; 1954 skcf->router_conf = tmcf->router_conf; 1955 skcf->router_conf->count++; 1956 1957 if (lscf.pass.length != 0) { 1958 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1959 1960 /* COMPATIBILITY: listener application. */ 1961 } else if (lscf.application.length > 0) { 1962 skcf->action = nxt_http_pass_application(task, 1963 tmcf->router_conf, 1964 &lscf.application); 1965 } 1966 1967 if (nxt_slow_path(skcf->action == NULL)) { 1968 goto fail; 1969 } 1970 } 1971 } 1972 1973 ret = nxt_http_routes_resolve(task, tmcf); 1974 if (nxt_slow_path(ret != NXT_OK)) { 1975 goto fail; 1976 } 1977 1978 value = nxt_conf_get_path(conf, &access_log_path); 1979 1980 if (value != NULL) { 1981 nxt_conf_get_string(value, &path); 1982 1983 access_log = router->access_log; 1984 1985 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1986 nxt_router_access_log_use(&router->lock, access_log); 1987 1988 } else { 1989 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1990 + path.length); 1991 if (access_log == NULL) { 1992 nxt_alert(task, "failed to allocate access log structure"); 1993 goto fail; 1994 } 1995 1996 access_log->fd = -1; 1997 access_log->handler = &nxt_router_access_log_writer; 1998 access_log->count = 1; 1999 2000 access_log->path.length = path.length; 2001 access_log->path.start = (u_char *) access_log 2002 + sizeof(nxt_router_access_log_t); 2003 2004 nxt_memcpy(access_log->path.start, path.start, path.length); 2005 } 2006 2007 tmcf->router_conf->access_log = access_log; 2008 } 2009 2010 nxt_queue_add(&deleting_sockets, &router->sockets); 2011 nxt_queue_init(&router->sockets); 2012 2013 return NXT_OK; 2014 2015 app_fail: 2016 2017 nxt_mp_destroy(app_mp); 2018 2019 fail: 2020 2021 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 2022 2023 nxt_queue_remove(&app->link); 2024 nxt_thread_mutex_destroy(&app->mutex); 2025 nxt_mp_destroy(app->mem_pool); 2026 2027 } nxt_queue_loop; 2028 2029 return NXT_ERROR; 2030 } 2031 2032 2033 #if (NXT_TLS) 2034 2035 static nxt_int_t 2036 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 2037 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 2038 nxt_tls_init_t *tls_init, nxt_bool_t last) 2039 { 2040 nxt_router_tlssock_t *tls; 2041 2042 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 2043 if (nxt_slow_path(tls == NULL)) { 2044 return NXT_ERROR; 2045 } 2046 2047 tls->tls_init = tls_init; 2048 tls->socket_conf = skcf; 2049 tls->temp_conf = tmcf; 2050 tls->last = last; 2051 nxt_conf_get_string(value, &tls->name); 2052 2053 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2054 2055 return NXT_OK; 2056 } 2057 2058 #endif 2059 2060 2061 static nxt_int_t 2062 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2063 nxt_conf_value_t *conf) 2064 { 2065 uint32_t next, i; 2066 nxt_mp_t *mp; 2067 nxt_str_t *type, exten, str; 2068 nxt_int_t ret; 2069 nxt_uint_t exts; 2070 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2071 2072 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2073 2074 mp = rtcf->mem_pool; 2075 2076 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2077 if (nxt_slow_path(ret != NXT_OK)) { 2078 return NXT_ERROR; 2079 } 2080 2081 if (conf == NULL) { 2082 return NXT_OK; 2083 } 2084 2085 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2086 2087 if (mtypes_conf != NULL) { 2088 next = 0; 2089 2090 for ( ;; ) { 2091 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2092 2093 if (ext_conf == NULL) { 2094 break; 2095 } 2096 2097 type = nxt_str_dup(mp, NULL, &str); 2098 if (nxt_slow_path(type == NULL)) { 2099 return NXT_ERROR; 2100 } 2101 2102 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2103 nxt_conf_get_string(ext_conf, &str); 2104 2105 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2106 return NXT_ERROR; 2107 } 2108 2109 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2110 &exten, type); 2111 if (nxt_slow_path(ret != NXT_OK)) { 2112 return NXT_ERROR; 2113 } 2114 2115 continue; 2116 } 2117 2118 exts = nxt_conf_array_elements_count(ext_conf); 2119 2120 for (i = 0; i < exts; i++) { 2121 value = nxt_conf_get_array_element(ext_conf, i); 2122 2123 nxt_conf_get_string(value, &str); 2124 2125 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2126 return NXT_ERROR; 2127 } 2128 2129 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2130 &exten, type); 2131 if (nxt_slow_path(ret != NXT_OK)) { 2132 return NXT_ERROR; 2133 } 2134 } 2135 } 2136 } 2137 2138 return NXT_OK; 2139 } 2140 2141 2142 static nxt_int_t 2143 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2144 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) 2145 { 2146 char c; 2147 size_t i; 2148 nxt_mp_t *mp; 2149 uint32_t hash; 2150 nxt_str_t header; 2151 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; 2152 nxt_http_client_ip_t *client_ip; 2153 nxt_http_route_addr_rule_t *source; 2154 2155 static nxt_str_t header_path = nxt_string("/header"); 2156 static nxt_str_t source_path = nxt_string("/source"); 2157 static nxt_str_t recursive_path = nxt_string("/recursive"); 2158 2159 if (conf == NULL) { 2160 skcf->client_ip = NULL; 2161 2162 return NXT_OK; 2163 } 2164 2165 mp = tmcf->router_conf->mem_pool; 2166 2167 source_conf = nxt_conf_get_path(conf, &source_path); 2168 header_conf = nxt_conf_get_path(conf, &header_path); 2169 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2170 2171 if (source_conf == NULL || header_conf == NULL) { 2172 return NXT_ERROR; 2173 } 2174 2175 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); 2176 if (nxt_slow_path(client_ip == NULL)) { 2177 return NXT_ERROR; 2178 } 2179 2180 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2181 if (nxt_slow_path(source == NULL)) { 2182 return NXT_ERROR; 2183 } 2184 2185 client_ip->source = source; 2186 2187 nxt_conf_get_string(header_conf, &header); 2188 2189 if (recursive_conf != NULL) { 2190 client_ip->recursive = nxt_conf_get_boolean(recursive_conf); 2191 } 2192 2193 client_ip->header = nxt_str_dup(mp, NULL, &header); 2194 if (nxt_slow_path(client_ip->header == NULL)) { 2195 return NXT_ERROR; 2196 } 2197 2198 hash = NXT_HTTP_FIELD_HASH_INIT; 2199 2200 for (i = 0; i < client_ip->header->length; i++) { 2201 c = client_ip->header->start[i]; 2202 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2203 } 2204 2205 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2206 2207 client_ip->header_hash = hash; 2208 2209 skcf->client_ip = client_ip; 2210 2211 return NXT_OK; 2212 } 2213 2214 2215 static nxt_app_t * 2216 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2217 { 2218 nxt_app_t *app; 2219 2220 nxt_queue_each(app, queue, nxt_app_t, link) { 2221 2222 if (nxt_strstr_eq(name, &app->name)) { 2223 return app; 2224 } 2225 2226 } nxt_queue_loop; 2227 2228 return NULL; 2229 } 2230 2231 2232 static nxt_int_t 2233 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2234 { 2235 void *mem; 2236 nxt_int_t fd; 2237 2238 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2239 if (nxt_slow_path(fd == -1)) { 2240 return NXT_ERROR; 2241 } 2242 2243 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2244 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2245 if (nxt_slow_path(mem == MAP_FAILED)) { 2246 nxt_fd_close(fd); 2247 2248 return NXT_ERROR; 2249 } 2250 2251 nxt_app_queue_init(mem); 2252 2253 port->queue_fd = fd; 2254 port->queue = mem; 2255 2256 return NXT_OK; 2257 } 2258 2259 2260 static nxt_int_t 2261 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2262 { 2263 void *mem; 2264 nxt_int_t fd; 2265 2266 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2267 if (nxt_slow_path(fd == -1)) { 2268 return NXT_ERROR; 2269 } 2270 2271 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2272 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2273 if (nxt_slow_path(mem == MAP_FAILED)) { 2274 nxt_fd_close(fd); 2275 2276 return NXT_ERROR; 2277 } 2278 2279 nxt_port_queue_init(mem); 2280 2281 port->queue_fd = fd; 2282 port->queue = mem; 2283 2284 return NXT_OK; 2285 } 2286 2287 2288 static nxt_int_t 2289 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2290 { 2291 void *mem; 2292 2293 nxt_assert(fd != -1); 2294 2295 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2296 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2297 if (nxt_slow_path(mem == MAP_FAILED)) { 2298 2299 return NXT_ERROR; 2300 } 2301 2302 port->queue = mem; 2303 2304 return NXT_OK; 2305 } 2306 2307 2308 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2309 NXT_LVLHSH_DEFAULT, 2310 nxt_router_apps_hash_test, 2311 nxt_mp_lvlhsh_alloc, 2312 nxt_mp_lvlhsh_free, 2313 }; 2314 2315 2316 static nxt_int_t 2317 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2318 { 2319 nxt_app_t *app; 2320 2321 app = data; 2322 2323 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2324 } 2325 2326 2327 static nxt_int_t 2328 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2329 { 2330 nxt_lvlhsh_query_t lhq; 2331 2332 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2333 lhq.replace = 0; 2334 lhq.key = app->name; 2335 lhq.value = app; 2336 lhq.proto = &nxt_router_apps_hash_proto; 2337 lhq.pool = rtcf->mem_pool; 2338 2339 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2340 2341 case NXT_OK: 2342 return NXT_OK; 2343 2344 case NXT_DECLINED: 2345 nxt_thread_log_alert("router app hash adding failed: " 2346 "\"%V\" is already in hash", &lhq.key); 2347 /* Fall through. */ 2348 default: 2349 return NXT_ERROR; 2350 } 2351 } 2352 2353 2354 static nxt_app_t * 2355 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2356 { 2357 nxt_lvlhsh_query_t lhq; 2358 2359 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2360 lhq.key = *name; 2361 lhq.proto = &nxt_router_apps_hash_proto; 2362 2363 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2364 return NULL; 2365 } 2366 2367 return lhq.value; 2368 } 2369 2370 2371 static void 2372 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2373 { 2374 nxt_app_t *app; 2375 nxt_lvlhsh_each_t lhe; 2376 2377 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2378 2379 for ( ;; ) { 2380 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2381 2382 if (app == NULL) { 2383 break; 2384 } 2385 2386 nxt_router_app_use(task, app, i); 2387 } 2388 } 2389 2390 2391 typedef struct { 2392 nxt_app_t *app; 2393 nxt_int_t target; 2394 } nxt_http_app_conf_t; 2395 2396 2397 nxt_int_t 2398 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2399 nxt_str_t *target, nxt_http_action_t *action) 2400 { 2401 nxt_app_t *app; 2402 nxt_str_t *targets; 2403 nxt_uint_t i; 2404 nxt_http_app_conf_t *conf; 2405 2406 app = nxt_router_apps_hash_get(rtcf, name); 2407 if (app == NULL) { 2408 return NXT_DECLINED; 2409 } 2410 2411 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2412 if (nxt_slow_path(conf == NULL)) { 2413 return NXT_ERROR; 2414 } 2415 2416 action->handler = nxt_http_application_handler; 2417 action->u.conf = conf; 2418 2419 conf->app = app; 2420 2421 if (target != NULL && target->length != 0) { 2422 targets = app->targets; 2423 2424 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2425 2426 conf->target = i; 2427 2428 } else { 2429 conf->target = 0; 2430 } 2431 2432 return NXT_OK; 2433 } 2434 2435 2436 static nxt_socket_conf_t * 2437 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2438 nxt_str_t *name) 2439 { 2440 size_t size; 2441 nxt_int_t ret; 2442 nxt_bool_t wildcard; 2443 nxt_sockaddr_t *sa; 2444 nxt_socket_conf_t *skcf; 2445 nxt_listen_socket_t *ls; 2446 2447 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2448 if (nxt_slow_path(sa == NULL)) { 2449 nxt_alert(task, "invalid listener \"%V\"", name); 2450 return NULL; 2451 } 2452 2453 sa->type = SOCK_STREAM; 2454 2455 nxt_debug(task, "router listener: \"%*s\"", 2456 (size_t) sa->length, nxt_sockaddr_start(sa)); 2457 2458 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2459 if (nxt_slow_path(skcf == NULL)) { 2460 return NULL; 2461 } 2462 2463 size = nxt_sockaddr_size(sa); 2464 2465 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2466 2467 if (ret != NXT_OK) { 2468 2469 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2470 if (nxt_slow_path(ls == NULL)) { 2471 return NULL; 2472 } 2473 2474 skcf->listen = ls; 2475 2476 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2477 nxt_memcpy(ls->sockaddr, sa, size); 2478 2479 nxt_listen_socket_remote_size(ls); 2480 2481 ls->socket = -1; 2482 ls->backlog = NXT_LISTEN_BACKLOG; 2483 ls->flags = NXT_NONBLOCK; 2484 ls->read_after_accept = 1; 2485 } 2486 2487 switch (sa->u.sockaddr.sa_family) { 2488 #if (NXT_HAVE_UNIX_DOMAIN) 2489 case AF_UNIX: 2490 wildcard = 0; 2491 break; 2492 #endif 2493 #if (NXT_INET6) 2494 case AF_INET6: 2495 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2496 break; 2497 #endif 2498 case AF_INET: 2499 default: 2500 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2501 break; 2502 } 2503 2504 if (!wildcard) { 2505 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2506 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2507 return NULL; 2508 } 2509 2510 nxt_memcpy(skcf->sockaddr, sa, size); 2511 } 2512 2513 return skcf; 2514 } 2515 2516 2517 static nxt_int_t 2518 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2519 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2520 { 2521 nxt_router_t *router; 2522 nxt_queue_link_t *qlk; 2523 nxt_socket_conf_t *skcf; 2524 2525 router = tmcf->router_conf->router; 2526 2527 for (qlk = nxt_queue_first(&router->sockets); 2528 qlk != nxt_queue_tail(&router->sockets); 2529 qlk = nxt_queue_next(qlk)) 2530 { 2531 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2532 2533 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2534 nskcf->listen = skcf->listen; 2535 2536 nxt_queue_remove(qlk); 2537 nxt_queue_insert_tail(&keeping_sockets, qlk); 2538 2539 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2540 2541 return NXT_OK; 2542 } 2543 } 2544 2545 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2546 2547 return NXT_DECLINED; 2548 } 2549 2550 2551 static void 2552 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2553 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2554 { 2555 size_t size; 2556 uint32_t stream; 2557 nxt_int_t ret; 2558 nxt_buf_t *b; 2559 nxt_port_t *main_port, *router_port; 2560 nxt_runtime_t *rt; 2561 nxt_socket_rpc_t *rpc; 2562 2563 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2564 if (rpc == NULL) { 2565 goto fail; 2566 } 2567 2568 rpc->socket_conf = skcf; 2569 rpc->temp_conf = tmcf; 2570 2571 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2572 2573 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2574 if (b == NULL) { 2575 goto fail; 2576 } 2577 2578 b->completion_handler = nxt_buf_dummy_completion; 2579 2580 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2581 2582 rt = task->thread->runtime; 2583 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2584 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2585 2586 stream = nxt_port_rpc_register_handler(task, router_port, 2587 nxt_router_listen_socket_ready, 2588 nxt_router_listen_socket_error, 2589 main_port->pid, rpc); 2590 if (nxt_slow_path(stream == 0)) { 2591 goto fail; 2592 } 2593 2594 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2595 stream, router_port->id, b); 2596 2597 if (nxt_slow_path(ret != NXT_OK)) { 2598 nxt_port_rpc_cancel(task, router_port, stream); 2599 goto fail; 2600 } 2601 2602 return; 2603 2604 fail: 2605 2606 nxt_router_conf_error(task, tmcf); 2607 } 2608 2609 2610 static void 2611 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2612 void *data) 2613 { 2614 nxt_int_t ret; 2615 nxt_socket_t s; 2616 nxt_socket_rpc_t *rpc; 2617 2618 rpc = data; 2619 2620 s = msg->fd[0]; 2621 2622 ret = nxt_socket_nonblocking(task, s); 2623 if (nxt_slow_path(ret != NXT_OK)) { 2624 goto fail; 2625 } 2626 2627 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2628 2629 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2630 if (nxt_slow_path(ret != NXT_OK)) { 2631 goto fail; 2632 } 2633 2634 rpc->socket_conf->listen->socket = s; 2635 2636 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2637 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2638 2639 return; 2640 2641 fail: 2642 2643 nxt_socket_close(task, s); 2644 2645 nxt_router_conf_error(task, rpc->temp_conf); 2646 } 2647 2648 2649 static void 2650 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2651 void *data) 2652 { 2653 nxt_socket_rpc_t *rpc; 2654 nxt_router_temp_conf_t *tmcf; 2655 2656 rpc = data; 2657 tmcf = rpc->temp_conf; 2658 2659 #if 0 2660 u_char *p; 2661 size_t size; 2662 uint8_t error; 2663 nxt_buf_t *in, *out; 2664 nxt_sockaddr_t *sa; 2665 2666 static nxt_str_t socket_errors[] = { 2667 nxt_string("ListenerSystem"), 2668 nxt_string("ListenerNoIPv6"), 2669 nxt_string("ListenerPort"), 2670 nxt_string("ListenerInUse"), 2671 nxt_string("ListenerNoAddress"), 2672 nxt_string("ListenerNoAccess"), 2673 nxt_string("ListenerPath"), 2674 }; 2675 2676 sa = rpc->socket_conf->listen->sockaddr; 2677 2678 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2679 2680 if (nxt_slow_path(in == NULL)) { 2681 return; 2682 } 2683 2684 p = in->mem.pos; 2685 2686 error = *p++; 2687 2688 size = nxt_length("listen socket error: ") 2689 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2690 + sa->length + socket_errors[error].length + (in->mem.free - p); 2691 2692 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2693 if (nxt_slow_path(out == NULL)) { 2694 return; 2695 } 2696 2697 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2698 "listen socket error: " 2699 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2700 (size_t) sa->length, nxt_sockaddr_start(sa), 2701 &socket_errors[error], in->mem.free - p, p); 2702 2703 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2704 #endif 2705 2706 nxt_router_conf_error(task, tmcf); 2707 } 2708 2709 2710 #if (NXT_TLS) 2711 2712 static void 2713 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2714 void *data) 2715 { 2716 nxt_mp_t *mp; 2717 nxt_int_t ret; 2718 nxt_tls_conf_t *tlscf; 2719 nxt_router_tlssock_t *tls; 2720 nxt_tls_bundle_conf_t *bundle; 2721 nxt_router_temp_conf_t *tmcf; 2722 2723 nxt_debug(task, "tls rpc handler"); 2724 2725 tls = data; 2726 tmcf = tls->temp_conf; 2727 2728 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2729 goto fail; 2730 } 2731 2732 mp = tmcf->router_conf->mem_pool; 2733 2734 if (tls->socket_conf->tls == NULL){ 2735 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2736 if (nxt_slow_path(tlscf == NULL)) { 2737 goto fail; 2738 } 2739 2740 tlscf->no_wait_shutdown = 1; 2741 tls->socket_conf->tls = tlscf; 2742 2743 } else { 2744 tlscf = tls->socket_conf->tls; 2745 } 2746 2747 tls->tls_init->conf = tlscf; 2748 2749 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2750 if (nxt_slow_path(bundle == NULL)) { 2751 goto fail; 2752 } 2753 2754 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2755 goto fail; 2756 } 2757 2758 bundle->chain_file = msg->fd[0]; 2759 bundle->next = tlscf->bundle; 2760 tlscf->bundle = bundle; 2761 2762 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2763 tls->last); 2764 if (nxt_slow_path(ret != NXT_OK)) { 2765 goto fail; 2766 } 2767 2768 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2769 nxt_router_conf_apply, task, tmcf, NULL); 2770 return; 2771 2772 fail: 2773 2774 nxt_router_conf_error(task, tmcf); 2775 } 2776 2777 #endif 2778 2779 2780 static void 2781 nxt_router_app_rpc_create(nxt_task_t *task, 2782 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2783 { 2784 size_t size; 2785 uint32_t stream; 2786 nxt_fd_t port_fd, queue_fd; 2787 nxt_int_t ret; 2788 nxt_buf_t *b; 2789 nxt_port_t *router_port, *dport; 2790 nxt_runtime_t *rt; 2791 nxt_app_rpc_t *rpc; 2792 2793 rt = task->thread->runtime; 2794 2795 dport = app->proto_port; 2796 2797 if (dport == NULL) { 2798 nxt_debug(task, "app '%V' prototype prefork", &app->name); 2799 2800 size = app->name.length + 1 + app->conf.length; 2801 2802 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2803 if (nxt_slow_path(b == NULL)) { 2804 goto fail; 2805 } 2806 2807 b->completion_handler = nxt_buf_dummy_completion; 2808 2809 nxt_buf_cpystr(b, &app->name); 2810 *b->mem.free++ = '\0'; 2811 nxt_buf_cpystr(b, &app->conf); 2812 2813 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 2814 2815 port_fd = app->shared_port->pair[0]; 2816 queue_fd = app->shared_port->queue_fd; 2817 2818 } else { 2819 nxt_debug(task, "app '%V' prefork", &app->name); 2820 2821 b = NULL; 2822 port_fd = -1; 2823 queue_fd = -1; 2824 } 2825 2826 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2827 2828 rpc = nxt_port_rpc_register_handler_ex(task, router_port, 2829 nxt_router_app_prefork_ready, 2830 nxt_router_app_prefork_error, 2831 sizeof(nxt_app_rpc_t)); 2832 if (nxt_slow_path(rpc == NULL)) { 2833 goto fail; 2834 } 2835 2836 rpc->app = app; 2837 rpc->temp_conf = tmcf; 2838 rpc->proto = (b != NULL); 2839 2840 stream = nxt_port_rpc_ex_stream(rpc); 2841 2842 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 2843 port_fd, queue_fd, stream, router_port->id, b); 2844 if (nxt_slow_path(ret != NXT_OK)) { 2845 nxt_port_rpc_cancel(task, router_port, stream); 2846 goto fail; 2847 } 2848 2849 if (b == NULL) { 2850 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); 2851 2852 app->pending_processes++; 2853 } 2854 2855 return; 2856 2857 fail: 2858 2859 nxt_router_conf_error(task, tmcf); 2860 } 2861 2862 2863 static void 2864 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2865 void *data) 2866 { 2867 nxt_app_t *app; 2868 nxt_port_t *port; 2869 nxt_app_rpc_t *rpc; 2870 nxt_event_engine_t *engine; 2871 2872 rpc = data; 2873 app = rpc->app; 2874 2875 port = msg->u.new_port; 2876 2877 nxt_assert(port != NULL); 2878 nxt_assert(port->id == 0); 2879 2880 if (rpc->proto) { 2881 nxt_assert(app->proto_port == NULL); 2882 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 2883 2884 nxt_port_inc_use(port); 2885 2886 app->proto_port = port; 2887 port->app = app; 2888 2889 nxt_router_app_rpc_create(task, rpc->temp_conf, app); 2890 2891 return; 2892 } 2893 2894 nxt_assert(port->type == NXT_PROCESS_APP); 2895 2896 port->app = app; 2897 port->main_app_port = port; 2898 2899 app->pending_processes--; 2900 app->processes++; 2901 app->idle_processes++; 2902 2903 engine = task->thread->engine; 2904 2905 nxt_queue_insert_tail(&app->ports, &port->app_link); 2906 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2907 2908 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2909 &app->name, port->pid, port->id); 2910 2911 nxt_port_hash_add(&app->port_hash, port); 2912 app->port_hash_count++; 2913 2914 port->idle_start = 0; 2915 2916 nxt_port_inc_use(port); 2917 2918 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 2919 2920 nxt_work_queue_add(&engine->fast_work_queue, 2921 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2922 } 2923 2924 2925 static void 2926 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2927 void *data) 2928 { 2929 nxt_app_t *app; 2930 nxt_app_rpc_t *rpc; 2931 nxt_router_temp_conf_t *tmcf; 2932 2933 rpc = data; 2934 app = rpc->app; 2935 tmcf = rpc->temp_conf; 2936 2937 if (rpc->proto) { 2938 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", 2939 &app->name); 2940 2941 } else { 2942 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2943 &app->name); 2944 2945 app->pending_processes--; 2946 } 2947 2948 nxt_router_conf_error(task, tmcf); 2949 } 2950 2951 2952 static nxt_int_t 2953 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2954 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2955 { 2956 nxt_int_t ret; 2957 nxt_uint_t n, threads; 2958 nxt_queue_link_t *qlk; 2959 nxt_router_engine_conf_t *recf; 2960 2961 threads = tmcf->router_conf->threads; 2962 2963 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2964 sizeof(nxt_router_engine_conf_t)); 2965 if (nxt_slow_path(tmcf->engines == NULL)) { 2966 return NXT_ERROR; 2967 } 2968 2969 n = 0; 2970 2971 for (qlk = nxt_queue_first(&router->engines); 2972 qlk != nxt_queue_tail(&router->engines); 2973 qlk = nxt_queue_next(qlk)) 2974 { 2975 recf = nxt_array_zero_add(tmcf->engines); 2976 if (nxt_slow_path(recf == NULL)) { 2977 return NXT_ERROR; 2978 } 2979 2980 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2981 2982 if (n < threads) { 2983 recf->action = NXT_ROUTER_ENGINE_KEEP; 2984 ret = nxt_router_engine_conf_update(tmcf, recf); 2985 2986 } else { 2987 recf->action = NXT_ROUTER_ENGINE_DELETE; 2988 ret = nxt_router_engine_conf_delete(tmcf, recf); 2989 } 2990 2991 if (nxt_slow_path(ret != NXT_OK)) { 2992 return ret; 2993 } 2994 2995 n++; 2996 } 2997 2998 tmcf->new_threads = n; 2999 3000 while (n < threads) { 3001 recf = nxt_array_zero_add(tmcf->engines); 3002 if (nxt_slow_path(recf == NULL)) { 3003 return NXT_ERROR; 3004 } 3005 3006 recf->action = NXT_ROUTER_ENGINE_ADD; 3007 3008 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 3009 if (nxt_slow_path(recf->engine == NULL)) { 3010 return NXT_ERROR; 3011 } 3012 3013 ret = nxt_router_engine_conf_create(tmcf, recf); 3014 if (nxt_slow_path(ret != NXT_OK)) { 3015 return ret; 3016 } 3017 3018 n++; 3019 } 3020 3021 return NXT_OK; 3022 } 3023 3024 3025 static nxt_int_t 3026 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 3027 nxt_router_engine_conf_t *recf) 3028 { 3029 nxt_int_t ret; 3030 3031 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3032 nxt_router_listen_socket_create); 3033 if (nxt_slow_path(ret != NXT_OK)) { 3034 return ret; 3035 } 3036 3037 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3038 nxt_router_listen_socket_create); 3039 if (nxt_slow_path(ret != NXT_OK)) { 3040 return ret; 3041 } 3042 3043 return ret; 3044 } 3045 3046 3047 static nxt_int_t 3048 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 3049 nxt_router_engine_conf_t *recf) 3050 { 3051 nxt_int_t ret; 3052 3053 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3054 nxt_router_listen_socket_create); 3055 if (nxt_slow_path(ret != NXT_OK)) { 3056 return ret; 3057 } 3058 3059 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3060 nxt_router_listen_socket_update); 3061 if (nxt_slow_path(ret != NXT_OK)) { 3062 return ret; 3063 } 3064 3065 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3066 if (nxt_slow_path(ret != NXT_OK)) { 3067 return ret; 3068 } 3069 3070 return ret; 3071 } 3072 3073 3074 static nxt_int_t 3075 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 3076 nxt_router_engine_conf_t *recf) 3077 { 3078 nxt_int_t ret; 3079 3080 ret = nxt_router_engine_quit(tmcf, recf); 3081 if (nxt_slow_path(ret != NXT_OK)) { 3082 return ret; 3083 } 3084 3085 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 3086 if (nxt_slow_path(ret != NXT_OK)) { 3087 return ret; 3088 } 3089 3090 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3091 } 3092 3093 3094 static nxt_int_t 3095 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3096 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3097 nxt_work_handler_t handler) 3098 { 3099 nxt_int_t ret; 3100 nxt_joint_job_t *job; 3101 nxt_queue_link_t *qlk; 3102 nxt_socket_conf_t *skcf; 3103 nxt_socket_conf_joint_t *joint; 3104 3105 for (qlk = nxt_queue_first(sockets); 3106 qlk != nxt_queue_tail(sockets); 3107 qlk = nxt_queue_next(qlk)) 3108 { 3109 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3110 if (nxt_slow_path(job == NULL)) { 3111 return NXT_ERROR; 3112 } 3113 3114 job->work.next = recf->jobs; 3115 recf->jobs = &job->work; 3116 3117 job->task = tmcf->engine->task; 3118 job->work.handler = handler; 3119 job->work.task = &job->task; 3120 job->work.obj = job; 3121 job->tmcf = tmcf; 3122 3123 tmcf->count++; 3124 3125 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3126 sizeof(nxt_socket_conf_joint_t)); 3127 if (nxt_slow_path(joint == NULL)) { 3128 return NXT_ERROR; 3129 } 3130 3131 job->work.data = joint; 3132 3133 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3134 if (nxt_slow_path(ret != NXT_OK)) { 3135 return ret; 3136 } 3137 3138 joint->count = 1; 3139 3140 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3141 skcf->count++; 3142 joint->socket_conf = skcf; 3143 3144 joint->engine = recf->engine; 3145 } 3146 3147 return NXT_OK; 3148 } 3149 3150 3151 static nxt_int_t 3152 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3153 nxt_router_engine_conf_t *recf) 3154 { 3155 nxt_joint_job_t *job; 3156 3157 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3158 if (nxt_slow_path(job == NULL)) { 3159 return NXT_ERROR; 3160 } 3161 3162 job->work.next = recf->jobs; 3163 recf->jobs = &job->work; 3164 3165 job->task = tmcf->engine->task; 3166 job->work.handler = nxt_router_worker_thread_quit; 3167 job->work.task = &job->task; 3168 job->work.obj = NULL; 3169 job->work.data = NULL; 3170 job->tmcf = NULL; 3171 3172 return NXT_OK; 3173 } 3174 3175 3176 static nxt_int_t 3177 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 3178 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 3179 { 3180 nxt_joint_job_t *job; 3181 nxt_queue_link_t *qlk; 3182 3183 for (qlk = nxt_queue_first(sockets); 3184 qlk != nxt_queue_tail(sockets); 3185 qlk = nxt_queue_next(qlk)) 3186 { 3187 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3188 if (nxt_slow_path(job == NULL)) { 3189 return NXT_ERROR; 3190 } 3191 3192 job->work.next = recf->jobs; 3193 recf->jobs = &job->work; 3194 3195 job->task = tmcf->engine->task; 3196 job->work.handler = nxt_router_listen_socket_delete; 3197 job->work.task = &job->task; 3198 job->work.obj = job; 3199 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3200 job->tmcf = tmcf; 3201 3202 tmcf->count++; 3203 } 3204 3205 return NXT_OK; 3206 } 3207 3208 3209 static nxt_int_t 3210 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 3211 nxt_router_temp_conf_t *tmcf) 3212 { 3213 nxt_int_t ret; 3214 nxt_uint_t i, threads; 3215 nxt_router_engine_conf_t *recf; 3216 3217 recf = tmcf->engines->elts; 3218 threads = tmcf->router_conf->threads; 3219 3220 for (i = tmcf->new_threads; i < threads; i++) { 3221 ret = nxt_router_thread_create(task, rt, recf[i].engine); 3222 if (nxt_slow_path(ret != NXT_OK)) { 3223 return ret; 3224 } 3225 } 3226 3227 return NXT_OK; 3228 } 3229 3230 3231 static nxt_int_t 3232 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 3233 nxt_event_engine_t *engine) 3234 { 3235 nxt_int_t ret; 3236 nxt_thread_link_t *link; 3237 nxt_thread_handle_t handle; 3238 3239 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 3240 3241 if (nxt_slow_path(link == NULL)) { 3242 return NXT_ERROR; 3243 } 3244 3245 link->start = nxt_router_thread_start; 3246 link->engine = engine; 3247 link->work.handler = nxt_router_thread_exit_handler; 3248 link->work.task = task; 3249 link->work.data = link; 3250 3251 nxt_queue_insert_tail(&rt->engines, &engine->link); 3252 3253 ret = nxt_thread_create(&handle, link); 3254 3255 if (nxt_slow_path(ret != NXT_OK)) { 3256 nxt_queue_remove(&engine->link); 3257 } 3258 3259 return ret; 3260 } 3261 3262 3263 static void 3264 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 3265 nxt_router_temp_conf_t *tmcf) 3266 { 3267 nxt_app_t *app; 3268 3269 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 3270 3271 nxt_router_app_unlink(task, app); 3272 3273 } nxt_queue_loop; 3274 3275 nxt_queue_add(&router->apps, &tmcf->previous); 3276 nxt_queue_add(&router->apps, &tmcf->apps); 3277 } 3278 3279 3280 static void 3281 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 3282 { 3283 nxt_uint_t n; 3284 nxt_event_engine_t *engine; 3285 nxt_router_engine_conf_t *recf; 3286 3287 recf = tmcf->engines->elts; 3288 3289 for (n = tmcf->engines->nelts; n != 0; n--) { 3290 engine = recf->engine; 3291 3292 switch (recf->action) { 3293 3294 case NXT_ROUTER_ENGINE_KEEP: 3295 break; 3296 3297 case NXT_ROUTER_ENGINE_ADD: 3298 nxt_queue_insert_tail(&router->engines, &engine->link0); 3299 break; 3300 3301 case NXT_ROUTER_ENGINE_DELETE: 3302 nxt_queue_remove(&engine->link0); 3303 break; 3304 } 3305 3306 nxt_router_engine_post(engine, recf->jobs); 3307 3308 recf++; 3309 } 3310 } 3311 3312 3313 static void 3314 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3315 { 3316 nxt_work_t *work, *next; 3317 3318 for (work = jobs; work != NULL; work = next) { 3319 next = work->next; 3320 work->next = NULL; 3321 3322 nxt_event_engine_post(engine, work); 3323 } 3324 } 3325 3326 3327 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3328 .rpc_error = nxt_port_rpc_handler, 3329 .mmap = nxt_port_mmap_handler, 3330 .data = nxt_port_rpc_handler, 3331 .oosm = nxt_router_oosm_handler, 3332 .req_headers_ack = nxt_port_rpc_handler, 3333 }; 3334 3335 3336 static void 3337 nxt_router_thread_start(void *data) 3338 { 3339 nxt_int_t ret; 3340 nxt_port_t *port; 3341 nxt_task_t *task; 3342 nxt_work_t *work; 3343 nxt_thread_t *thread; 3344 nxt_thread_link_t *link; 3345 nxt_event_engine_t *engine; 3346 3347 link = data; 3348 engine = link->engine; 3349 task = &engine->task; 3350 3351 thread = nxt_thread(); 3352 3353 nxt_event_engine_thread_adopt(engine); 3354 3355 /* STUB */ 3356 thread->runtime = engine->task.thread->runtime; 3357 3358 engine->task.thread = thread; 3359 engine->task.log = thread->log; 3360 thread->engine = engine; 3361 thread->task = &engine->task; 3362 #if 0 3363 thread->fiber = &engine->fibers->fiber; 3364 #endif 3365 3366 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3367 if (nxt_slow_path(engine->mem_pool == NULL)) { 3368 return; 3369 } 3370 3371 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3372 NXT_PROCESS_ROUTER); 3373 if (nxt_slow_path(port == NULL)) { 3374 return; 3375 } 3376 3377 ret = nxt_port_socket_init(task, port, 0); 3378 if (nxt_slow_path(ret != NXT_OK)) { 3379 nxt_port_use(task, port, -1); 3380 return; 3381 } 3382 3383 ret = nxt_router_port_queue_init(task, port); 3384 if (nxt_slow_path(ret != NXT_OK)) { 3385 nxt_port_use(task, port, -1); 3386 return; 3387 } 3388 3389 engine->port = port; 3390 3391 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3392 3393 work = nxt_zalloc(sizeof(nxt_work_t)); 3394 if (nxt_slow_path(work == NULL)) { 3395 return; 3396 } 3397 3398 work->handler = nxt_router_rt_add_port; 3399 work->task = link->work.task; 3400 work->obj = work; 3401 work->data = port; 3402 3403 nxt_event_engine_post(link->work.task->thread->engine, work); 3404 3405 nxt_event_engine_start(engine); 3406 } 3407 3408 3409 static void 3410 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3411 { 3412 nxt_int_t res; 3413 nxt_port_t *port; 3414 nxt_runtime_t *rt; 3415 3416 rt = task->thread->runtime; 3417 port = data; 3418 3419 nxt_free(obj); 3420 3421 res = nxt_port_hash_add(&rt->ports, port); 3422 3423 if (nxt_fast_path(res == NXT_OK)) { 3424 nxt_port_use(task, port, 1); 3425 } 3426 } 3427 3428 3429 static void 3430 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3431 { 3432 nxt_joint_job_t *job; 3433 nxt_socket_conf_t *skcf; 3434 nxt_listen_event_t *lev; 3435 nxt_listen_socket_t *ls; 3436 nxt_thread_spinlock_t *lock; 3437 nxt_socket_conf_joint_t *joint; 3438 3439 job = obj; 3440 joint = data; 3441 3442 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3443 3444 skcf = joint->socket_conf; 3445 ls = skcf->listen; 3446 3447 lev = nxt_listen_event(task, ls); 3448 if (nxt_slow_path(lev == NULL)) { 3449 nxt_router_listen_socket_release(task, skcf); 3450 return; 3451 } 3452 3453 lev->socket.data = joint; 3454 3455 lock = &skcf->router_conf->router->lock; 3456 3457 nxt_thread_spin_lock(lock); 3458 ls->count++; 3459 nxt_thread_spin_unlock(lock); 3460 3461 job->work.next = NULL; 3462 job->work.handler = nxt_router_conf_wait; 3463 3464 nxt_event_engine_post(job->tmcf->engine, &job->work); 3465 } 3466 3467 3468 nxt_inline nxt_listen_event_t * 3469 nxt_router_listen_event(nxt_queue_t *listen_connections, 3470 nxt_socket_conf_t *skcf) 3471 { 3472 nxt_socket_t fd; 3473 nxt_queue_link_t *qlk; 3474 nxt_listen_event_t *lev; 3475 3476 fd = skcf->listen->socket; 3477 3478 for (qlk = nxt_queue_first(listen_connections); 3479 qlk != nxt_queue_tail(listen_connections); 3480 qlk = nxt_queue_next(qlk)) 3481 { 3482 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3483 3484 if (fd == lev->socket.fd) { 3485 return lev; 3486 } 3487 } 3488 3489 return NULL; 3490 } 3491 3492 3493 static void 3494 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3495 { 3496 nxt_joint_job_t *job; 3497 nxt_event_engine_t *engine; 3498 nxt_listen_event_t *lev; 3499 nxt_socket_conf_joint_t *joint, *old; 3500 3501 job = obj; 3502 joint = data; 3503 3504 engine = task->thread->engine; 3505 3506 nxt_queue_insert_tail(&engine->joints, &joint->link); 3507 3508 lev = nxt_router_listen_event(&engine->listen_connections, 3509 joint->socket_conf); 3510 3511 old = lev->socket.data; 3512 lev->socket.data = joint; 3513 lev->listen = joint->socket_conf->listen; 3514 3515 job->work.next = NULL; 3516 job->work.handler = nxt_router_conf_wait; 3517 3518 nxt_event_engine_post(job->tmcf->engine, &job->work); 3519 3520 /* 3521 * The task is allocated from configuration temporary 3522 * memory pool so it can be freed after engine post operation. 3523 */ 3524 3525 nxt_router_conf_release(&engine->task, old); 3526 } 3527 3528 3529 static void 3530 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3531 { 3532 nxt_socket_conf_t *skcf; 3533 nxt_listen_event_t *lev; 3534 nxt_event_engine_t *engine; 3535 nxt_socket_conf_joint_t *joint; 3536 3537 skcf = data; 3538 3539 engine = task->thread->engine; 3540 3541 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3542 3543 nxt_fd_event_delete(engine, &lev->socket); 3544 3545 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3546 lev->socket.fd); 3547 3548 joint = lev->socket.data; 3549 joint->close_job = obj; 3550 3551 lev->timer.handler = nxt_router_listen_socket_close; 3552 lev->timer.work_queue = &engine->fast_work_queue; 3553 3554 nxt_timer_add(engine, &lev->timer, 0); 3555 } 3556 3557 3558 static void 3559 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3560 { 3561 nxt_event_engine_t *engine; 3562 3563 nxt_debug(task, "router worker thread quit"); 3564 3565 engine = task->thread->engine; 3566 3567 engine->shutdown = 1; 3568 3569 if (nxt_queue_is_empty(&engine->joints)) { 3570 nxt_thread_exit(task->thread); 3571 } 3572 } 3573 3574 3575 static void 3576 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3577 { 3578 nxt_timer_t *timer; 3579 nxt_joint_job_t *job; 3580 nxt_listen_event_t *lev; 3581 nxt_socket_conf_joint_t *joint; 3582 3583 timer = obj; 3584 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3585 3586 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3587 lev->socket.fd); 3588 3589 nxt_queue_remove(&lev->link); 3590 3591 joint = lev->socket.data; 3592 lev->socket.data = NULL; 3593 3594 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3595 task = &task->thread->engine->task; 3596 3597 nxt_router_listen_socket_release(task, joint->socket_conf); 3598 3599 job = joint->close_job; 3600 job->work.next = NULL; 3601 job->work.handler = nxt_router_conf_wait; 3602 3603 nxt_event_engine_post(job->tmcf->engine, &job->work); 3604 3605 nxt_router_listen_event_release(task, lev, joint); 3606 } 3607 3608 3609 static void 3610 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3611 { 3612 nxt_listen_socket_t *ls; 3613 nxt_thread_spinlock_t *lock; 3614 3615 ls = skcf->listen; 3616 lock = &skcf->router_conf->router->lock; 3617 3618 nxt_thread_spin_lock(lock); 3619 3620 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3621 task->thread->engine, ls->count); 3622 3623 if (--ls->count != 0) { 3624 ls = NULL; 3625 } 3626 3627 nxt_thread_spin_unlock(lock); 3628 3629 if (ls != NULL) { 3630 nxt_socket_close(task, ls->socket); 3631 nxt_free(ls); 3632 } 3633 } 3634 3635 3636 void 3637 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3638 nxt_socket_conf_joint_t *joint) 3639 { 3640 nxt_event_engine_t *engine; 3641 3642 nxt_debug(task, "listen event count: %D", lev->count); 3643 3644 engine = task->thread->engine; 3645 3646 if (--lev->count == 0) { 3647 if (lev->next != NULL) { 3648 nxt_sockaddr_cache_free(engine, lev->next); 3649 3650 nxt_conn_free(task, lev->next); 3651 } 3652 3653 nxt_free(lev); 3654 } 3655 3656 if (joint != NULL) { 3657 nxt_router_conf_release(task, joint); 3658 } 3659 3660 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3661 nxt_thread_exit(task->thread); 3662 } 3663 } 3664 3665 3666 void 3667 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3668 { 3669 nxt_socket_conf_t *skcf; 3670 nxt_router_conf_t *rtcf; 3671 nxt_thread_spinlock_t *lock; 3672 3673 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3674 3675 if (--joint->count != 0) { 3676 return; 3677 } 3678 3679 nxt_queue_remove(&joint->link); 3680 3681 /* 3682 * The joint content can not be safely used after the critical 3683 * section protected by the spinlock because its memory pool may 3684 * be already destroyed by another thread. 3685 */ 3686 skcf = joint->socket_conf; 3687 rtcf = skcf->router_conf; 3688 lock = &rtcf->router->lock; 3689 3690 nxt_thread_spin_lock(lock); 3691 3692 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3693 rtcf, rtcf->count); 3694 3695 if (--skcf->count != 0) { 3696 skcf = NULL; 3697 rtcf = NULL; 3698 3699 } else { 3700 nxt_queue_remove(&skcf->link); 3701 3702 if (--rtcf->count != 0) { 3703 rtcf = NULL; 3704 } 3705 } 3706 3707 nxt_thread_spin_unlock(lock); 3708 3709 #if (NXT_TLS) 3710 if (skcf != NULL && skcf->tls != NULL) { 3711 task->thread->runtime->tls->server_free(task, skcf->tls); 3712 } 3713 #endif 3714 3715 /* TODO remove engine->port */ 3716 3717 if (rtcf != NULL) { 3718 nxt_debug(task, "old router conf is destroyed"); 3719 3720 nxt_router_apps_hash_use(task, rtcf, -1); 3721 3722 nxt_router_access_log_release(task, lock, rtcf->access_log); 3723 3724 nxt_mp_thread_adopt(rtcf->mem_pool); 3725 3726 nxt_mp_destroy(rtcf->mem_pool); 3727 } 3728 } 3729 3730 3731 static void 3732 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3733 nxt_router_access_log_t *access_log) 3734 { 3735 size_t size; 3736 u_char *buf, *p; 3737 nxt_off_t bytes; 3738 3739 static nxt_time_string_t date_cache = { 3740 (nxt_atomic_uint_t) -1, 3741 nxt_router_access_log_date, 3742 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3743 nxt_length("31/Dec/1986:19:40:00 +0300"), 3744 NXT_THREAD_TIME_LOCAL, 3745 NXT_THREAD_TIME_SEC, 3746 }; 3747 3748 size = r->remote->address_length 3749 + 6 /* ' - - [' */ 3750 + date_cache.size 3751 + 3 /* '] "' */ 3752 + r->method->length 3753 + 1 /* space */ 3754 + r->target.length 3755 + 1 /* space */ 3756 + r->version.length 3757 + 2 /* '" ' */ 3758 + 3 /* status */ 3759 + 1 /* space */ 3760 + NXT_OFF_T_LEN 3761 + 2 /* ' "' */ 3762 + (r->referer != NULL ? r->referer->value_length : 1) 3763 + 3 /* '" "' */ 3764 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3765 + 2 /* '"\n' */ 3766 ; 3767 3768 buf = nxt_mp_nget(r->mem_pool, size); 3769 if (nxt_slow_path(buf == NULL)) { 3770 return; 3771 } 3772 3773 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3774 r->remote->address_length); 3775 3776 p = nxt_cpymem(p, " - - [", 6); 3777 3778 p = nxt_thread_time_string(task->thread, &date_cache, p); 3779 3780 p = nxt_cpymem(p, "] \"", 3); 3781 3782 if (r->method->length != 0) { 3783 p = nxt_cpymem(p, r->method->start, r->method->length); 3784 3785 if (r->target.length != 0) { 3786 *p++ = ' '; 3787 p = nxt_cpymem(p, r->target.start, r->target.length); 3788 3789 if (r->version.length != 0) { 3790 *p++ = ' '; 3791 p = nxt_cpymem(p, r->version.start, r->version.length); 3792 } 3793 } 3794 3795 } else { 3796 *p++ = '-'; 3797 } 3798 3799 p = nxt_cpymem(p, "\" ", 2); 3800 3801 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3802 3803 *p++ = ' '; 3804 3805 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3806 3807 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3808 3809 p = nxt_cpymem(p, " \"", 2); 3810 3811 if (r->referer != NULL) { 3812 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3813 3814 } else { 3815 *p++ = '-'; 3816 } 3817 3818 p = nxt_cpymem(p, "\" \"", 3); 3819 3820 if (r->user_agent != NULL) { 3821 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3822 3823 } else { 3824 *p++ = '-'; 3825 } 3826 3827 p = nxt_cpymem(p, "\"\n", 2); 3828 3829 nxt_fd_write(access_log->fd, buf, p - buf); 3830 } 3831 3832 3833 static u_char * 3834 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3835 size_t size, const char *format) 3836 { 3837 u_char sign; 3838 time_t gmtoff; 3839 3840 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3841 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3842 3843 gmtoff = nxt_timezone(tm) / 60; 3844 3845 if (gmtoff < 0) { 3846 gmtoff = -gmtoff; 3847 sign = '-'; 3848 3849 } else { 3850 sign = '+'; 3851 } 3852 3853 return nxt_sprintf(buf, buf + size, format, 3854 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3855 tm->tm_hour, tm->tm_min, tm->tm_sec, 3856 sign, gmtoff / 60, gmtoff % 60); 3857 } 3858 3859 3860 static void 3861 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3862 { 3863 uint32_t stream; 3864 nxt_int_t ret; 3865 nxt_buf_t *b; 3866 nxt_port_t *main_port, *router_port; 3867 nxt_runtime_t *rt; 3868 nxt_router_access_log_t *access_log; 3869 3870 access_log = tmcf->router_conf->access_log; 3871 3872 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3873 if (nxt_slow_path(b == NULL)) { 3874 goto fail; 3875 } 3876 3877 b->completion_handler = nxt_buf_dummy_completion; 3878 3879 nxt_buf_cpystr(b, &access_log->path); 3880 *b->mem.free++ = '\0'; 3881 3882 rt = task->thread->runtime; 3883 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3884 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3885 3886 stream = nxt_port_rpc_register_handler(task, router_port, 3887 nxt_router_access_log_ready, 3888 nxt_router_access_log_error, 3889 -1, tmcf); 3890 if (nxt_slow_path(stream == 0)) { 3891 goto fail; 3892 } 3893 3894 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3895 stream, router_port->id, b); 3896 3897 if (nxt_slow_path(ret != NXT_OK)) { 3898 nxt_port_rpc_cancel(task, router_port, stream); 3899 goto fail; 3900 } 3901 3902 return; 3903 3904 fail: 3905 3906 nxt_router_conf_error(task, tmcf); 3907 } 3908 3909 3910 static void 3911 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3912 void *data) 3913 { 3914 nxt_router_temp_conf_t *tmcf; 3915 nxt_router_access_log_t *access_log; 3916 3917 tmcf = data; 3918 3919 access_log = tmcf->router_conf->access_log; 3920 3921 access_log->fd = msg->fd[0]; 3922 3923 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3924 nxt_router_conf_apply, task, tmcf, NULL); 3925 } 3926 3927 3928 static void 3929 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3930 void *data) 3931 { 3932 nxt_router_temp_conf_t *tmcf; 3933 3934 tmcf = data; 3935 3936 nxt_router_conf_error(task, tmcf); 3937 } 3938 3939 3940 static void 3941 nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 3942 nxt_router_access_log_t *access_log) 3943 { 3944 if (access_log == NULL) { 3945 return; 3946 } 3947 3948 nxt_thread_spin_lock(lock); 3949 3950 access_log->count++; 3951 3952 nxt_thread_spin_unlock(lock); 3953 } 3954 3955 3956 static void 3957 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3958 nxt_router_access_log_t *access_log) 3959 { 3960 if (access_log == NULL) { 3961 return; 3962 } 3963 3964 nxt_thread_spin_lock(lock); 3965 3966 if (--access_log->count != 0) { 3967 access_log = NULL; 3968 } 3969 3970 nxt_thread_spin_unlock(lock); 3971 3972 if (access_log != NULL) { 3973 3974 if (access_log->fd != -1) { 3975 nxt_fd_close(access_log->fd); 3976 } 3977 3978 nxt_free(access_log); 3979 } 3980 } 3981 3982 3983 typedef struct { 3984 nxt_mp_t *mem_pool; 3985 nxt_router_access_log_t *access_log; 3986 } nxt_router_access_log_reopen_t; 3987 3988 3989 static void 3990 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3991 { 3992 nxt_mp_t *mp; 3993 uint32_t stream; 3994 nxt_int_t ret; 3995 nxt_buf_t *b; 3996 nxt_port_t *main_port, *router_port; 3997 nxt_runtime_t *rt; 3998 nxt_router_access_log_t *access_log; 3999 nxt_router_access_log_reopen_t *reopen; 4000 4001 access_log = nxt_router->access_log; 4002 4003 if (access_log == NULL) { 4004 return; 4005 } 4006 4007 mp = nxt_mp_create(1024, 128, 256, 32); 4008 if (nxt_slow_path(mp == NULL)) { 4009 return; 4010 } 4011 4012 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 4013 if (nxt_slow_path(reopen == NULL)) { 4014 goto fail; 4015 } 4016 4017 reopen->mem_pool = mp; 4018 reopen->access_log = access_log; 4019 4020 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 4021 if (nxt_slow_path(b == NULL)) { 4022 goto fail; 4023 } 4024 4025 b->completion_handler = nxt_router_access_log_reopen_completion; 4026 4027 nxt_buf_cpystr(b, &access_log->path); 4028 *b->mem.free++ = '\0'; 4029 4030 rt = task->thread->runtime; 4031 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 4032 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 4033 4034 stream = nxt_port_rpc_register_handler(task, router_port, 4035 nxt_router_access_log_reopen_ready, 4036 nxt_router_access_log_reopen_error, 4037 -1, reopen); 4038 if (nxt_slow_path(stream == 0)) { 4039 goto fail; 4040 } 4041 4042 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 4043 stream, router_port->id, b); 4044 4045 if (nxt_slow_path(ret != NXT_OK)) { 4046 nxt_port_rpc_cancel(task, router_port, stream); 4047 goto fail; 4048 } 4049 4050 nxt_mp_retain(mp); 4051 4052 return; 4053 4054 fail: 4055 4056 nxt_mp_destroy(mp); 4057 } 4058 4059 4060 static void 4061 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 4062 { 4063 nxt_mp_t *mp; 4064 nxt_buf_t *b; 4065 4066 b = obj; 4067 mp = b->data; 4068 4069 nxt_mp_release(mp); 4070 } 4071 4072 4073 static void 4074 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4075 void *data) 4076 { 4077 nxt_router_access_log_t *access_log; 4078 nxt_router_access_log_reopen_t *reopen; 4079 4080 reopen = data; 4081 4082 access_log = reopen->access_log; 4083 4084 if (access_log == nxt_router->access_log) { 4085 4086 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 4087 nxt_alert(task, "dup2(%FD, %FD) failed %E", 4088 msg->fd[0], access_log->fd, nxt_errno); 4089 } 4090 } 4091 4092 nxt_fd_close(msg->fd[0]); 4093 nxt_mp_release(reopen->mem_pool); 4094 } 4095 4096 4097 static void 4098 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4099 void *data) 4100 { 4101 nxt_router_access_log_reopen_t *reopen; 4102 4103 reopen = data; 4104 4105 nxt_mp_release(reopen->mem_pool); 4106 } 4107 4108 4109 static void 4110 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 4111 { 4112 nxt_port_t *port; 4113 nxt_thread_link_t *link; 4114 nxt_event_engine_t *engine; 4115 nxt_thread_handle_t handle; 4116 4117 handle = (nxt_thread_handle_t) (uintptr_t) obj; 4118 link = data; 4119 4120 nxt_thread_wait(handle); 4121 4122 engine = link->engine; 4123 4124 nxt_queue_remove(&engine->link); 4125 4126 port = engine->port; 4127 4128 // TODO notify all apps 4129 4130 port->engine = task->thread->engine; 4131 nxt_mp_thread_adopt(port->mem_pool); 4132 nxt_port_use(task, port, -1); 4133 4134 nxt_mp_thread_adopt(engine->mem_pool); 4135 nxt_mp_destroy(engine->mem_pool); 4136 4137 nxt_event_engine_free(engine); 4138 4139 nxt_free(link); 4140 } 4141 4142 4143 static void 4144 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4145 void *data) 4146 { 4147 size_t b_size, count; 4148 nxt_int_t ret; 4149 nxt_app_t *app; 4150 nxt_buf_t *b, *next; 4151 nxt_port_t *app_port; 4152 nxt_unit_field_t *f; 4153 nxt_http_field_t *field; 4154 nxt_http_request_t *r; 4155 nxt_unit_response_t *resp; 4156 nxt_request_rpc_data_t *req_rpc_data; 4157 4158 req_rpc_data = data; 4159 4160 r = req_rpc_data->request; 4161 if (nxt_slow_path(r == NULL)) { 4162 return; 4163 } 4164 4165 if (r->error) { 4166 nxt_request_rpc_data_unlink(task, req_rpc_data); 4167 return; 4168 } 4169 4170 app = req_rpc_data->app; 4171 nxt_assert(app != NULL); 4172 4173 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { 4174 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); 4175 4176 return; 4177 } 4178 4179 b = (msg->size == 0) ? NULL : msg->buf; 4180 4181 if (msg->port_msg.last != 0) { 4182 nxt_debug(task, "router data create last buf"); 4183 4184 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 4185 4186 req_rpc_data->rpc_cancel = 0; 4187 4188 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { 4189 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; 4190 } 4191 4192 nxt_request_rpc_data_unlink(task, req_rpc_data); 4193 4194 } else { 4195 if (app->timeout != 0) { 4196 r->timer.handler = nxt_router_app_timeout; 4197 r->timer_data = req_rpc_data; 4198 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4199 } 4200 } 4201 4202 if (b == NULL) { 4203 return; 4204 } 4205 4206 if (msg->buf == b) { 4207 /* Disable instant buffer completion/re-using by port. */ 4208 msg->buf = NULL; 4209 } 4210 4211 if (r->header_sent) { 4212 nxt_buf_chain_add(&r->out, b); 4213 nxt_http_request_send_body(task, r, NULL); 4214 4215 } else { 4216 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0; 4217 4218 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) { 4219 nxt_alert(task, "response buffer too small: %z", b_size); 4220 goto fail; 4221 } 4222 4223 resp = (void *) b->mem.pos; 4224 count = (b_size - sizeof(nxt_unit_response_t)) 4225 / sizeof(nxt_unit_field_t); 4226 4227 if (nxt_slow_path(count < resp->fields_count)) { 4228 nxt_alert(task, "response buffer too small for fields count: %D", 4229 resp->fields_count); 4230 goto fail; 4231 } 4232 4233 field = NULL; 4234 4235 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 4236 if (f->skip) { 4237 continue; 4238 } 4239 4240 field = nxt_list_add(r->resp.fields); 4241 4242 if (nxt_slow_path(field == NULL)) { 4243 goto fail; 4244 } 4245 4246 field->hash = f->hash; 4247 field->skip = 0; 4248 field->hopbyhop = 0; 4249 4250 field->name_length = f->name_length; 4251 field->value_length = f->value_length; 4252 field->name = nxt_unit_sptr_get(&f->name); 4253 field->value = nxt_unit_sptr_get(&f->value); 4254 4255 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 4256 if (nxt_slow_path(ret != NXT_OK)) { 4257 goto fail; 4258 } 4259 4260 nxt_debug(task, "header%s: %*s: %*s", 4261 (field->skip ? " skipped" : ""), 4262 (size_t) field->name_length, field->name, 4263 (size_t) field->value_length, field->value); 4264 4265 if (field->skip) { 4266 r->resp.fields->last->nelts--; 4267 } 4268 } 4269 4270 r->status = resp->status; 4271 4272 if (resp->piggyback_content_length != 0) { 4273 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 4274 b->mem.free = b->mem.pos + resp->piggyback_content_length; 4275 4276 } else { 4277 b->mem.pos = b->mem.free; 4278 } 4279 4280 if (nxt_buf_mem_used_size(&b->mem) == 0) { 4281 next = b->next; 4282 b->next = NULL; 4283 4284 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 4285 b->completion_handler, task, b, b->parent); 4286 4287 b = next; 4288 } 4289 4290 if (b != NULL) { 4291 nxt_buf_chain_add(&r->out, b); 4292 } 4293 4294 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 4295 4296 if (r->websocket_handshake 4297 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 4298 { 4299 app_port = req_rpc_data->app_port; 4300 if (nxt_slow_path(app_port == NULL)) { 4301 goto fail; 4302 } 4303 4304 nxt_thread_mutex_lock(&app->mutex); 4305 4306 app_port->main_app_port->active_websockets++; 4307 4308 nxt_thread_mutex_unlock(&app->mutex); 4309 4310 nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE); 4311 req_rpc_data->apr_action = NXT_APR_CLOSE; 4312 4313 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); 4314 4315 r->state = &nxt_http_websocket; 4316 4317 } else { 4318 r->state = &nxt_http_request_send_state; 4319 } 4320 } 4321 4322 return; 4323 4324 fail: 4325 4326 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4327 4328 nxt_request_rpc_data_unlink(task, req_rpc_data); 4329 } 4330 4331 4332 static void 4333 nxt_router_req_headers_ack_handler(nxt_task_t *task, 4334 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 4335 { 4336 int res; 4337 nxt_app_t *app; 4338 nxt_buf_t *b; 4339 nxt_bool_t start_process, unlinked; 4340 nxt_port_t *app_port, *main_app_port, *idle_port; 4341 nxt_queue_link_t *idle_lnk; 4342 nxt_http_request_t *r; 4343 4344 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4345 req_rpc_data->stream, 4346 msg->port_msg.pid, msg->port_msg.reply_port); 4347 4348 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4349 msg->port_msg.pid); 4350 4351 app = req_rpc_data->app; 4352 r = req_rpc_data->request; 4353 4354 start_process = 0; 4355 unlinked = 0; 4356 4357 nxt_thread_mutex_lock(&app->mutex); 4358 4359 if (r->app_link.next != NULL) { 4360 nxt_queue_remove(&r->app_link); 4361 r->app_link.next = NULL; 4362 4363 unlinked = 1; 4364 } 4365 4366 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4367 msg->port_msg.reply_port); 4368 if (nxt_slow_path(app_port == NULL)) { 4369 nxt_thread_mutex_unlock(&app->mutex); 4370 4371 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4372 4373 if (unlinked) { 4374 nxt_mp_release(r->mem_pool); 4375 } 4376 4377 return; 4378 } 4379 4380 main_app_port = app_port->main_app_port; 4381 4382 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4383 app->idle_processes--; 4384 4385 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4386 &app->name, main_app_port->pid, main_app_port->id, 4387 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4388 4389 /* Check port was in 'spare_ports' using idle_start field. */ 4390 if (main_app_port->idle_start == 0 4391 && app->idle_processes >= app->spare_processes) 4392 { 4393 /* 4394 * If there is a vacant space in spare ports, 4395 * move the last idle to spare_ports. 4396 */ 4397 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4398 4399 idle_lnk = nxt_queue_last(&app->idle_ports); 4400 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4401 nxt_queue_remove(idle_lnk); 4402 4403 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4404 4405 idle_port->idle_start = 0; 4406 4407 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4408 "to spare_ports", 4409 &app->name, idle_port->pid, idle_port->id); 4410 } 4411 4412 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4413 app->pending_processes++; 4414 start_process = 1; 4415 } 4416 } 4417 4418 main_app_port->active_requests++; 4419 4420 nxt_port_inc_use(app_port); 4421 4422 nxt_thread_mutex_unlock(&app->mutex); 4423 4424 if (unlinked) { 4425 nxt_mp_release(r->mem_pool); 4426 } 4427 4428 if (start_process) { 4429 nxt_router_start_app_process(task, app); 4430 } 4431 4432 nxt_port_use(task, req_rpc_data->app_port, -1); 4433 4434 req_rpc_data->app_port = app_port; 4435 4436 b = req_rpc_data->msg_info.buf; 4437 4438 if (b != NULL) { 4439 /* First buffer is already sent. Start from second. */ 4440 b = b->next; 4441 4442 req_rpc_data->msg_info.buf->next = NULL; 4443 } 4444 4445 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4446 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4447 req_rpc_data->msg_info.body_fd); 4448 4449 if (req_rpc_data->msg_info.body_fd != -1) { 4450 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4451 } 4452 4453 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4454 req_rpc_data->msg_info.body_fd, 4455 req_rpc_data->stream, 4456 task->thread->engine->port->id, b); 4457 4458 if (nxt_slow_path(res != NXT_OK)) { 4459 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4460 } 4461 } 4462 4463 if (app->timeout != 0) { 4464 r->timer.handler = nxt_router_app_timeout; 4465 r->timer_data = req_rpc_data; 4466 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4467 } 4468 } 4469 4470 4471 static const nxt_http_request_state_t nxt_http_request_send_state 4472 nxt_aligned(64) = 4473 { 4474 .error_handler = nxt_http_request_error_handler, 4475 }; 4476 4477 4478 static void 4479 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4480 { 4481 nxt_buf_t *out; 4482 nxt_http_request_t *r; 4483 4484 r = obj; 4485 4486 out = r->out; 4487 4488 if (out != NULL) { 4489 r->out = NULL; 4490 nxt_http_request_send(task, r, out); 4491 } 4492 } 4493 4494 4495 static void 4496 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4497 void *data) 4498 { 4499 nxt_request_rpc_data_t *req_rpc_data; 4500 4501 req_rpc_data = data; 4502 4503 req_rpc_data->rpc_cancel = 0; 4504 4505 /* TODO cancel message and return if cancelled. */ 4506 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4507 4508 if (req_rpc_data->request != NULL) { 4509 nxt_http_request_error(task, req_rpc_data->request, 4510 NXT_HTTP_SERVICE_UNAVAILABLE); 4511 } 4512 4513 nxt_request_rpc_data_unlink(task, req_rpc_data); 4514 } 4515 4516 4517 static void 4518 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4519 void *data) 4520 { 4521 uint32_t n; 4522 nxt_app_t *app; 4523 nxt_bool_t start_process, restarted; 4524 nxt_port_t *port; 4525 nxt_app_joint_t *app_joint; 4526 nxt_app_joint_rpc_t *app_joint_rpc; 4527 4528 nxt_assert(data != NULL); 4529 4530 app_joint_rpc = data; 4531 app_joint = app_joint_rpc->app_joint; 4532 port = msg->u.new_port; 4533 4534 nxt_assert(app_joint != NULL); 4535 nxt_assert(port != NULL); 4536 nxt_assert(port->id == 0); 4537 4538 app = app_joint->app; 4539 4540 nxt_router_app_joint_use(task, app_joint, -1); 4541 4542 if (nxt_slow_path(app == NULL)) { 4543 nxt_debug(task, "new port ready for released app, send QUIT"); 4544 4545 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4546 4547 return; 4548 } 4549 4550 nxt_thread_mutex_lock(&app->mutex); 4551 4552 restarted = (app->generation != app_joint_rpc->generation); 4553 4554 if (app_joint_rpc->proto) { 4555 nxt_assert(app->proto_port == NULL); 4556 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 4557 4558 n = app->proto_port_requests; 4559 app->proto_port_requests = 0; 4560 4561 if (nxt_slow_path(restarted)) { 4562 nxt_thread_mutex_unlock(&app->mutex); 4563 4564 nxt_debug(task, "proto port ready for restarted app, send QUIT"); 4565 4566 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, 4567 NULL); 4568 4569 } else { 4570 port->app = app; 4571 app->proto_port = port; 4572 4573 nxt_thread_mutex_unlock(&app->mutex); 4574 4575 nxt_port_use(task, port, 1); 4576 } 4577 4578 port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER]; 4579 4580 while (n > 0) { 4581 nxt_router_app_use(task, app, 1); 4582 4583 nxt_router_start_app_process_handler(task, port, app); 4584 4585 n--; 4586 } 4587 4588 return; 4589 } 4590 4591 nxt_assert(port->type == NXT_PROCESS_APP); 4592 nxt_assert(app->pending_processes != 0); 4593 4594 app->pending_processes--; 4595 4596 if (nxt_slow_path(restarted)) { 4597 nxt_debug(task, "new port ready for restarted app, send QUIT"); 4598 4599 start_process = !task->thread->engine->shutdown 4600 && nxt_router_app_can_start(app) 4601 && nxt_router_app_need_start(app); 4602 4603 if (start_process) { 4604 app->pending_processes++; 4605 } 4606 4607 nxt_thread_mutex_unlock(&app->mutex); 4608 4609 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4610 4611 if (start_process) { 4612 nxt_router_start_app_process(task, app); 4613 } 4614 4615 return; 4616 } 4617 4618 port->app = app; 4619 port->main_app_port = port; 4620 4621 app->processes++; 4622 nxt_port_hash_add(&app->port_hash, port); 4623 app->port_hash_count++; 4624 4625 nxt_thread_mutex_unlock(&app->mutex); 4626 4627 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4628 &app->name, port->pid, app->processes, app->pending_processes); 4629 4630 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 4631 4632 nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); 4633 } 4634 4635 4636 static void 4637 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4638 void *data) 4639 { 4640 nxt_app_t *app; 4641 nxt_app_joint_t *app_joint; 4642 nxt_queue_link_t *link; 4643 nxt_http_request_t *r; 4644 nxt_app_joint_rpc_t *app_joint_rpc; 4645 4646 nxt_assert(data != NULL); 4647 4648 app_joint_rpc = data; 4649 app_joint = app_joint_rpc->app_joint; 4650 4651 nxt_assert(app_joint != NULL); 4652 4653 app = app_joint->app; 4654 4655 nxt_router_app_joint_use(task, app_joint, -1); 4656 4657 if (nxt_slow_path(app == NULL)) { 4658 nxt_debug(task, "start error for released app"); 4659 4660 return; 4661 } 4662 4663 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4664 4665 link = NULL; 4666 4667 nxt_thread_mutex_lock(&app->mutex); 4668 4669 nxt_assert(app->pending_processes != 0); 4670 4671 app->pending_processes--; 4672 4673 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4674 link = nxt_queue_first(&app->ack_waiting_req); 4675 4676 nxt_queue_remove(link); 4677 link->next = NULL; 4678 } 4679 4680 nxt_thread_mutex_unlock(&app->mutex); 4681 4682 while (link != NULL) { 4683 r = nxt_container_of(link, nxt_http_request_t, app_link); 4684 4685 nxt_event_engine_post(r->engine, &r->err_work); 4686 4687 link = NULL; 4688 4689 nxt_thread_mutex_lock(&app->mutex); 4690 4691 if (app->processes == 0 && app->pending_processes == 0 4692 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4693 { 4694 link = nxt_queue_first(&app->ack_waiting_req); 4695 4696 nxt_queue_remove(link); 4697 link->next = NULL; 4698 } 4699 4700 nxt_thread_mutex_unlock(&app->mutex); 4701 } 4702 } 4703 4704 4705 nxt_inline nxt_port_t * 4706 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) 4707 { 4708 nxt_port_t *port; 4709 4710 port = NULL; 4711 4712 nxt_thread_mutex_lock(&app->mutex); 4713 4714 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 4715 4716 /* Caller is responsible to decrease port use count. */ 4717 nxt_queue_chk_remove(&port->app_link); 4718 4719 if (nxt_queue_chk_remove(&port->idle_link)) { 4720 app->idle_processes--; 4721 4722 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", 4723 &app->name, port->pid, port->id, 4724 (port->idle_start ? "idle_ports" : "spare_ports")); 4725 } 4726 4727 nxt_port_hash_remove(&app->port_hash, port); 4728 app->port_hash_count--; 4729 4730 port->app = NULL; 4731 app->processes--; 4732 4733 break; 4734 4735 } nxt_queue_loop; 4736 4737 nxt_thread_mutex_unlock(&app->mutex); 4738 4739 return port; 4740 } 4741 4742 4743 static void 4744 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4745 { 4746 int c; 4747 4748 c = nxt_atomic_fetch_add(&app->use_count, i); 4749 4750 if (i < 0 && c == -i) { 4751 4752 if (task->thread->engine != app->engine) { 4753 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 4754 4755 } else { 4756 nxt_router_free_app(task, app->joint, NULL); 4757 } 4758 } 4759 } 4760 4761 4762 static void 4763 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 4764 { 4765 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 4766 4767 nxt_queue_remove(&app->link); 4768 4769 nxt_router_app_use(task, app, -1); 4770 } 4771 4772 4773 static void 4774 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, 4775 nxt_apr_action_t action) 4776 { 4777 int inc_use; 4778 uint32_t got_response, dec_requests; 4779 nxt_bool_t adjust_idle_timer; 4780 nxt_port_t *main_app_port; 4781 4782 nxt_assert(port != NULL); 4783 4784 inc_use = 0; 4785 got_response = 0; 4786 dec_requests = 0; 4787 4788 switch (action) { 4789 case NXT_APR_NEW_PORT: 4790 break; 4791 case NXT_APR_REQUEST_FAILED: 4792 dec_requests = 1; 4793 inc_use = -1; 4794 break; 4795 case NXT_APR_GOT_RESPONSE: 4796 got_response = 1; 4797 inc_use = -1; 4798 break; 4799 case NXT_APR_UPGRADE: 4800 got_response = 1; 4801 break; 4802 case NXT_APR_CLOSE: 4803 inc_use = -1; 4804 break; 4805 } 4806 4807 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4808 port->pid, port->id, 4809 (int) inc_use, (int) got_response); 4810 4811 if (port->id == NXT_SHARED_PORT_ID) { 4812 nxt_thread_mutex_lock(&app->mutex); 4813 4814 app->active_requests -= got_response + dec_requests; 4815 4816 nxt_thread_mutex_unlock(&app->mutex); 4817 4818 goto adjust_use; 4819 } 4820 4821 main_app_port = port->main_app_port; 4822 4823 nxt_thread_mutex_lock(&app->mutex); 4824 4825 main_app_port->active_requests -= got_response + dec_requests; 4826 app->active_requests -= got_response + dec_requests; 4827 4828 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { 4829 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4830 4831 nxt_port_inc_use(main_app_port); 4832 } 4833 4834 adjust_idle_timer = 0; 4835 4836 if (main_app_port->pair[1] != -1 4837 && main_app_port->active_requests == 0 4838 && main_app_port->active_websockets == 0 4839 && main_app_port->idle_link.next == NULL) 4840 { 4841 if (app->idle_processes == app->spare_processes 4842 && app->adjust_idle_work.data == NULL) 4843 { 4844 adjust_idle_timer = 1; 4845 app->adjust_idle_work.data = app; 4846 app->adjust_idle_work.next = NULL; 4847 } 4848 4849 if (app->idle_processes < app->spare_processes) { 4850 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4851 4852 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4853 &app->name, main_app_port->pid, main_app_port->id); 4854 } else { 4855 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4856 4857 main_app_port->idle_start = task->thread->engine->timers.now; 4858 4859 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4860 &app->name, main_app_port->pid, main_app_port->id); 4861 } 4862 4863 app->idle_processes++; 4864 } 4865 4866 nxt_thread_mutex_unlock(&app->mutex); 4867 4868 if (adjust_idle_timer) { 4869 nxt_router_app_use(task, app, 1); 4870 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4871 } 4872 4873 /* ? */ 4874 if (main_app_port->pair[1] == -1) { 4875 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4876 &app->name, app, main_app_port, main_app_port->pid); 4877 4878 goto adjust_use; 4879 } 4880 4881 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4882 &app->name, app); 4883 4884 adjust_use: 4885 4886 nxt_port_use(task, port, inc_use); 4887 } 4888 4889 4890 void 4891 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4892 { 4893 nxt_app_t *app; 4894 nxt_bool_t unchain, start_process; 4895 nxt_port_t *idle_port; 4896 nxt_queue_link_t *idle_lnk; 4897 4898 app = port->app; 4899 4900 nxt_assert(app != NULL); 4901 4902 nxt_thread_mutex_lock(&app->mutex); 4903 4904 if (port == app->proto_port) { 4905 app->proto_port = NULL; 4906 port->app = NULL; 4907 4908 nxt_thread_mutex_unlock(&app->mutex); 4909 4910 nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name, 4911 port->pid); 4912 4913 nxt_port_use(task, port, -1); 4914 4915 return; 4916 } 4917 4918 nxt_port_hash_remove(&app->port_hash, port); 4919 app->port_hash_count--; 4920 4921 if (port->id != 0) { 4922 nxt_thread_mutex_unlock(&app->mutex); 4923 4924 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, 4925 port->pid, port->id); 4926 4927 return; 4928 } 4929 4930 unchain = nxt_queue_chk_remove(&port->app_link); 4931 4932 if (nxt_queue_chk_remove(&port->idle_link)) { 4933 app->idle_processes--; 4934 4935 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", 4936 &app->name, port->pid, port->id, 4937 (port->idle_start ? "idle_ports" : "spare_ports")); 4938 4939 if (port->idle_start == 0 4940 && app->idle_processes >= app->spare_processes) 4941 { 4942 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4943 4944 idle_lnk = nxt_queue_last(&app->idle_ports); 4945 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4946 nxt_queue_remove(idle_lnk); 4947 4948 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4949 4950 idle_port->idle_start = 0; 4951 4952 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4953 "to spare_ports", 4954 &app->name, idle_port->pid, idle_port->id); 4955 } 4956 } 4957 4958 app->processes--; 4959 4960 start_process = !task->thread->engine->shutdown 4961 && nxt_router_app_can_start(app) 4962 && nxt_router_app_need_start(app); 4963 4964 if (start_process) { 4965 app->pending_processes++; 4966 } 4967 4968 nxt_thread_mutex_unlock(&app->mutex); 4969 4970 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4971 4972 if (unchain) { 4973 nxt_port_use(task, port, -1); 4974 } 4975 4976 if (start_process) { 4977 nxt_router_start_app_process(task, app); 4978 } 4979 } 4980 4981 4982 static void 4983 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4984 { 4985 nxt_app_t *app; 4986 nxt_bool_t queued; 4987 nxt_port_t *port; 4988 nxt_msec_t timeout, threshold; 4989 nxt_queue_link_t *lnk; 4990 nxt_event_engine_t *engine; 4991 4992 app = obj; 4993 queued = (data == app); 4994 4995 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4996 &app->name, queued); 4997 4998 engine = task->thread->engine; 4999 5000 nxt_assert(app->engine == engine); 5001 5002 threshold = engine->timers.now + app->joint->idle_timer.bias; 5003 timeout = 0; 5004 5005 nxt_thread_mutex_lock(&app->mutex); 5006 5007 if (queued) { 5008 app->adjust_idle_work.data = NULL; 5009 } 5010 5011 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 5012 &app->name, 5013 (int) app->idle_processes, (int) app->spare_processes); 5014 5015 while (app->idle_processes > app->spare_processes) { 5016 5017 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 5018 5019 lnk = nxt_queue_first(&app->idle_ports); 5020 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 5021 5022 timeout = port->idle_start + app->idle_timeout; 5023 5024 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 5025 &app->name, port->pid, 5026 port->idle_start, timeout, threshold); 5027 5028 if (timeout > threshold) { 5029 break; 5030 } 5031 5032 nxt_queue_remove(lnk); 5033 lnk->next = NULL; 5034 5035 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 5036 &app->name, port->pid, port->id); 5037 5038 nxt_queue_chk_remove(&port->app_link); 5039 5040 nxt_port_hash_remove(&app->port_hash, port); 5041 app->port_hash_count--; 5042 5043 app->idle_processes--; 5044 app->processes--; 5045 port->app = NULL; 5046 5047 nxt_thread_mutex_unlock(&app->mutex); 5048 5049 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 5050 &app->name, port->pid); 5051 5052 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 5053 5054 nxt_port_use(task, port, -1); 5055 5056 nxt_thread_mutex_lock(&app->mutex); 5057 } 5058 5059 nxt_thread_mutex_unlock(&app->mutex); 5060 5061 if (timeout > threshold) { 5062 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 5063 5064 } else { 5065 nxt_timer_disable(engine, &app->joint->idle_timer); 5066 } 5067 5068 if (queued) { 5069 nxt_router_app_use(task, app, -1); 5070 } 5071 } 5072 5073 5074 static void 5075 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 5076 { 5077 nxt_timer_t *timer; 5078 nxt_app_joint_t *app_joint; 5079 5080 timer = obj; 5081 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 5082 5083 if (nxt_fast_path(app_joint->app != NULL)) { 5084 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 5085 } 5086 } 5087 5088 5089 static void 5090 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) 5091 { 5092 nxt_timer_t *timer; 5093 nxt_app_joint_t *app_joint; 5094 5095 timer = obj; 5096 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 5097 5098 nxt_router_app_joint_use(task, app_joint, -1); 5099 } 5100 5101 5102 static void 5103 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 5104 { 5105 nxt_app_t *app; 5106 nxt_port_t *port, *proto_port; 5107 nxt_app_joint_t *app_joint; 5108 5109 app_joint = obj; 5110 app = app_joint->app; 5111 5112 for ( ;; ) { 5113 port = nxt_router_app_get_port_for_quit(task, app); 5114 if (port == NULL) { 5115 break; 5116 } 5117 5118 nxt_port_use(task, port, -1); 5119 } 5120 5121 nxt_thread_mutex_lock(&app->mutex); 5122 5123 for ( ;; ) { 5124 port = nxt_port_hash_retrieve(&app->port_hash); 5125 if (port == NULL) { 5126 break; 5127 } 5128 5129 app->port_hash_count--; 5130 5131 port->app = NULL; 5132 5133 nxt_port_close(task, port); 5134 5135 nxt_port_use(task, port, -1); 5136 } 5137 5138 proto_port = app->proto_port; 5139 5140 if (proto_port != NULL) { 5141 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 5142 proto_port->pid); 5143 5144 app->proto_port = NULL; 5145 proto_port->app = NULL; 5146 } 5147 5148 nxt_thread_mutex_unlock(&app->mutex); 5149 5150 if (proto_port != NULL) { 5151 nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 5152 -1, 0, 0, NULL); 5153 5154 nxt_port_close(task, proto_port); 5155 5156 nxt_port_use(task, proto_port, -1); 5157 } 5158 5159 nxt_assert(app->proto_port == NULL); 5160 nxt_assert(app->processes == 0); 5161 nxt_assert(app->active_requests == 0); 5162 nxt_assert(app->port_hash_count == 0); 5163 nxt_assert(app->idle_processes == 0); 5164 nxt_assert(nxt_queue_is_empty(&app->ports)); 5165 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 5166 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 5167 5168 nxt_port_mmaps_destroy(&app->outgoing, 1); 5169 5170 nxt_thread_mutex_destroy(&app->outgoing.mutex); 5171 5172 if (app->shared_port != NULL) { 5173 app->shared_port->app = NULL; 5174 nxt_port_close(task, app->shared_port); 5175 nxt_port_use(task, app->shared_port, -1); 5176 5177 app->shared_port = NULL; 5178 } 5179 5180 nxt_thread_mutex_destroy(&app->mutex); 5181 nxt_mp_destroy(app->mem_pool); 5182 5183 app_joint->app = NULL; 5184 5185 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 5186 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 5187 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 5188 5189 } else { 5190 nxt_router_app_joint_use(task, app_joint, -1); 5191 } 5192 } 5193 5194 5195 static void 5196 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 5197 nxt_request_rpc_data_t *req_rpc_data) 5198 { 5199 nxt_bool_t start_process; 5200 nxt_port_t *port; 5201 nxt_http_request_t *r; 5202 5203 start_process = 0; 5204 5205 nxt_thread_mutex_lock(&app->mutex); 5206 5207 port = app->shared_port; 5208 nxt_port_inc_use(port); 5209 5210 app->active_requests++; 5211 5212 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 5213 app->pending_processes++; 5214 start_process = 1; 5215 } 5216 5217 r = req_rpc_data->request; 5218 5219 /* 5220 * Put request into application-wide list to be able to cancel request 5221 * if something goes wrong with application processes. 5222 */ 5223 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 5224 5225 nxt_thread_mutex_unlock(&app->mutex); 5226 5227 /* 5228 * Retain request memory pool while request is linked in ack_waiting_req 5229 * to guarantee request structure memory is accessble. 5230 */ 5231 nxt_mp_retain(r->mem_pool); 5232 5233 req_rpc_data->app_port = port; 5234 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 5235 5236 if (start_process) { 5237 nxt_router_start_app_process(task, app); 5238 } 5239 } 5240 5241 5242 void 5243 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 5244 nxt_http_action_t *action) 5245 { 5246 nxt_event_engine_t *engine; 5247 nxt_http_app_conf_t *conf; 5248 nxt_request_rpc_data_t *req_rpc_data; 5249 5250 conf = action->u.conf; 5251 engine = task->thread->engine; 5252 5253 r->app_target = conf->target; 5254 5255 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 5256 nxt_router_response_ready_handler, 5257 nxt_router_response_error_handler, 5258 sizeof(nxt_request_rpc_data_t)); 5259 if (nxt_slow_path(req_rpc_data == NULL)) { 5260 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 5261 return; 5262 } 5263 5264 /* 5265 * At this point we have request req_rpc_data allocated and registered 5266 * in port handlers. Need to fixup request memory pool. Counterpart 5267 * release will be called via following call chain: 5268 * nxt_request_rpc_data_unlink() -> 5269 * nxt_router_http_request_release_post() -> 5270 * nxt_router_http_request_release() 5271 */ 5272 nxt_mp_retain(r->mem_pool); 5273 5274 r->timer.task = &engine->task; 5275 r->timer.work_queue = &engine->fast_work_queue; 5276 r->timer.log = engine->task.log; 5277 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 5278 5279 r->engine = engine; 5280 r->err_work.handler = nxt_router_http_request_error; 5281 r->err_work.task = task; 5282 r->err_work.obj = r; 5283 5284 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 5285 req_rpc_data->app = conf->app; 5286 req_rpc_data->msg_info.body_fd = -1; 5287 req_rpc_data->rpc_cancel = 1; 5288 5289 nxt_router_app_use(task, conf->app, 1); 5290 5291 req_rpc_data->request = r; 5292 r->req_rpc_data = req_rpc_data; 5293 5294 if (r->last != NULL) { 5295 r->last->completion_handler = nxt_router_http_request_done; 5296 } 5297 5298 nxt_router_app_port_get(task, conf->app, req_rpc_data); 5299 nxt_router_app_prepare_request(task, req_rpc_data); 5300 } 5301 5302 5303 static void 5304 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 5305 { 5306 nxt_http_request_t *r; 5307 5308 r = obj; 5309 5310 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 5311 5312 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5313 5314 if (r->req_rpc_data != NULL) { 5315 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5316 } 5317 5318 nxt_mp_release(r->mem_pool); 5319 } 5320 5321 5322 static void 5323 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 5324 { 5325 nxt_http_request_t *r; 5326 5327 r = data; 5328 5329 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 5330 5331 if (r->req_rpc_data != NULL) { 5332 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5333 } 5334 5335 nxt_http_request_close_handler(task, r, r->proto.any); 5336 } 5337 5338 5339 static void 5340 nxt_router_app_prepare_request(nxt_task_t *task, 5341 nxt_request_rpc_data_t *req_rpc_data) 5342 { 5343 nxt_app_t *app; 5344 nxt_buf_t *buf, *body; 5345 nxt_int_t res; 5346 nxt_port_t *port, *reply_port; 5347 5348 int notify; 5349 struct { 5350 nxt_port_msg_t pm; 5351 nxt_port_mmap_msg_t mm; 5352 } msg; 5353 5354 5355 app = req_rpc_data->app; 5356 5357 nxt_assert(app != NULL); 5358 5359 port = req_rpc_data->app_port; 5360 5361 nxt_assert(port != NULL); 5362 nxt_assert(port->queue != NULL); 5363 5364 reply_port = task->thread->engine->port; 5365 5366 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5367 nxt_app_msg_prefix[app->type]); 5368 if (nxt_slow_path(buf == NULL)) { 5369 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5370 req_rpc_data->stream, &app->name); 5371 5372 nxt_http_request_error(task, req_rpc_data->request, 5373 NXT_HTTP_INTERNAL_SERVER_ERROR); 5374 5375 return; 5376 } 5377 5378 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5379 nxt_buf_used_size(buf), 5380 port->socket.fd); 5381 5382 req_rpc_data->msg_info.buf = buf; 5383 5384 body = req_rpc_data->request->body; 5385 5386 if (body != NULL && nxt_buf_is_file(body)) { 5387 req_rpc_data->msg_info.body_fd = body->file->fd; 5388 5389 body->file->fd = -1; 5390 5391 } else { 5392 req_rpc_data->msg_info.body_fd = -1; 5393 } 5394 5395 msg.pm.stream = req_rpc_data->stream; 5396 msg.pm.pid = reply_port->pid; 5397 msg.pm.reply_port = reply_port->id; 5398 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5399 msg.pm.last = 0; 5400 msg.pm.mmap = 1; 5401 msg.pm.nf = 0; 5402 msg.pm.mf = 0; 5403 msg.pm.tracking = 0; 5404 5405 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5406 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5407 5408 msg.mm.mmap_id = hdr->id; 5409 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5410 msg.mm.size = nxt_buf_used_size(buf); 5411 5412 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5413 req_rpc_data->stream, ¬ify, 5414 &req_rpc_data->msg_info.tracking_cookie); 5415 if (nxt_fast_path(res == NXT_OK)) { 5416 if (notify != 0) { 5417 (void) nxt_port_socket_write(task, port, 5418 NXT_PORT_MSG_READ_QUEUE, 5419 -1, req_rpc_data->stream, 5420 reply_port->id, NULL); 5421 5422 } else { 5423 nxt_debug(task, "queue is not empty"); 5424 } 5425 5426 buf->is_port_mmap_sent = 1; 5427 buf->mem.pos = buf->mem.free; 5428 5429 } else { 5430 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5431 req_rpc_data->stream, &app->name); 5432 5433 nxt_http_request_error(task, req_rpc_data->request, 5434 NXT_HTTP_INTERNAL_SERVER_ERROR); 5435 } 5436 } 5437 5438 5439 struct nxt_fields_iter_s { 5440 nxt_list_part_t *part; 5441 nxt_http_field_t *field; 5442 }; 5443 5444 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5445 5446 5447 static nxt_http_field_t * 5448 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5449 { 5450 if (part == NULL) { 5451 return NULL; 5452 } 5453 5454 while (part->nelts == 0) { 5455 part = part->next; 5456 if (part == NULL) { 5457 return NULL; 5458 } 5459 } 5460 5461 i->part = part; 5462 i->field = nxt_list_data(i->part); 5463 5464 return i->field; 5465 } 5466 5467 5468 static nxt_http_field_t * 5469 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5470 { 5471 return nxt_fields_part_first(nxt_list_part(fields), i); 5472 } 5473 5474 5475 static nxt_http_field_t * 5476 nxt_fields_next(nxt_fields_iter_t *i) 5477 { 5478 nxt_http_field_t *end = nxt_list_data(i->part); 5479 5480 end += i->part->nelts; 5481 i->field++; 5482 5483 if (i->field < end) { 5484 return i->field; 5485 } 5486 5487 return nxt_fields_part_first(i->part->next, i); 5488 } 5489 5490 5491 static nxt_buf_t * 5492 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5493 nxt_app_t *app, const nxt_str_t *prefix) 5494 { 5495 void *target_pos, *query_pos; 5496 u_char *pos, *end, *p, c; 5497 size_t fields_count, req_size, size, free_size; 5498 size_t copy_size; 5499 nxt_off_t content_length; 5500 nxt_buf_t *b, *buf, *out, **tail; 5501 nxt_http_field_t *field, *dup; 5502 nxt_unit_field_t *dst_field; 5503 nxt_fields_iter_t iter, dup_iter; 5504 nxt_unit_request_t *req; 5505 5506 req_size = sizeof(nxt_unit_request_t) 5507 + r->method->length + 1 5508 + r->version.length + 1 5509 + r->remote->length + 1 5510 + r->local->length + 1 5511 + r->server_name.length + 1 5512 + r->target.length + 1 5513 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5514 5515 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5516 fields_count = 0; 5517 5518 nxt_list_each(field, r->fields) { 5519 fields_count++; 5520 5521 req_size += field->name_length + prefix->length + 1 5522 + field->value_length + 1; 5523 } nxt_list_loop; 5524 5525 req_size += fields_count * sizeof(nxt_unit_field_t); 5526 5527 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5528 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5529 (int) req_size); 5530 5531 return NULL; 5532 } 5533 5534 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5535 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5536 if (nxt_slow_path(out == NULL)) { 5537 return NULL; 5538 } 5539 5540 req = (nxt_unit_request_t *) out->mem.free; 5541 out->mem.free += req_size; 5542 5543 req->app_target = r->app_target; 5544 5545 req->content_length = content_length; 5546 5547 p = (u_char *) (req->fields + fields_count); 5548 5549 nxt_debug(task, "fields_count=%d", (int) fields_count); 5550 5551 req->method_length = r->method->length; 5552 nxt_unit_sptr_set(&req->method, p); 5553 p = nxt_cpymem(p, r->method->start, r->method->length); 5554 *p++ = '\0'; 5555 5556 req->version_length = r->version.length; 5557 nxt_unit_sptr_set(&req->version, p); 5558 p = nxt_cpymem(p, r->version.start, r->version.length); 5559 *p++ = '\0'; 5560 5561 req->remote_length = r->remote->address_length; 5562 nxt_unit_sptr_set(&req->remote, p); 5563 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5564 r->remote->address_length); 5565 *p++ = '\0'; 5566 5567 req->local_length = r->local->address_length; 5568 nxt_unit_sptr_set(&req->local, p); 5569 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5570 *p++ = '\0'; 5571 5572 req->tls = (r->tls != NULL); 5573 req->websocket_handshake = r->websocket_handshake; 5574 5575 req->server_name_length = r->server_name.length; 5576 nxt_unit_sptr_set(&req->server_name, p); 5577 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5578 *p++ = '\0'; 5579 5580 target_pos = p; 5581 req->target_length = (uint32_t) r->target.length; 5582 nxt_unit_sptr_set(&req->target, p); 5583 p = nxt_cpymem(p, r->target.start, r->target.length); 5584 *p++ = '\0'; 5585 5586 req->path_length = (uint32_t) r->path->length; 5587 if (r->path->start == r->target.start) { 5588 nxt_unit_sptr_set(&req->path, target_pos); 5589 5590 } else { 5591 nxt_unit_sptr_set(&req->path, p); 5592 p = nxt_cpymem(p, r->path->start, r->path->length); 5593 *p++ = '\0'; 5594 } 5595 5596 req->query_length = (uint32_t) r->args->length; 5597 if (r->args->start != NULL) { 5598 query_pos = nxt_pointer_to(target_pos, 5599 r->args->start - r->target.start); 5600 5601 nxt_unit_sptr_set(&req->query, query_pos); 5602 5603 } else { 5604 req->query.offset = 0; 5605 } 5606 5607 req->content_length_field = NXT_UNIT_NONE_FIELD; 5608 req->content_type_field = NXT_UNIT_NONE_FIELD; 5609 req->cookie_field = NXT_UNIT_NONE_FIELD; 5610 req->authorization_field = NXT_UNIT_NONE_FIELD; 5611 5612 dst_field = req->fields; 5613 5614 for (field = nxt_fields_first(r->fields, &iter); 5615 field != NULL; 5616 field = nxt_fields_next(&iter)) 5617 { 5618 if (field->skip) { 5619 continue; 5620 } 5621 5622 dst_field->hash = field->hash; 5623 dst_field->skip = 0; 5624 dst_field->name_length = field->name_length + prefix->length; 5625 dst_field->value_length = field->value_length; 5626 5627 if (field == r->content_length) { 5628 req->content_length_field = dst_field - req->fields; 5629 5630 } else if (field == r->content_type) { 5631 req->content_type_field = dst_field - req->fields; 5632 5633 } else if (field == r->cookie) { 5634 req->cookie_field = dst_field - req->fields; 5635 5636 } else if (field == r->authorization) { 5637 req->authorization_field = dst_field - req->fields; 5638 } 5639 5640 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5641 (int) field->hash, (int) field->skip, 5642 (int) field->name_length, field->name, 5643 (int) field->value_length, field->value); 5644 5645 if (prefix->length != 0) { 5646 nxt_unit_sptr_set(&dst_field->name, p); 5647 p = nxt_cpymem(p, prefix->start, prefix->length); 5648 5649 end = field->name + field->name_length; 5650 for (pos = field->name; pos < end; pos++) { 5651 c = *pos; 5652 5653 if (c >= 'a' && c <= 'z') { 5654 *p++ = (c & ~0x20); 5655 continue; 5656 } 5657 5658 if (c == '-') { 5659 *p++ = '_'; 5660 continue; 5661 } 5662 5663 *p++ = c; 5664 } 5665 5666 } else { 5667 nxt_unit_sptr_set(&dst_field->name, p); 5668 p = nxt_cpymem(p, field->name, field->name_length); 5669 } 5670 5671 *p++ = '\0'; 5672 5673 nxt_unit_sptr_set(&dst_field->value, p); 5674 p = nxt_cpymem(p, field->value, field->value_length); 5675 5676 if (prefix->length != 0) { 5677 dup_iter = iter; 5678 5679 for (dup = nxt_fields_next(&dup_iter); 5680 dup != NULL; 5681 dup = nxt_fields_next(&dup_iter)) 5682 { 5683 if (dup->name_length != field->name_length 5684 || dup->skip 5685 || dup->hash != field->hash 5686 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5687 { 5688 continue; 5689 } 5690 5691 p = nxt_cpymem(p, ", ", 2); 5692 p = nxt_cpymem(p, dup->value, dup->value_length); 5693 5694 dst_field->value_length += 2 + dup->value_length; 5695 5696 dup->skip = 1; 5697 } 5698 } 5699 5700 *p++ = '\0'; 5701 5702 dst_field++; 5703 } 5704 5705 req->fields_count = (uint32_t) (dst_field - req->fields); 5706 5707 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5708 5709 buf = out; 5710 tail = &buf->next; 5711 5712 for (b = r->body; b != NULL; b = b->next) { 5713 size = nxt_buf_mem_used_size(&b->mem); 5714 pos = b->mem.pos; 5715 5716 while (size > 0) { 5717 if (buf == NULL) { 5718 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5719 5720 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5721 if (nxt_slow_path(buf == NULL)) { 5722 while (out != NULL) { 5723 buf = out->next; 5724 out->next = NULL; 5725 out->completion_handler(task, out, out->parent); 5726 out = buf; 5727 } 5728 return NULL; 5729 } 5730 5731 *tail = buf; 5732 tail = &buf->next; 5733 5734 } else { 5735 free_size = nxt_buf_mem_free_size(&buf->mem); 5736 if (free_size < size 5737 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5738 == NXT_OK) 5739 { 5740 free_size = nxt_buf_mem_free_size(&buf->mem); 5741 } 5742 } 5743 5744 if (free_size > 0) { 5745 copy_size = nxt_min(free_size, size); 5746 5747 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5748 5749 size -= copy_size; 5750 pos += copy_size; 5751 5752 if (size == 0) { 5753 break; 5754 } 5755 } 5756 5757 buf = NULL; 5758 } 5759 } 5760 5761 return out; 5762 } 5763 5764 5765 static void 5766 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5767 { 5768 nxt_timer_t *timer; 5769 nxt_http_request_t *r; 5770 nxt_request_rpc_data_t *req_rpc_data; 5771 5772 timer = obj; 5773 5774 nxt_debug(task, "router app timeout"); 5775 5776 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5777 req_rpc_data = r->timer_data; 5778 5779 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5780 5781 nxt_request_rpc_data_unlink(task, req_rpc_data); 5782 } 5783 5784 5785 static void 5786 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5787 { 5788 r->timer.handler = nxt_router_http_request_release; 5789 nxt_timer_add(task->thread->engine, &r->timer, 0); 5790 } 5791 5792 5793 static void 5794 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5795 { 5796 nxt_http_request_t *r; 5797 5798 nxt_debug(task, "http request pool release"); 5799 5800 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5801 5802 nxt_mp_release(r->mem_pool); 5803 } 5804 5805 5806 static void 5807 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5808 { 5809 size_t mi; 5810 uint32_t i; 5811 nxt_bool_t ack; 5812 nxt_process_t *process; 5813 nxt_free_map_t *m; 5814 nxt_port_mmap_handler_t *mmap_handler; 5815 5816 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5817 5818 process = nxt_runtime_process_find(task->thread->runtime, 5819 msg->port_msg.pid); 5820 if (nxt_slow_path(process == NULL)) { 5821 return; 5822 } 5823 5824 ack = 0; 5825 5826 /* 5827 * To mitigate possible racing condition (when OOSM message received 5828 * after some of the memory was already freed), need to try to find 5829 * first free segment in shared memory and send ACK if found. 5830 */ 5831 5832 nxt_thread_mutex_lock(&process->incoming.mutex); 5833 5834 for (i = 0; i < process->incoming.size; i++) { 5835 mmap_handler = process->incoming.elts[i].mmap_handler; 5836 5837 if (nxt_slow_path(mmap_handler == NULL)) { 5838 continue; 5839 } 5840 5841 m = mmap_handler->hdr->free_map; 5842 5843 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5844 if (m[mi] != 0) { 5845 ack = 1; 5846 5847 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5848 i, mi, m[mi]); 5849 5850 break; 5851 } 5852 } 5853 } 5854 5855 nxt_thread_mutex_unlock(&process->incoming.mutex); 5856 5857 if (ack) { 5858 nxt_process_broadcast_shm_ack(task, process); 5859 } 5860 } 5861 5862 5863 static void 5864 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5865 { 5866 nxt_fd_t fd; 5867 nxt_port_t *port; 5868 nxt_runtime_t *rt; 5869 nxt_port_mmaps_t *mmaps; 5870 nxt_port_msg_get_mmap_t *get_mmap_msg; 5871 nxt_port_mmap_handler_t *mmap_handler; 5872 5873 rt = task->thread->runtime; 5874 5875 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5876 msg->port_msg.reply_port); 5877 if (nxt_slow_path(port == NULL)) { 5878 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5879 msg->port_msg.pid, msg->port_msg.reply_port); 5880 5881 return; 5882 } 5883 5884 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5885 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5886 { 5887 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5888 (int) nxt_buf_used_size(msg->buf)); 5889 5890 return; 5891 } 5892 5893 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5894 5895 nxt_assert(port->type == NXT_PROCESS_APP); 5896 5897 if (nxt_slow_path(port->app == NULL)) { 5898 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5899 port->pid, port->id); 5900 5901 // FIXME 5902 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5903 -1, msg->port_msg.stream, 0, NULL); 5904 5905 return; 5906 } 5907 5908 mmaps = &port->app->outgoing; 5909 nxt_thread_mutex_lock(&mmaps->mutex); 5910 5911 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5912 nxt_thread_mutex_unlock(&mmaps->mutex); 5913 5914 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5915 (int) get_mmap_msg->id); 5916 5917 // FIXME 5918 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5919 -1, msg->port_msg.stream, 0, NULL); 5920 return; 5921 } 5922 5923 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5924 5925 fd = mmap_handler->fd; 5926 5927 nxt_thread_mutex_unlock(&mmaps->mutex); 5928 5929 nxt_debug(task, "get mmap %PI:%d found", 5930 msg->port_msg.pid, (int) get_mmap_msg->id); 5931 5932 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5933 } 5934 5935 5936 static void 5937 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5938 { 5939 nxt_port_t *port, *reply_port; 5940 nxt_runtime_t *rt; 5941 nxt_port_msg_get_port_t *get_port_msg; 5942 5943 rt = task->thread->runtime; 5944 5945 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5946 msg->port_msg.reply_port); 5947 if (nxt_slow_path(reply_port == NULL)) { 5948 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5949 msg->port_msg.pid, msg->port_msg.reply_port); 5950 5951 return; 5952 } 5953 5954 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5955 < (int) sizeof(nxt_port_msg_get_port_t))) 5956 { 5957 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5958 (int) nxt_buf_used_size(msg->buf)); 5959 5960 return; 5961 } 5962 5963 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5964 5965 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5966 if (nxt_slow_path(port == NULL)) { 5967 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5968 get_port_msg->pid, get_port_msg->id); 5969 5970 return; 5971 } 5972 5973 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5974 get_port_msg->id); 5975 5976 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5977 } 5978