1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #include <nxt_status.h> 11 #if (NXT_TLS) 12 #include <nxt_cert.h> 13 #endif 14 #include <nxt_http.h> 15 #include <nxt_port_memory_int.h> 16 #include <nxt_unit_request.h> 17 #include <nxt_unit_response.h> 18 #include <nxt_router_request.h> 19 #include <nxt_app_queue.h> 20 #include <nxt_port_queue.h> 21 22 #define NXT_SHARED_PORT_ID 0xFFFFu 23 24 typedef struct { 25 nxt_str_t type; 26 uint32_t processes; 27 uint32_t max_processes; 28 uint32_t spare_processes; 29 nxt_msec_t timeout; 30 nxt_msec_t idle_timeout; 31 nxt_conf_value_t *limits_value; 32 nxt_conf_value_t *processes_value; 33 nxt_conf_value_t *targets_value; 34 } nxt_router_app_conf_t; 35 36 37 typedef struct { 38 nxt_str_t pass; 39 nxt_str_t application; 40 } nxt_router_listener_conf_t; 41 42 43 #if (NXT_TLS) 44 45 typedef struct { 46 nxt_str_t name; 47 nxt_socket_conf_t *socket_conf; 48 nxt_router_temp_conf_t *temp_conf; 49 nxt_tls_init_t *tls_init; 50 nxt_bool_t last; 51 52 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 53 } nxt_router_tlssock_t; 54 55 #endif 56 57 58 typedef struct { 59 nxt_str_t *name; 60 nxt_socket_conf_t *socket_conf; 61 nxt_router_temp_conf_t *temp_conf; 62 nxt_bool_t last; 63 } nxt_socket_rpc_t; 64 65 66 typedef struct { 67 nxt_app_t *app; 68 nxt_router_temp_conf_t *temp_conf; 69 uint8_t proto; /* 1 bit */ 70 } nxt_app_rpc_t; 71 72 73 typedef struct { 74 nxt_app_joint_t *app_joint; 75 uint32_t generation; 76 uint8_t proto; /* 1 bit */ 77 } nxt_app_joint_rpc_t; 78 79 80 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 81 nxt_mp_t *mp); 82 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 83 static void nxt_router_greet_controller(nxt_task_t *task, 84 nxt_port_t *controller_port); 85 86 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 87 88 static void nxt_router_new_port_handler(nxt_task_t *task, 89 nxt_port_recv_msg_t *msg); 90 static void nxt_router_conf_data_handler(nxt_task_t *task, 91 nxt_port_recv_msg_t *msg); 92 static void nxt_router_app_restart_handler(nxt_task_t *task, 93 nxt_port_recv_msg_t *msg); 94 static void nxt_router_status_handler(nxt_task_t *task, 95 nxt_port_recv_msg_t *msg); 96 static void nxt_router_remove_pid_handler(nxt_task_t *task, 97 nxt_port_recv_msg_t *msg); 98 99 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 100 static void nxt_router_conf_ready(nxt_task_t *task, 101 nxt_router_temp_conf_t *tmcf); 102 static void nxt_router_conf_send(nxt_task_t *task, 103 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 104 105 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 106 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 107 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 108 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 109 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task, 110 nxt_mp_t *mp, nxt_conf_value_t *conf); 111 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp, 112 nxt_conf_value_t *conf, nxt_http_forward_header_t *fh); 113 114 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 115 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 116 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 117 nxt_app_t *app); 118 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 119 nxt_str_t *name); 120 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 121 int i); 122 123 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 124 nxt_port_t *port); 125 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 126 nxt_port_t *port); 127 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 128 nxt_port_t *port, nxt_fd_t fd); 129 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 130 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 131 static void nxt_router_listen_socket_ready(nxt_task_t *task, 132 nxt_port_recv_msg_t *msg, void *data); 133 static void nxt_router_listen_socket_error(nxt_task_t *task, 134 nxt_port_recv_msg_t *msg, void *data); 135 #if (NXT_TLS) 136 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 137 nxt_port_recv_msg_t *msg, void *data); 138 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 139 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 140 nxt_bool_t last); 141 #endif 142 static void nxt_router_app_rpc_create(nxt_task_t *task, 143 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 144 static void nxt_router_app_prefork_ready(nxt_task_t *task, 145 nxt_port_recv_msg_t *msg, void *data); 146 static void nxt_router_app_prefork_error(nxt_task_t *task, 147 nxt_port_recv_msg_t *msg, void *data); 148 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 149 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 150 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 151 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 152 153 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 154 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 155 const nxt_event_interface_t *interface); 156 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 157 nxt_router_engine_conf_t *recf); 158 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf); 160 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 161 nxt_router_engine_conf_t *recf); 162 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 163 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 164 nxt_work_handler_t handler); 165 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 166 nxt_router_engine_conf_t *recf); 167 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 168 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 169 170 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 171 nxt_router_temp_conf_t *tmcf); 172 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 173 nxt_event_engine_t *engine); 174 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 175 nxt_router_temp_conf_t *tmcf); 176 177 static void nxt_router_engines_post(nxt_router_t *router, 178 nxt_router_temp_conf_t *tmcf); 179 static void nxt_router_engine_post(nxt_event_engine_t *engine, 180 nxt_work_t *jobs); 181 182 static void nxt_router_thread_start(void *data); 183 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 184 void *data); 185 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 186 void *data); 187 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 188 void *data); 189 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 190 void *data); 191 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 192 void *data); 193 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 194 void *data); 195 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 196 void *data); 197 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 198 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 199 static void nxt_router_listen_socket_release(nxt_task_t *task, 200 nxt_socket_conf_t *skcf); 201 202 static void nxt_router_app_port_ready(nxt_task_t *task, 203 nxt_port_recv_msg_t *msg, void *data); 204 static void nxt_router_app_port_error(nxt_task_t *task, 205 nxt_port_recv_msg_t *msg, void *data); 206 207 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 208 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 209 210 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, 211 nxt_port_t *port, nxt_apr_action_t action); 212 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 213 nxt_request_rpc_data_t *req_rpc_data); 214 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 215 void *data); 216 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 217 void *data); 218 219 static void nxt_router_app_prepare_request(nxt_task_t *task, 220 nxt_request_rpc_data_t *req_rpc_data); 221 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 222 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 223 224 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 225 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 226 void *data); 227 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 228 void *data); 229 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 230 void *data); 231 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 232 233 static const nxt_http_request_state_t nxt_http_request_send_state; 234 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 235 236 static void nxt_router_app_joint_use(nxt_task_t *task, 237 nxt_app_joint_t *app_joint, int i); 238 239 static void nxt_router_http_request_release_post(nxt_task_t *task, 240 nxt_http_request_t *r); 241 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 242 void *data); 243 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 244 static void nxt_router_get_port_handler(nxt_task_t *task, 245 nxt_port_recv_msg_t *msg); 246 static void nxt_router_get_mmap_handler(nxt_task_t *task, 247 nxt_port_recv_msg_t *msg); 248 249 extern const nxt_http_request_state_t nxt_http_websocket; 250 251 nxt_router_t *nxt_router; 252 253 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 254 static const nxt_str_t empty_prefix = nxt_string(""); 255 256 static const nxt_str_t *nxt_app_msg_prefix[] = { 257 &empty_prefix, 258 &empty_prefix, 259 &http_prefix, 260 &http_prefix, 261 &http_prefix, 262 &empty_prefix, 263 }; 264 265 266 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 267 .quit = nxt_signal_quit_handler, 268 .new_port = nxt_router_new_port_handler, 269 .get_port = nxt_router_get_port_handler, 270 .change_file = nxt_port_change_log_file_handler, 271 .mmap = nxt_port_mmap_handler, 272 .get_mmap = nxt_router_get_mmap_handler, 273 .data = nxt_router_conf_data_handler, 274 .app_restart = nxt_router_app_restart_handler, 275 .status = nxt_router_status_handler, 276 .remove_pid = nxt_router_remove_pid_handler, 277 .access_log = nxt_router_access_log_reopen_handler, 278 .rpc_ready = nxt_port_rpc_handler, 279 .rpc_error = nxt_port_rpc_handler, 280 .oosm = nxt_router_oosm_handler, 281 }; 282 283 284 const nxt_process_init_t nxt_router_process = { 285 .name = "router", 286 .type = NXT_PROCESS_ROUTER, 287 .prefork = nxt_router_prefork, 288 .restart = 1, 289 .setup = nxt_process_core_setup, 290 .start = nxt_router_start, 291 .port_handlers = &nxt_router_process_port_handlers, 292 .signals = nxt_process_signals, 293 }; 294 295 296 /* Queues of nxt_socket_conf_t */ 297 nxt_queue_t creating_sockets; 298 nxt_queue_t pending_sockets; 299 nxt_queue_t updating_sockets; 300 nxt_queue_t keeping_sockets; 301 nxt_queue_t deleting_sockets; 302 303 304 static nxt_int_t 305 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 306 { 307 nxt_runtime_stop_app_processes(task, task->thread->runtime); 308 309 return NXT_OK; 310 } 311 312 313 static nxt_int_t 314 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 315 { 316 nxt_int_t ret; 317 nxt_port_t *controller_port; 318 nxt_router_t *router; 319 nxt_runtime_t *rt; 320 321 rt = task->thread->runtime; 322 323 nxt_log(task, NXT_LOG_INFO, "router started"); 324 325 #if (NXT_TLS) 326 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 327 if (nxt_slow_path(rt->tls == NULL)) { 328 return NXT_ERROR; 329 } 330 331 ret = rt->tls->library_init(task); 332 if (nxt_slow_path(ret != NXT_OK)) { 333 return ret; 334 } 335 #endif 336 337 ret = nxt_http_init(task); 338 if (nxt_slow_path(ret != NXT_OK)) { 339 return ret; 340 } 341 342 router = nxt_zalloc(sizeof(nxt_router_t)); 343 if (nxt_slow_path(router == NULL)) { 344 return NXT_ERROR; 345 } 346 347 nxt_queue_init(&router->engines); 348 nxt_queue_init(&router->sockets); 349 nxt_queue_init(&router->apps); 350 351 nxt_router = router; 352 353 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 354 if (controller_port != NULL) { 355 nxt_router_greet_controller(task, controller_port); 356 } 357 358 return NXT_OK; 359 } 360 361 362 static void 363 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 364 { 365 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 366 -1, 0, 0, NULL); 367 } 368 369 370 static void 371 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 372 void *data) 373 { 374 size_t size; 375 uint32_t stream; 376 nxt_fd_t port_fd, queue_fd; 377 nxt_int_t ret; 378 nxt_app_t *app; 379 nxt_buf_t *b; 380 nxt_port_t *dport; 381 nxt_runtime_t *rt; 382 nxt_app_joint_rpc_t *app_joint_rpc; 383 384 app = data; 385 386 nxt_thread_mutex_lock(&app->mutex); 387 388 dport = app->proto_port; 389 390 nxt_thread_mutex_unlock(&app->mutex); 391 392 if (dport != NULL) { 393 nxt_debug(task, "app '%V' %p start process", &app->name, app); 394 395 b = NULL; 396 port_fd = -1; 397 queue_fd = -1; 398 399 } else { 400 if (app->proto_port_requests > 0) { 401 nxt_debug(task, "app '%V' %p wait for prototype process", 402 &app->name, app); 403 404 app->proto_port_requests++; 405 406 goto skip; 407 } 408 409 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); 410 411 rt = task->thread->runtime; 412 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 413 414 size = app->name.length + 1 + app->conf.length; 415 416 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); 417 if (nxt_slow_path(b == NULL)) { 418 goto failed; 419 } 420 421 nxt_buf_cpystr(b, &app->name); 422 *b->mem.free++ = '\0'; 423 nxt_buf_cpystr(b, &app->conf); 424 425 port_fd = app->shared_port->pair[0]; 426 queue_fd = app->shared_port->queue_fd; 427 } 428 429 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 430 nxt_router_app_port_ready, 431 nxt_router_app_port_error, 432 sizeof(nxt_app_joint_rpc_t)); 433 if (nxt_slow_path(app_joint_rpc == NULL)) { 434 goto failed; 435 } 436 437 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 438 439 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 440 port_fd, queue_fd, stream, port->id, b); 441 if (nxt_slow_path(ret != NXT_OK)) { 442 nxt_port_rpc_cancel(task, port, stream); 443 444 goto failed; 445 } 446 447 app_joint_rpc->app_joint = app->joint; 448 app_joint_rpc->generation = app->generation; 449 app_joint_rpc->proto = (b != NULL); 450 451 if (b != NULL) { 452 app->proto_port_requests++; 453 454 b = NULL; 455 } 456 457 nxt_router_app_joint_use(task, app->joint, 1); 458 459 failed: 460 461 if (b != NULL) { 462 nxt_mp_free(b->data, b); 463 } 464 465 skip: 466 467 nxt_router_app_use(task, app, -1); 468 } 469 470 471 static void 472 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 473 { 474 app_joint->use_count += i; 475 476 if (app_joint->use_count == 0) { 477 nxt_assert(app_joint->app == NULL); 478 479 nxt_free(app_joint); 480 } 481 } 482 483 484 static nxt_int_t 485 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 486 { 487 nxt_int_t res; 488 nxt_port_t *router_port; 489 nxt_runtime_t *rt; 490 491 nxt_debug(task, "app '%V' start process", &app->name); 492 493 rt = task->thread->runtime; 494 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 495 496 nxt_router_app_use(task, app, 1); 497 498 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 499 app); 500 501 if (res == NXT_OK) { 502 return res; 503 } 504 505 nxt_thread_mutex_lock(&app->mutex); 506 507 app->pending_processes--; 508 509 nxt_thread_mutex_unlock(&app->mutex); 510 511 nxt_router_app_use(task, app, -1); 512 513 return NXT_ERROR; 514 } 515 516 517 nxt_inline nxt_bool_t 518 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 519 { 520 nxt_buf_t *b, *next; 521 nxt_bool_t cancelled; 522 nxt_port_t *app_port; 523 nxt_msg_info_t *msg_info; 524 525 msg_info = &req_rpc_data->msg_info; 526 527 if (msg_info->buf == NULL) { 528 return 0; 529 } 530 531 app_port = req_rpc_data->app_port; 532 533 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 534 cancelled = nxt_app_queue_cancel(app_port->queue, 535 msg_info->tracking_cookie, 536 req_rpc_data->stream); 537 538 if (cancelled) { 539 nxt_debug(task, "stream #%uD: cancelled by router", 540 req_rpc_data->stream); 541 } 542 543 } else { 544 cancelled = 0; 545 } 546 547 for (b = msg_info->buf; b != NULL; b = next) { 548 next = b->next; 549 b->next = NULL; 550 551 if (b->is_port_mmap_sent) { 552 b->is_port_mmap_sent = cancelled == 0; 553 } 554 555 b->completion_handler(task, b, b->parent); 556 } 557 558 msg_info->buf = NULL; 559 560 return cancelled; 561 } 562 563 564 nxt_inline nxt_bool_t 565 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 566 { 567 if (lnk->next != NULL) { 568 nxt_queue_remove(lnk); 569 570 lnk->next = NULL; 571 572 return 1; 573 } 574 575 return 0; 576 } 577 578 579 nxt_inline void 580 nxt_request_rpc_data_unlink(nxt_task_t *task, 581 nxt_request_rpc_data_t *req_rpc_data) 582 { 583 nxt_app_t *app; 584 nxt_bool_t unlinked; 585 nxt_http_request_t *r; 586 587 nxt_router_msg_cancel(task, req_rpc_data); 588 589 app = req_rpc_data->app; 590 591 if (req_rpc_data->app_port != NULL) { 592 nxt_router_app_port_release(task, app, req_rpc_data->app_port, 593 req_rpc_data->apr_action); 594 595 req_rpc_data->app_port = NULL; 596 } 597 598 r = req_rpc_data->request; 599 600 if (r != NULL) { 601 r->timer_data = NULL; 602 603 nxt_router_http_request_release_post(task, r); 604 605 r->req_rpc_data = NULL; 606 req_rpc_data->request = NULL; 607 608 if (app != NULL) { 609 unlinked = 0; 610 611 nxt_thread_mutex_lock(&app->mutex); 612 613 if (r->app_link.next != NULL) { 614 nxt_queue_remove(&r->app_link); 615 r->app_link.next = NULL; 616 617 unlinked = 1; 618 } 619 620 nxt_thread_mutex_unlock(&app->mutex); 621 622 if (unlinked) { 623 nxt_mp_release(r->mem_pool); 624 } 625 } 626 } 627 628 if (app != NULL) { 629 nxt_router_app_use(task, app, -1); 630 631 req_rpc_data->app = NULL; 632 } 633 634 if (req_rpc_data->msg_info.body_fd != -1) { 635 nxt_fd_close(req_rpc_data->msg_info.body_fd); 636 637 req_rpc_data->msg_info.body_fd = -1; 638 } 639 640 if (req_rpc_data->rpc_cancel) { 641 req_rpc_data->rpc_cancel = 0; 642 643 nxt_port_rpc_cancel(task, task->thread->engine->port, 644 req_rpc_data->stream); 645 } 646 } 647 648 649 static void 650 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 651 { 652 nxt_int_t res; 653 nxt_app_t *app; 654 nxt_port_t *port, *main_app_port; 655 nxt_runtime_t *rt; 656 657 nxt_port_new_port_handler(task, msg); 658 659 port = msg->u.new_port; 660 661 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 662 nxt_router_greet_controller(task, msg->u.new_port); 663 } 664 665 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { 666 nxt_port_rpc_handler(task, msg); 667 668 return; 669 } 670 671 if (port == NULL || port->type != NXT_PROCESS_APP) { 672 673 if (msg->port_msg.stream == 0) { 674 return; 675 } 676 677 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 678 679 } else { 680 if (msg->fd[1] != -1) { 681 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 682 if (nxt_slow_path(res != NXT_OK)) { 683 return; 684 } 685 686 nxt_fd_close(msg->fd[1]); 687 msg->fd[1] = -1; 688 } 689 } 690 691 if (msg->port_msg.stream != 0) { 692 nxt_port_rpc_handler(task, msg); 693 return; 694 } 695 696 nxt_debug(task, "new port id %d (%d)", port->id, port->type); 697 698 /* 699 * Port with "id == 0" is application 'main' port and it always 700 * should come with non-zero stream. 701 */ 702 nxt_assert(port->id != 0); 703 704 /* Find 'main' app port and get app reference. */ 705 rt = task->thread->runtime; 706 707 /* 708 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 709 * sent to main port (with id == 0) and processed in main thread. 710 */ 711 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 712 nxt_assert(main_app_port != NULL); 713 714 app = main_app_port->app; 715 716 if (nxt_fast_path(app != NULL)) { 717 nxt_thread_mutex_lock(&app->mutex); 718 719 /* TODO here should be find-and-add code because there can be 720 port waiters in port_hash */ 721 nxt_port_hash_add(&app->port_hash, port); 722 app->port_hash_count++; 723 724 nxt_thread_mutex_unlock(&app->mutex); 725 726 port->app = app; 727 } 728 729 port->main_app_port = main_app_port; 730 731 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 732 } 733 734 735 static void 736 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 737 { 738 void *p; 739 size_t size; 740 nxt_int_t ret; 741 nxt_port_t *port; 742 nxt_router_temp_conf_t *tmcf; 743 744 port = nxt_runtime_port_find(task->thread->runtime, 745 msg->port_msg.pid, 746 msg->port_msg.reply_port); 747 if (nxt_slow_path(port == NULL)) { 748 nxt_alert(task, "conf_data_handler: reply port not found"); 749 return; 750 } 751 752 p = MAP_FAILED; 753 754 /* 755 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 756 * initialized in 'cleanup' section. 757 */ 758 size = 0; 759 760 tmcf = nxt_router_temp_conf(task); 761 if (nxt_slow_path(tmcf == NULL)) { 762 goto fail; 763 } 764 765 if (nxt_slow_path(msg->fd[0] == -1)) { 766 nxt_alert(task, "conf_data_handler: invalid shm fd"); 767 goto fail; 768 } 769 770 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 771 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 772 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 773 goto fail; 774 } 775 776 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 777 778 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 779 780 nxt_fd_close(msg->fd[0]); 781 msg->fd[0] = -1; 782 783 if (nxt_slow_path(p == MAP_FAILED)) { 784 goto fail; 785 } 786 787 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 788 789 tmcf->router_conf->router = nxt_router; 790 tmcf->stream = msg->port_msg.stream; 791 tmcf->port = port; 792 793 nxt_port_use(task, tmcf->port, 1); 794 795 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 796 797 if (nxt_fast_path(ret == NXT_OK)) { 798 nxt_router_conf_apply(task, tmcf, NULL); 799 800 } else { 801 nxt_router_conf_error(task, tmcf); 802 } 803 804 goto cleanup; 805 806 fail: 807 808 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 809 msg->port_msg.stream, 0, NULL); 810 811 if (tmcf != NULL) { 812 nxt_mp_release(tmcf->mem_pool); 813 } 814 815 cleanup: 816 817 if (p != MAP_FAILED) { 818 nxt_mem_munmap(p, size); 819 } 820 821 if (msg->fd[0] != -1) { 822 nxt_fd_close(msg->fd[0]); 823 msg->fd[0] = -1; 824 } 825 } 826 827 828 static void 829 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 830 { 831 nxt_app_t *app; 832 nxt_int_t ret; 833 nxt_str_t app_name; 834 nxt_port_t *reply_port, *shared_port, *old_shared_port; 835 nxt_port_t *proto_port; 836 nxt_port_msg_type_t reply; 837 838 reply_port = nxt_runtime_port_find(task->thread->runtime, 839 msg->port_msg.pid, 840 msg->port_msg.reply_port); 841 if (nxt_slow_path(reply_port == NULL)) { 842 nxt_alert(task, "app_restart_handler: reply port not found"); 843 return; 844 } 845 846 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 847 app_name.start = msg->buf->mem.pos; 848 849 nxt_debug(task, "app_restart_handler: %V", &app_name); 850 851 app = nxt_router_app_find(&nxt_router->apps, &app_name); 852 853 if (nxt_fast_path(app != NULL)) { 854 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 855 NXT_PROCESS_APP); 856 if (nxt_slow_path(shared_port == NULL)) { 857 goto fail; 858 } 859 860 ret = nxt_port_socket_init(task, shared_port, 0); 861 if (nxt_slow_path(ret != NXT_OK)) { 862 nxt_port_use(task, shared_port, -1); 863 goto fail; 864 } 865 866 ret = nxt_router_app_queue_init(task, shared_port); 867 if (nxt_slow_path(ret != NXT_OK)) { 868 nxt_port_write_close(shared_port); 869 nxt_port_read_close(shared_port); 870 nxt_port_use(task, shared_port, -1); 871 goto fail; 872 } 873 874 nxt_port_write_enable(task, shared_port); 875 876 nxt_thread_mutex_lock(&app->mutex); 877 878 proto_port = app->proto_port; 879 880 if (proto_port != NULL) { 881 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 882 proto_port->pid); 883 884 app->proto_port = NULL; 885 proto_port->app = NULL; 886 } 887 888 app->generation++; 889 890 shared_port->app = app; 891 892 old_shared_port = app->shared_port; 893 old_shared_port->app = NULL; 894 895 app->shared_port = shared_port; 896 897 nxt_thread_mutex_unlock(&app->mutex); 898 899 nxt_port_close(task, old_shared_port); 900 nxt_port_use(task, old_shared_port, -1); 901 902 if (proto_port != NULL) { 903 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 904 -1, 0, 0, NULL); 905 906 nxt_port_close(task, proto_port); 907 908 nxt_port_use(task, proto_port, -1); 909 } 910 911 reply = NXT_PORT_MSG_RPC_READY_LAST; 912 913 } else { 914 915 fail: 916 917 reply = NXT_PORT_MSG_RPC_ERROR; 918 } 919 920 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 921 0, NULL); 922 } 923 924 925 static void 926 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 927 { 928 u_char *p; 929 size_t alloc; 930 nxt_app_t *app; 931 nxt_buf_t *b; 932 nxt_uint_t type; 933 nxt_port_t *port; 934 nxt_status_app_t *app_stat; 935 nxt_event_engine_t *engine; 936 nxt_status_report_t *report; 937 938 port = nxt_runtime_port_find(task->thread->runtime, 939 msg->port_msg.pid, 940 msg->port_msg.reply_port); 941 if (nxt_slow_path(port == NULL)) { 942 nxt_alert(task, "nxt_router_status_handler(): reply port not found"); 943 return; 944 } 945 946 alloc = sizeof(nxt_status_report_t); 947 948 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) { 949 950 alloc += sizeof(nxt_status_app_t) + app->name.length; 951 952 } nxt_queue_loop; 953 954 b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0); 955 if (nxt_slow_path(b == NULL)) { 956 type = NXT_PORT_MSG_RPC_ERROR; 957 goto fail; 958 } 959 960 report = (nxt_status_report_t *) b->mem.free; 961 b->mem.free = b->mem.end; 962 963 nxt_memzero(report, sizeof(nxt_status_report_t)); 964 965 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { 966 967 report->accepted_conns += engine->accepted_conns_cnt; 968 report->idle_conns += engine->idle_conns_cnt; 969 report->closed_conns += engine->closed_conns_cnt; 970 report->requests += engine->requests_cnt; 971 972 } nxt_queue_loop; 973 974 report->apps_count = 0; 975 app_stat = report->apps; 976 p = b->mem.end; 977 978 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) { 979 p -= app->name.length; 980 981 nxt_memcpy(p, app->name.start, app->name.length); 982 983 app_stat->name.length = app->name.length; 984 app_stat->name.start = (u_char *) (p - b->mem.pos); 985 986 app_stat->active_requests = app->active_requests; 987 app_stat->pending_processes = app->pending_processes; 988 app_stat->processes = app->processes; 989 app_stat->idle_processes = app->idle_processes; 990 991 report->apps_count++; 992 app_stat++; 993 } nxt_queue_loop; 994 995 type = NXT_PORT_MSG_RPC_READY_LAST; 996 997 fail: 998 999 nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b); 1000 } 1001 1002 1003 static void 1004 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 1005 void *data) 1006 { 1007 union { 1008 nxt_pid_t removed_pid; 1009 void *data; 1010 } u; 1011 1012 u.data = data; 1013 1014 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 1015 } 1016 1017 1018 static void 1019 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 1020 { 1021 nxt_event_engine_t *engine; 1022 1023 nxt_port_remove_pid_handler(task, msg); 1024 1025 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 1026 { 1027 if (nxt_fast_path(engine->port != NULL)) { 1028 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 1029 msg->u.data); 1030 } 1031 } 1032 nxt_queue_loop; 1033 1034 if (msg->port_msg.stream == 0) { 1035 return; 1036 } 1037 1038 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 1039 1040 nxt_port_rpc_handler(task, msg); 1041 } 1042 1043 1044 static nxt_router_temp_conf_t * 1045 nxt_router_temp_conf(nxt_task_t *task) 1046 { 1047 nxt_mp_t *mp, *tmp; 1048 nxt_router_conf_t *rtcf; 1049 nxt_router_temp_conf_t *tmcf; 1050 1051 mp = nxt_mp_create(1024, 128, 256, 32); 1052 if (nxt_slow_path(mp == NULL)) { 1053 return NULL; 1054 } 1055 1056 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 1057 if (nxt_slow_path(rtcf == NULL)) { 1058 goto fail; 1059 } 1060 1061 rtcf->mem_pool = mp; 1062 1063 rtcf->var_fields = nxt_array_create(mp, 4, sizeof(nxt_var_field_t)); 1064 if (nxt_slow_path(rtcf->var_fields == NULL)) { 1065 goto fail; 1066 } 1067 1068 tmp = nxt_mp_create(1024, 128, 256, 32); 1069 if (nxt_slow_path(tmp == NULL)) { 1070 goto fail; 1071 } 1072 1073 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 1074 if (nxt_slow_path(tmcf == NULL)) { 1075 goto temp_fail; 1076 } 1077 1078 tmcf->mem_pool = tmp; 1079 tmcf->router_conf = rtcf; 1080 tmcf->count = 1; 1081 tmcf->engine = task->thread->engine; 1082 1083 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 1084 sizeof(nxt_router_engine_conf_t)); 1085 if (nxt_slow_path(tmcf->engines == NULL)) { 1086 goto temp_fail; 1087 } 1088 1089 nxt_queue_init(&creating_sockets); 1090 nxt_queue_init(&pending_sockets); 1091 nxt_queue_init(&updating_sockets); 1092 nxt_queue_init(&keeping_sockets); 1093 nxt_queue_init(&deleting_sockets); 1094 1095 #if (NXT_TLS) 1096 nxt_queue_init(&tmcf->tls); 1097 #endif 1098 1099 nxt_queue_init(&tmcf->apps); 1100 nxt_queue_init(&tmcf->previous); 1101 1102 return tmcf; 1103 1104 temp_fail: 1105 1106 nxt_mp_destroy(tmp); 1107 1108 fail: 1109 1110 nxt_mp_destroy(mp); 1111 1112 return NULL; 1113 } 1114 1115 1116 nxt_inline nxt_bool_t 1117 nxt_router_app_can_start(nxt_app_t *app) 1118 { 1119 return app->processes + app->pending_processes < app->max_processes 1120 && app->pending_processes < app->max_pending_processes; 1121 } 1122 1123 1124 nxt_inline nxt_bool_t 1125 nxt_router_app_need_start(nxt_app_t *app) 1126 { 1127 return (app->active_requests 1128 > app->port_hash_count + app->pending_processes) 1129 || (app->spare_processes 1130 > app->idle_processes + app->pending_processes); 1131 } 1132 1133 1134 void 1135 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1136 { 1137 nxt_int_t ret; 1138 nxt_app_t *app; 1139 nxt_router_t *router; 1140 nxt_runtime_t *rt; 1141 nxt_queue_link_t *qlk; 1142 nxt_socket_conf_t *skcf; 1143 nxt_router_conf_t *rtcf; 1144 nxt_router_temp_conf_t *tmcf; 1145 const nxt_event_interface_t *interface; 1146 #if (NXT_TLS) 1147 nxt_router_tlssock_t *tls; 1148 #endif 1149 1150 tmcf = obj; 1151 1152 qlk = nxt_queue_first(&pending_sockets); 1153 1154 if (qlk != nxt_queue_tail(&pending_sockets)) { 1155 nxt_queue_remove(qlk); 1156 nxt_queue_insert_tail(&creating_sockets, qlk); 1157 1158 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1159 1160 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1161 1162 return; 1163 } 1164 1165 #if (NXT_TLS) 1166 qlk = nxt_queue_last(&tmcf->tls); 1167 1168 if (qlk != nxt_queue_head(&tmcf->tls)) { 1169 nxt_queue_remove(qlk); 1170 1171 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1172 1173 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1174 nxt_router_tls_rpc_handler, tls); 1175 return; 1176 } 1177 #endif 1178 1179 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1180 1181 if (nxt_router_app_need_start(app)) { 1182 nxt_router_app_rpc_create(task, tmcf, app); 1183 return; 1184 } 1185 1186 } nxt_queue_loop; 1187 1188 rtcf = tmcf->router_conf; 1189 1190 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1191 nxt_router_access_log_open(task, tmcf); 1192 return; 1193 } 1194 1195 rt = task->thread->runtime; 1196 1197 interface = nxt_service_get(rt->services, "engine", NULL); 1198 1199 router = rtcf->router; 1200 1201 ret = nxt_router_engines_create(task, router, tmcf, interface); 1202 if (nxt_slow_path(ret != NXT_OK)) { 1203 goto fail; 1204 } 1205 1206 ret = nxt_router_threads_create(task, rt, tmcf); 1207 if (nxt_slow_path(ret != NXT_OK)) { 1208 goto fail; 1209 } 1210 1211 nxt_router_apps_sort(task, router, tmcf); 1212 1213 nxt_router_apps_hash_use(task, rtcf, 1); 1214 1215 nxt_router_engines_post(router, tmcf); 1216 1217 nxt_queue_add(&router->sockets, &updating_sockets); 1218 nxt_queue_add(&router->sockets, &creating_sockets); 1219 1220 if (router->access_log != rtcf->access_log) { 1221 nxt_router_access_log_use(&router->lock, rtcf->access_log); 1222 1223 nxt_router_access_log_release(task, &router->lock, router->access_log); 1224 1225 router->access_log = rtcf->access_log; 1226 } 1227 1228 nxt_router_conf_ready(task, tmcf); 1229 1230 return; 1231 1232 fail: 1233 1234 nxt_router_conf_error(task, tmcf); 1235 1236 return; 1237 } 1238 1239 1240 static void 1241 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1242 { 1243 nxt_joint_job_t *job; 1244 1245 job = obj; 1246 1247 nxt_router_conf_ready(task, job->tmcf); 1248 } 1249 1250 1251 static void 1252 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1253 { 1254 uint32_t count; 1255 nxt_router_conf_t *rtcf; 1256 nxt_thread_spinlock_t *lock; 1257 1258 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1259 1260 if (--tmcf->count > 0) { 1261 return; 1262 } 1263 1264 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1265 1266 rtcf = tmcf->router_conf; 1267 1268 lock = &rtcf->router->lock; 1269 1270 nxt_thread_spin_lock(lock); 1271 1272 count = rtcf->count; 1273 1274 nxt_thread_spin_unlock(lock); 1275 1276 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1277 1278 if (count == 0) { 1279 nxt_router_apps_hash_use(task, rtcf, -1); 1280 1281 nxt_router_access_log_release(task, lock, rtcf->access_log); 1282 1283 nxt_mp_destroy(rtcf->mem_pool); 1284 } 1285 1286 nxt_mp_release(tmcf->mem_pool); 1287 } 1288 1289 1290 void 1291 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1292 { 1293 nxt_app_t *app; 1294 nxt_socket_t s; 1295 nxt_router_t *router; 1296 nxt_queue_link_t *qlk; 1297 nxt_socket_conf_t *skcf; 1298 nxt_router_conf_t *rtcf; 1299 1300 nxt_alert(task, "failed to apply new conf"); 1301 1302 for (qlk = nxt_queue_first(&creating_sockets); 1303 qlk != nxt_queue_tail(&creating_sockets); 1304 qlk = nxt_queue_next(qlk)) 1305 { 1306 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1307 s = skcf->listen->socket; 1308 1309 if (s != -1) { 1310 nxt_socket_close(task, s); 1311 } 1312 1313 nxt_free(skcf->listen); 1314 } 1315 1316 rtcf = tmcf->router_conf; 1317 1318 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1319 1320 nxt_router_app_unlink(task, app); 1321 1322 } nxt_queue_loop; 1323 1324 router = rtcf->router; 1325 1326 nxt_queue_add(&router->sockets, &keeping_sockets); 1327 nxt_queue_add(&router->sockets, &deleting_sockets); 1328 1329 nxt_queue_add(&router->apps, &tmcf->previous); 1330 1331 // TODO: new engines and threads 1332 1333 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1334 1335 nxt_mp_destroy(rtcf->mem_pool); 1336 1337 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1338 1339 nxt_mp_release(tmcf->mem_pool); 1340 } 1341 1342 1343 static void 1344 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1345 nxt_port_msg_type_t type) 1346 { 1347 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1348 1349 nxt_port_use(task, tmcf->port, -1); 1350 1351 tmcf->port = NULL; 1352 } 1353 1354 1355 static nxt_conf_map_t nxt_router_conf[] = { 1356 { 1357 nxt_string("listeners_threads"), 1358 NXT_CONF_MAP_INT32, 1359 offsetof(nxt_router_conf_t, threads), 1360 }, 1361 }; 1362 1363 1364 static nxt_conf_map_t nxt_router_app_conf[] = { 1365 { 1366 nxt_string("type"), 1367 NXT_CONF_MAP_STR, 1368 offsetof(nxt_router_app_conf_t, type), 1369 }, 1370 1371 { 1372 nxt_string("limits"), 1373 NXT_CONF_MAP_PTR, 1374 offsetof(nxt_router_app_conf_t, limits_value), 1375 }, 1376 1377 { 1378 nxt_string("processes"), 1379 NXT_CONF_MAP_INT32, 1380 offsetof(nxt_router_app_conf_t, processes), 1381 }, 1382 1383 { 1384 nxt_string("processes"), 1385 NXT_CONF_MAP_PTR, 1386 offsetof(nxt_router_app_conf_t, processes_value), 1387 }, 1388 1389 { 1390 nxt_string("targets"), 1391 NXT_CONF_MAP_PTR, 1392 offsetof(nxt_router_app_conf_t, targets_value), 1393 }, 1394 }; 1395 1396 1397 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1398 { 1399 nxt_string("timeout"), 1400 NXT_CONF_MAP_MSEC, 1401 offsetof(nxt_router_app_conf_t, timeout), 1402 }, 1403 }; 1404 1405 1406 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1407 { 1408 nxt_string("spare"), 1409 NXT_CONF_MAP_INT32, 1410 offsetof(nxt_router_app_conf_t, spare_processes), 1411 }, 1412 1413 { 1414 nxt_string("max"), 1415 NXT_CONF_MAP_INT32, 1416 offsetof(nxt_router_app_conf_t, max_processes), 1417 }, 1418 1419 { 1420 nxt_string("idle_timeout"), 1421 NXT_CONF_MAP_MSEC, 1422 offsetof(nxt_router_app_conf_t, idle_timeout), 1423 }, 1424 }; 1425 1426 1427 static nxt_conf_map_t nxt_router_listener_conf[] = { 1428 { 1429 nxt_string("pass"), 1430 NXT_CONF_MAP_STR_COPY, 1431 offsetof(nxt_router_listener_conf_t, pass), 1432 }, 1433 1434 { 1435 nxt_string("application"), 1436 NXT_CONF_MAP_STR_COPY, 1437 offsetof(nxt_router_listener_conf_t, application), 1438 }, 1439 }; 1440 1441 1442 static nxt_conf_map_t nxt_router_http_conf[] = { 1443 { 1444 nxt_string("header_buffer_size"), 1445 NXT_CONF_MAP_SIZE, 1446 offsetof(nxt_socket_conf_t, header_buffer_size), 1447 }, 1448 1449 { 1450 nxt_string("large_header_buffer_size"), 1451 NXT_CONF_MAP_SIZE, 1452 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1453 }, 1454 1455 { 1456 nxt_string("large_header_buffers"), 1457 NXT_CONF_MAP_SIZE, 1458 offsetof(nxt_socket_conf_t, large_header_buffers), 1459 }, 1460 1461 { 1462 nxt_string("body_buffer_size"), 1463 NXT_CONF_MAP_SIZE, 1464 offsetof(nxt_socket_conf_t, body_buffer_size), 1465 }, 1466 1467 { 1468 nxt_string("max_body_size"), 1469 NXT_CONF_MAP_SIZE, 1470 offsetof(nxt_socket_conf_t, max_body_size), 1471 }, 1472 1473 { 1474 nxt_string("idle_timeout"), 1475 NXT_CONF_MAP_MSEC, 1476 offsetof(nxt_socket_conf_t, idle_timeout), 1477 }, 1478 1479 { 1480 nxt_string("header_read_timeout"), 1481 NXT_CONF_MAP_MSEC, 1482 offsetof(nxt_socket_conf_t, header_read_timeout), 1483 }, 1484 1485 { 1486 nxt_string("body_read_timeout"), 1487 NXT_CONF_MAP_MSEC, 1488 offsetof(nxt_socket_conf_t, body_read_timeout), 1489 }, 1490 1491 { 1492 nxt_string("send_timeout"), 1493 NXT_CONF_MAP_MSEC, 1494 offsetof(nxt_socket_conf_t, send_timeout), 1495 }, 1496 1497 { 1498 nxt_string("body_temp_path"), 1499 NXT_CONF_MAP_STR, 1500 offsetof(nxt_socket_conf_t, body_temp_path), 1501 }, 1502 1503 { 1504 nxt_string("discard_unsafe_fields"), 1505 NXT_CONF_MAP_INT8, 1506 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1507 }, 1508 }; 1509 1510 1511 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1512 { 1513 nxt_string("max_frame_size"), 1514 NXT_CONF_MAP_SIZE, 1515 offsetof(nxt_websocket_conf_t, max_frame_size), 1516 }, 1517 1518 { 1519 nxt_string("read_timeout"), 1520 NXT_CONF_MAP_MSEC, 1521 offsetof(nxt_websocket_conf_t, read_timeout), 1522 }, 1523 1524 { 1525 nxt_string("keepalive_interval"), 1526 NXT_CONF_MAP_MSEC, 1527 offsetof(nxt_websocket_conf_t, keepalive_interval), 1528 }, 1529 1530 }; 1531 1532 1533 static nxt_int_t 1534 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1535 u_char *start, u_char *end) 1536 { 1537 u_char *p; 1538 size_t size; 1539 nxt_mp_t *mp, *app_mp; 1540 uint32_t next, next_target; 1541 nxt_int_t ret; 1542 nxt_str_t name, target; 1543 nxt_app_t *app, *prev; 1544 nxt_str_t *t, *s, *targets; 1545 nxt_uint_t n, i; 1546 nxt_port_t *port; 1547 nxt_router_t *router; 1548 nxt_app_joint_t *app_joint; 1549 #if (NXT_TLS) 1550 nxt_tls_init_t *tls_init; 1551 nxt_conf_value_t *certificate; 1552 #endif 1553 nxt_conf_value_t *root, *conf, *http, *value, *websocket; 1554 nxt_conf_value_t *applications, *application; 1555 nxt_conf_value_t *listeners, *listener; 1556 nxt_socket_conf_t *skcf; 1557 nxt_router_conf_t *rtcf; 1558 nxt_http_routes_t *routes; 1559 nxt_event_engine_t *engine; 1560 nxt_app_lang_module_t *lang; 1561 nxt_router_app_conf_t apcf; 1562 nxt_router_listener_conf_t lscf; 1563 1564 static nxt_str_t http_path = nxt_string("/settings/http"); 1565 static nxt_str_t applications_path = nxt_string("/applications"); 1566 static nxt_str_t listeners_path = nxt_string("/listeners"); 1567 static nxt_str_t routes_path = nxt_string("/routes"); 1568 static nxt_str_t access_log_path = nxt_string("/access_log"); 1569 #if (NXT_TLS) 1570 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1571 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1572 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1573 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1574 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); 1575 #endif 1576 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1577 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1578 static nxt_str_t forwarded_path = nxt_string("/forwarded"); 1579 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1580 1581 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1582 if (root == NULL) { 1583 nxt_alert(task, "configuration parsing error"); 1584 return NXT_ERROR; 1585 } 1586 1587 rtcf = tmcf->router_conf; 1588 mp = rtcf->mem_pool; 1589 1590 ret = nxt_conf_map_object(mp, root, nxt_router_conf, 1591 nxt_nitems(nxt_router_conf), rtcf); 1592 if (ret != NXT_OK) { 1593 nxt_alert(task, "root map error"); 1594 return NXT_ERROR; 1595 } 1596 1597 if (rtcf->threads == 0) { 1598 rtcf->threads = nxt_ncpu; 1599 } 1600 1601 conf = nxt_conf_get_path(root, &static_path); 1602 1603 ret = nxt_router_conf_process_static(task, rtcf, conf); 1604 if (nxt_slow_path(ret != NXT_OK)) { 1605 return NXT_ERROR; 1606 } 1607 1608 router = rtcf->router; 1609 1610 applications = nxt_conf_get_path(root, &applications_path); 1611 1612 if (applications != NULL) { 1613 next = 0; 1614 1615 for ( ;; ) { 1616 application = nxt_conf_next_object_member(applications, 1617 &name, &next); 1618 if (application == NULL) { 1619 break; 1620 } 1621 1622 nxt_debug(task, "application \"%V\"", &name); 1623 1624 size = nxt_conf_json_length(application, NULL); 1625 1626 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1627 if (nxt_slow_path(app_mp == NULL)) { 1628 goto fail; 1629 } 1630 1631 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1632 if (app == NULL) { 1633 goto app_fail; 1634 } 1635 1636 nxt_memzero(app, sizeof(nxt_app_t)); 1637 1638 app->mem_pool = app_mp; 1639 1640 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1641 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1642 + name.length); 1643 1644 p = nxt_conf_json_print(app->conf.start, application, NULL); 1645 app->conf.length = p - app->conf.start; 1646 1647 nxt_assert(app->conf.length <= size); 1648 1649 nxt_debug(task, "application conf \"%V\"", &app->conf); 1650 1651 prev = nxt_router_app_find(&router->apps, &name); 1652 1653 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1654 nxt_mp_destroy(app_mp); 1655 1656 nxt_queue_remove(&prev->link); 1657 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1658 1659 ret = nxt_router_apps_hash_add(rtcf, prev); 1660 if (nxt_slow_path(ret != NXT_OK)) { 1661 goto fail; 1662 } 1663 1664 continue; 1665 } 1666 1667 apcf.processes = 1; 1668 apcf.max_processes = 1; 1669 apcf.spare_processes = 0; 1670 apcf.timeout = 0; 1671 apcf.idle_timeout = 15000; 1672 apcf.limits_value = NULL; 1673 apcf.processes_value = NULL; 1674 apcf.targets_value = NULL; 1675 1676 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1677 if (nxt_slow_path(app_joint == NULL)) { 1678 goto app_fail; 1679 } 1680 1681 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1682 1683 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1684 nxt_nitems(nxt_router_app_conf), &apcf); 1685 if (ret != NXT_OK) { 1686 nxt_alert(task, "application map error"); 1687 goto app_fail; 1688 } 1689 1690 if (apcf.limits_value != NULL) { 1691 1692 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1693 nxt_alert(task, "application limits is not object"); 1694 goto app_fail; 1695 } 1696 1697 ret = nxt_conf_map_object(mp, apcf.limits_value, 1698 nxt_router_app_limits_conf, 1699 nxt_nitems(nxt_router_app_limits_conf), 1700 &apcf); 1701 if (ret != NXT_OK) { 1702 nxt_alert(task, "application limits map error"); 1703 goto app_fail; 1704 } 1705 } 1706 1707 if (apcf.processes_value != NULL 1708 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1709 { 1710 ret = nxt_conf_map_object(mp, apcf.processes_value, 1711 nxt_router_app_processes_conf, 1712 nxt_nitems(nxt_router_app_processes_conf), 1713 &apcf); 1714 if (ret != NXT_OK) { 1715 nxt_alert(task, "application processes map error"); 1716 goto app_fail; 1717 } 1718 1719 } else { 1720 apcf.max_processes = apcf.processes; 1721 apcf.spare_processes = apcf.processes; 1722 } 1723 1724 if (apcf.targets_value != NULL) { 1725 n = nxt_conf_object_members_count(apcf.targets_value); 1726 1727 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1728 if (nxt_slow_path(targets == NULL)) { 1729 goto app_fail; 1730 } 1731 1732 next_target = 0; 1733 1734 for (i = 0; i < n; i++) { 1735 (void) nxt_conf_next_object_member(apcf.targets_value, 1736 &target, &next_target); 1737 1738 s = nxt_str_dup(app_mp, &targets[i], &target); 1739 if (nxt_slow_path(s == NULL)) { 1740 goto app_fail; 1741 } 1742 } 1743 1744 } else { 1745 targets = NULL; 1746 } 1747 1748 nxt_debug(task, "application type: %V", &apcf.type); 1749 nxt_debug(task, "application processes: %D", apcf.processes); 1750 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1751 1752 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1753 1754 if (lang == NULL) { 1755 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1756 goto app_fail; 1757 } 1758 1759 nxt_debug(task, "application language module: \"%s\"", lang->file); 1760 1761 ret = nxt_thread_mutex_create(&app->mutex); 1762 if (ret != NXT_OK) { 1763 goto app_fail; 1764 } 1765 1766 nxt_queue_init(&app->ports); 1767 nxt_queue_init(&app->spare_ports); 1768 nxt_queue_init(&app->idle_ports); 1769 nxt_queue_init(&app->ack_waiting_req); 1770 1771 app->name.length = name.length; 1772 nxt_memcpy(app->name.start, name.start, name.length); 1773 1774 app->type = lang->type; 1775 app->max_processes = apcf.max_processes; 1776 app->spare_processes = apcf.spare_processes; 1777 app->max_pending_processes = apcf.spare_processes 1778 ? apcf.spare_processes : 1; 1779 app->timeout = apcf.timeout; 1780 app->idle_timeout = apcf.idle_timeout; 1781 1782 app->targets = targets; 1783 1784 engine = task->thread->engine; 1785 1786 app->engine = engine; 1787 1788 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1789 app->adjust_idle_work.task = &engine->task; 1790 app->adjust_idle_work.obj = app; 1791 1792 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1793 1794 ret = nxt_router_apps_hash_add(rtcf, app); 1795 if (nxt_slow_path(ret != NXT_OK)) { 1796 goto app_fail; 1797 } 1798 1799 nxt_router_app_use(task, app, 1); 1800 1801 app->joint = app_joint; 1802 1803 app_joint->use_count = 1; 1804 app_joint->app = app; 1805 1806 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1807 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1808 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1809 app_joint->idle_timer.task = &engine->task; 1810 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1811 1812 app_joint->free_app_work.handler = nxt_router_free_app; 1813 app_joint->free_app_work.task = &engine->task; 1814 app_joint->free_app_work.obj = app_joint; 1815 1816 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1817 NXT_PROCESS_APP); 1818 if (nxt_slow_path(port == NULL)) { 1819 return NXT_ERROR; 1820 } 1821 1822 ret = nxt_port_socket_init(task, port, 0); 1823 if (nxt_slow_path(ret != NXT_OK)) { 1824 nxt_port_use(task, port, -1); 1825 return NXT_ERROR; 1826 } 1827 1828 ret = nxt_router_app_queue_init(task, port); 1829 if (nxt_slow_path(ret != NXT_OK)) { 1830 nxt_port_write_close(port); 1831 nxt_port_read_close(port); 1832 nxt_port_use(task, port, -1); 1833 return NXT_ERROR; 1834 } 1835 1836 nxt_port_write_enable(task, port); 1837 port->app = app; 1838 1839 app->shared_port = port; 1840 1841 nxt_thread_mutex_create(&app->outgoing.mutex); 1842 } 1843 } 1844 1845 conf = nxt_conf_get_path(root, &routes_path); 1846 if (nxt_fast_path(conf != NULL)) { 1847 routes = nxt_http_routes_create(task, tmcf, conf); 1848 if (nxt_slow_path(routes == NULL)) { 1849 return NXT_ERROR; 1850 } 1851 1852 rtcf->routes = routes; 1853 } 1854 1855 ret = nxt_upstreams_create(task, tmcf, root); 1856 if (nxt_slow_path(ret != NXT_OK)) { 1857 return ret; 1858 } 1859 1860 http = nxt_conf_get_path(root, &http_path); 1861 #if 0 1862 if (http == NULL) { 1863 nxt_alert(task, "no \"http\" block"); 1864 return NXT_ERROR; 1865 } 1866 #endif 1867 1868 websocket = nxt_conf_get_path(root, &websocket_path); 1869 1870 listeners = nxt_conf_get_path(root, &listeners_path); 1871 1872 if (listeners != NULL) { 1873 next = 0; 1874 1875 for ( ;; ) { 1876 listener = nxt_conf_next_object_member(listeners, &name, &next); 1877 if (listener == NULL) { 1878 break; 1879 } 1880 1881 skcf = nxt_router_socket_conf(task, tmcf, &name); 1882 if (skcf == NULL) { 1883 goto fail; 1884 } 1885 1886 nxt_memzero(&lscf, sizeof(lscf)); 1887 1888 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1889 nxt_nitems(nxt_router_listener_conf), 1890 &lscf); 1891 if (ret != NXT_OK) { 1892 nxt_alert(task, "listener map error"); 1893 goto fail; 1894 } 1895 1896 nxt_debug(task, "application: %V", &lscf.application); 1897 1898 // STUB, default values if http block is not defined. 1899 skcf->header_buffer_size = 2048; 1900 skcf->large_header_buffer_size = 8192; 1901 skcf->large_header_buffers = 4; 1902 skcf->discard_unsafe_fields = 1; 1903 skcf->body_buffer_size = 16 * 1024; 1904 skcf->max_body_size = 8 * 1024 * 1024; 1905 skcf->proxy_header_buffer_size = 64 * 1024; 1906 skcf->proxy_buffer_size = 4096; 1907 skcf->proxy_buffers = 256; 1908 skcf->idle_timeout = 180 * 1000; 1909 skcf->header_read_timeout = 30 * 1000; 1910 skcf->body_read_timeout = 30 * 1000; 1911 skcf->send_timeout = 30 * 1000; 1912 skcf->proxy_timeout = 60 * 1000; 1913 skcf->proxy_send_timeout = 30 * 1000; 1914 skcf->proxy_read_timeout = 30 * 1000; 1915 1916 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1917 skcf->websocket_conf.read_timeout = 60 * 1000; 1918 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1919 1920 nxt_str_null(&skcf->body_temp_path); 1921 1922 if (http != NULL) { 1923 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1924 nxt_nitems(nxt_router_http_conf), 1925 skcf); 1926 if (ret != NXT_OK) { 1927 nxt_alert(task, "http map error"); 1928 goto fail; 1929 } 1930 } 1931 1932 if (websocket != NULL) { 1933 ret = nxt_conf_map_object(mp, websocket, 1934 nxt_router_websocket_conf, 1935 nxt_nitems(nxt_router_websocket_conf), 1936 &skcf->websocket_conf); 1937 if (ret != NXT_OK) { 1938 nxt_alert(task, "websocket map error"); 1939 goto fail; 1940 } 1941 } 1942 1943 t = &skcf->body_temp_path; 1944 1945 if (t->length == 0) { 1946 t->start = (u_char *) task->thread->runtime->tmp; 1947 t->length = nxt_strlen(t->start); 1948 } 1949 1950 conf = nxt_conf_get_path(listener, &forwarded_path); 1951 1952 if (conf != NULL) { 1953 skcf->forwarded = nxt_router_conf_forward(task, mp, conf); 1954 if (nxt_slow_path(skcf->forwarded == NULL)) { 1955 return NXT_ERROR; 1956 } 1957 } 1958 1959 conf = nxt_conf_get_path(listener, &client_ip_path); 1960 1961 if (conf != NULL) { 1962 skcf->client_ip = nxt_router_conf_forward(task, mp, conf); 1963 if (nxt_slow_path(skcf->client_ip == NULL)) { 1964 return NXT_ERROR; 1965 } 1966 } 1967 1968 #if (NXT_TLS) 1969 certificate = nxt_conf_get_path(listener, &certificate_path); 1970 1971 if (certificate != NULL) { 1972 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1973 if (nxt_slow_path(tls_init == NULL)) { 1974 return NXT_ERROR; 1975 } 1976 1977 tls_init->cache_size = 0; 1978 tls_init->timeout = 300; 1979 1980 value = nxt_conf_get_path(listener, &conf_cache_path); 1981 if (value != NULL) { 1982 tls_init->cache_size = nxt_conf_get_number(value); 1983 } 1984 1985 value = nxt_conf_get_path(listener, &conf_timeout_path); 1986 if (value != NULL) { 1987 tls_init->timeout = nxt_conf_get_number(value); 1988 } 1989 1990 tls_init->conf_cmds = nxt_conf_get_path(listener, 1991 &conf_commands_path); 1992 1993 tls_init->tickets_conf = nxt_conf_get_path(listener, 1994 &conf_tickets); 1995 1996 n = nxt_conf_array_elements_count_or_1(certificate); 1997 1998 for (i = 0; i < n; i++) { 1999 value = nxt_conf_get_array_element_or_itself(certificate, 2000 i); 2001 nxt_assert(value != NULL); 2002 2003 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 2004 tls_init, i == 0); 2005 if (nxt_slow_path(ret != NXT_OK)) { 2006 goto fail; 2007 } 2008 } 2009 } 2010 #endif 2011 2012 skcf->listen->handler = nxt_http_conn_init; 2013 skcf->router_conf = rtcf; 2014 skcf->router_conf->count++; 2015 2016 if (lscf.pass.length != 0) { 2017 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 2018 2019 /* COMPATIBILITY: listener application. */ 2020 } else if (lscf.application.length > 0) { 2021 skcf->action = nxt_http_pass_application(task, rtcf, 2022 &lscf.application); 2023 } 2024 2025 if (nxt_slow_path(skcf->action == NULL)) { 2026 goto fail; 2027 } 2028 } 2029 } 2030 2031 ret = nxt_http_routes_resolve(task, tmcf); 2032 if (nxt_slow_path(ret != NXT_OK)) { 2033 goto fail; 2034 } 2035 2036 value = nxt_conf_get_path(root, &access_log_path); 2037 2038 if (value != NULL) { 2039 ret = nxt_router_access_log_create(task, rtcf, value); 2040 if (nxt_slow_path(ret != NXT_OK)) { 2041 goto fail; 2042 } 2043 } 2044 2045 nxt_queue_add(&deleting_sockets, &router->sockets); 2046 nxt_queue_init(&router->sockets); 2047 2048 return NXT_OK; 2049 2050 app_fail: 2051 2052 nxt_mp_destroy(app_mp); 2053 2054 fail: 2055 2056 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 2057 2058 nxt_queue_remove(&app->link); 2059 nxt_thread_mutex_destroy(&app->mutex); 2060 nxt_mp_destroy(app->mem_pool); 2061 2062 } nxt_queue_loop; 2063 2064 return NXT_ERROR; 2065 } 2066 2067 2068 #if (NXT_TLS) 2069 2070 static nxt_int_t 2071 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 2072 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 2073 nxt_tls_init_t *tls_init, nxt_bool_t last) 2074 { 2075 nxt_router_tlssock_t *tls; 2076 2077 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 2078 if (nxt_slow_path(tls == NULL)) { 2079 return NXT_ERROR; 2080 } 2081 2082 tls->tls_init = tls_init; 2083 tls->socket_conf = skcf; 2084 tls->temp_conf = tmcf; 2085 tls->last = last; 2086 nxt_conf_get_string(value, &tls->name); 2087 2088 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2089 2090 return NXT_OK; 2091 } 2092 2093 #endif 2094 2095 2096 static nxt_int_t 2097 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2098 nxt_conf_value_t *conf) 2099 { 2100 uint32_t next, i; 2101 nxt_mp_t *mp; 2102 nxt_str_t *type, exten, str; 2103 nxt_int_t ret; 2104 nxt_uint_t exts; 2105 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2106 2107 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2108 2109 mp = rtcf->mem_pool; 2110 2111 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2112 if (nxt_slow_path(ret != NXT_OK)) { 2113 return NXT_ERROR; 2114 } 2115 2116 if (conf == NULL) { 2117 return NXT_OK; 2118 } 2119 2120 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2121 2122 if (mtypes_conf != NULL) { 2123 next = 0; 2124 2125 for ( ;; ) { 2126 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2127 2128 if (ext_conf == NULL) { 2129 break; 2130 } 2131 2132 type = nxt_str_dup(mp, NULL, &str); 2133 if (nxt_slow_path(type == NULL)) { 2134 return NXT_ERROR; 2135 } 2136 2137 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2138 nxt_conf_get_string(ext_conf, &str); 2139 2140 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2141 return NXT_ERROR; 2142 } 2143 2144 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2145 &exten, type); 2146 if (nxt_slow_path(ret != NXT_OK)) { 2147 return NXT_ERROR; 2148 } 2149 2150 continue; 2151 } 2152 2153 exts = nxt_conf_array_elements_count(ext_conf); 2154 2155 for (i = 0; i < exts; i++) { 2156 value = nxt_conf_get_array_element(ext_conf, i); 2157 2158 nxt_conf_get_string(value, &str); 2159 2160 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2161 return NXT_ERROR; 2162 } 2163 2164 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2165 &exten, type); 2166 if (nxt_slow_path(ret != NXT_OK)) { 2167 return NXT_ERROR; 2168 } 2169 } 2170 } 2171 } 2172 2173 return NXT_OK; 2174 } 2175 2176 2177 static nxt_http_forward_t * 2178 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf) 2179 { 2180 nxt_int_t ret; 2181 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf; 2182 nxt_conf_value_t *source_conf, *recursive_conf; 2183 nxt_http_forward_t *forward; 2184 nxt_http_route_addr_rule_t *source; 2185 2186 static nxt_str_t header_path = nxt_string("/header"); 2187 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 2188 static nxt_str_t protocol_path = nxt_string("/protocol"); 2189 static nxt_str_t source_path = nxt_string("/source"); 2190 static nxt_str_t recursive_path = nxt_string("/recursive"); 2191 2192 header_conf = nxt_conf_get_path(conf, &header_path); 2193 2194 if (header_conf != NULL) { 2195 client_ip_conf = nxt_conf_get_path(conf, &header_path); 2196 protocol_conf = NULL; 2197 2198 } else { 2199 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path); 2200 protocol_conf = nxt_conf_get_path(conf, &protocol_path); 2201 } 2202 2203 source_conf = nxt_conf_get_path(conf, &source_path); 2204 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2205 2206 if (source_conf == NULL 2207 || (protocol_conf == NULL && client_ip_conf == NULL)) 2208 { 2209 return NULL; 2210 } 2211 2212 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t)); 2213 if (nxt_slow_path(forward == NULL)) { 2214 return NULL; 2215 } 2216 2217 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2218 if (nxt_slow_path(source == NULL)) { 2219 return NULL; 2220 } 2221 2222 forward->source = source; 2223 2224 if (recursive_conf != NULL) { 2225 forward->recursive = nxt_conf_get_boolean(recursive_conf); 2226 } 2227 2228 if (client_ip_conf != NULL) { 2229 ret = nxt_router_conf_forward_header(mp, client_ip_conf, 2230 &forward->client_ip); 2231 if (nxt_slow_path(ret != NXT_OK)) { 2232 return NULL; 2233 } 2234 } 2235 2236 if (protocol_conf != NULL) { 2237 ret = nxt_router_conf_forward_header(mp, protocol_conf, 2238 &forward->protocol); 2239 if (nxt_slow_path(ret != NXT_OK)) { 2240 return NULL; 2241 } 2242 } 2243 2244 return forward; 2245 } 2246 2247 2248 static nxt_int_t 2249 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf, 2250 nxt_http_forward_header_t *fh) 2251 { 2252 char c; 2253 size_t i; 2254 uint32_t hash; 2255 nxt_str_t header; 2256 2257 nxt_conf_get_string(conf, &header); 2258 2259 fh->header = nxt_str_dup(mp, NULL, &header); 2260 if (nxt_slow_path(fh->header == NULL)) { 2261 return NXT_ERROR; 2262 } 2263 2264 hash = NXT_HTTP_FIELD_HASH_INIT; 2265 2266 for (i = 0; i < fh->header->length; i++) { 2267 c = fh->header->start[i]; 2268 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2269 } 2270 2271 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2272 2273 fh->header_hash = hash; 2274 2275 return NXT_OK; 2276 } 2277 2278 2279 static nxt_app_t * 2280 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2281 { 2282 nxt_app_t *app; 2283 2284 nxt_queue_each(app, queue, nxt_app_t, link) { 2285 2286 if (nxt_strstr_eq(name, &app->name)) { 2287 return app; 2288 } 2289 2290 } nxt_queue_loop; 2291 2292 return NULL; 2293 } 2294 2295 2296 static nxt_int_t 2297 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2298 { 2299 void *mem; 2300 nxt_int_t fd; 2301 2302 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2303 if (nxt_slow_path(fd == -1)) { 2304 return NXT_ERROR; 2305 } 2306 2307 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2308 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2309 if (nxt_slow_path(mem == MAP_FAILED)) { 2310 nxt_fd_close(fd); 2311 2312 return NXT_ERROR; 2313 } 2314 2315 nxt_app_queue_init(mem); 2316 2317 port->queue_fd = fd; 2318 port->queue = mem; 2319 2320 return NXT_OK; 2321 } 2322 2323 2324 static nxt_int_t 2325 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2326 { 2327 void *mem; 2328 nxt_int_t fd; 2329 2330 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2331 if (nxt_slow_path(fd == -1)) { 2332 return NXT_ERROR; 2333 } 2334 2335 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2336 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2337 if (nxt_slow_path(mem == MAP_FAILED)) { 2338 nxt_fd_close(fd); 2339 2340 return NXT_ERROR; 2341 } 2342 2343 nxt_port_queue_init(mem); 2344 2345 port->queue_fd = fd; 2346 port->queue = mem; 2347 2348 return NXT_OK; 2349 } 2350 2351 2352 static nxt_int_t 2353 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2354 { 2355 void *mem; 2356 2357 nxt_assert(fd != -1); 2358 2359 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2360 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2361 if (nxt_slow_path(mem == MAP_FAILED)) { 2362 2363 return NXT_ERROR; 2364 } 2365 2366 port->queue = mem; 2367 2368 return NXT_OK; 2369 } 2370 2371 2372 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2373 NXT_LVLHSH_DEFAULT, 2374 nxt_router_apps_hash_test, 2375 nxt_mp_lvlhsh_alloc, 2376 nxt_mp_lvlhsh_free, 2377 }; 2378 2379 2380 static nxt_int_t 2381 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2382 { 2383 nxt_app_t *app; 2384 2385 app = data; 2386 2387 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2388 } 2389 2390 2391 static nxt_int_t 2392 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2393 { 2394 nxt_lvlhsh_query_t lhq; 2395 2396 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2397 lhq.replace = 0; 2398 lhq.key = app->name; 2399 lhq.value = app; 2400 lhq.proto = &nxt_router_apps_hash_proto; 2401 lhq.pool = rtcf->mem_pool; 2402 2403 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2404 2405 case NXT_OK: 2406 return NXT_OK; 2407 2408 case NXT_DECLINED: 2409 nxt_thread_log_alert("router app hash adding failed: " 2410 "\"%V\" is already in hash", &lhq.key); 2411 /* Fall through. */ 2412 default: 2413 return NXT_ERROR; 2414 } 2415 } 2416 2417 2418 static nxt_app_t * 2419 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2420 { 2421 nxt_lvlhsh_query_t lhq; 2422 2423 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2424 lhq.key = *name; 2425 lhq.proto = &nxt_router_apps_hash_proto; 2426 2427 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2428 return NULL; 2429 } 2430 2431 return lhq.value; 2432 } 2433 2434 2435 static void 2436 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2437 { 2438 nxt_app_t *app; 2439 nxt_lvlhsh_each_t lhe; 2440 2441 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2442 2443 for ( ;; ) { 2444 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2445 2446 if (app == NULL) { 2447 break; 2448 } 2449 2450 nxt_router_app_use(task, app, i); 2451 } 2452 } 2453 2454 2455 typedef struct { 2456 nxt_app_t *app; 2457 nxt_int_t target; 2458 } nxt_http_app_conf_t; 2459 2460 2461 nxt_int_t 2462 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2463 nxt_str_t *target, nxt_http_action_t *action) 2464 { 2465 nxt_app_t *app; 2466 nxt_str_t *targets; 2467 nxt_uint_t i; 2468 nxt_http_app_conf_t *conf; 2469 2470 app = nxt_router_apps_hash_get(rtcf, name); 2471 if (app == NULL) { 2472 return NXT_DECLINED; 2473 } 2474 2475 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2476 if (nxt_slow_path(conf == NULL)) { 2477 return NXT_ERROR; 2478 } 2479 2480 action->handler = nxt_http_application_handler; 2481 action->u.conf = conf; 2482 2483 conf->app = app; 2484 2485 if (target != NULL && target->length != 0) { 2486 targets = app->targets; 2487 2488 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2489 2490 conf->target = i; 2491 2492 } else { 2493 conf->target = 0; 2494 } 2495 2496 return NXT_OK; 2497 } 2498 2499 2500 static nxt_socket_conf_t * 2501 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2502 nxt_str_t *name) 2503 { 2504 size_t size; 2505 nxt_int_t ret; 2506 nxt_bool_t wildcard; 2507 nxt_sockaddr_t *sa; 2508 nxt_socket_conf_t *skcf; 2509 nxt_listen_socket_t *ls; 2510 2511 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2512 if (nxt_slow_path(sa == NULL)) { 2513 nxt_alert(task, "invalid listener \"%V\"", name); 2514 return NULL; 2515 } 2516 2517 sa->type = SOCK_STREAM; 2518 2519 nxt_debug(task, "router listener: \"%*s\"", 2520 (size_t) sa->length, nxt_sockaddr_start(sa)); 2521 2522 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2523 if (nxt_slow_path(skcf == NULL)) { 2524 return NULL; 2525 } 2526 2527 size = nxt_sockaddr_size(sa); 2528 2529 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2530 2531 if (ret != NXT_OK) { 2532 2533 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2534 if (nxt_slow_path(ls == NULL)) { 2535 return NULL; 2536 } 2537 2538 skcf->listen = ls; 2539 2540 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2541 nxt_memcpy(ls->sockaddr, sa, size); 2542 2543 nxt_listen_socket_remote_size(ls); 2544 2545 ls->socket = -1; 2546 ls->backlog = NXT_LISTEN_BACKLOG; 2547 ls->flags = NXT_NONBLOCK; 2548 ls->read_after_accept = 1; 2549 } 2550 2551 switch (sa->u.sockaddr.sa_family) { 2552 #if (NXT_HAVE_UNIX_DOMAIN) 2553 case AF_UNIX: 2554 wildcard = 0; 2555 break; 2556 #endif 2557 #if (NXT_INET6) 2558 case AF_INET6: 2559 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2560 break; 2561 #endif 2562 case AF_INET: 2563 default: 2564 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2565 break; 2566 } 2567 2568 if (!wildcard) { 2569 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2570 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2571 return NULL; 2572 } 2573 2574 nxt_memcpy(skcf->sockaddr, sa, size); 2575 } 2576 2577 return skcf; 2578 } 2579 2580 2581 static nxt_int_t 2582 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2583 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2584 { 2585 nxt_router_t *router; 2586 nxt_queue_link_t *qlk; 2587 nxt_socket_conf_t *skcf; 2588 2589 router = tmcf->router_conf->router; 2590 2591 for (qlk = nxt_queue_first(&router->sockets); 2592 qlk != nxt_queue_tail(&router->sockets); 2593 qlk = nxt_queue_next(qlk)) 2594 { 2595 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2596 2597 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2598 nskcf->listen = skcf->listen; 2599 2600 nxt_queue_remove(qlk); 2601 nxt_queue_insert_tail(&keeping_sockets, qlk); 2602 2603 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2604 2605 return NXT_OK; 2606 } 2607 } 2608 2609 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2610 2611 return NXT_DECLINED; 2612 } 2613 2614 2615 static void 2616 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2617 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2618 { 2619 size_t size; 2620 uint32_t stream; 2621 nxt_int_t ret; 2622 nxt_buf_t *b; 2623 nxt_port_t *main_port, *router_port; 2624 nxt_runtime_t *rt; 2625 nxt_socket_rpc_t *rpc; 2626 2627 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2628 if (rpc == NULL) { 2629 goto fail; 2630 } 2631 2632 rpc->socket_conf = skcf; 2633 rpc->temp_conf = tmcf; 2634 2635 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2636 2637 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2638 if (b == NULL) { 2639 goto fail; 2640 } 2641 2642 b->completion_handler = nxt_buf_dummy_completion; 2643 2644 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2645 2646 rt = task->thread->runtime; 2647 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2648 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2649 2650 stream = nxt_port_rpc_register_handler(task, router_port, 2651 nxt_router_listen_socket_ready, 2652 nxt_router_listen_socket_error, 2653 main_port->pid, rpc); 2654 if (nxt_slow_path(stream == 0)) { 2655 goto fail; 2656 } 2657 2658 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2659 stream, router_port->id, b); 2660 2661 if (nxt_slow_path(ret != NXT_OK)) { 2662 nxt_port_rpc_cancel(task, router_port, stream); 2663 goto fail; 2664 } 2665 2666 return; 2667 2668 fail: 2669 2670 nxt_router_conf_error(task, tmcf); 2671 } 2672 2673 2674 static void 2675 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2676 void *data) 2677 { 2678 nxt_int_t ret; 2679 nxt_socket_t s; 2680 nxt_socket_rpc_t *rpc; 2681 2682 rpc = data; 2683 2684 s = msg->fd[0]; 2685 2686 ret = nxt_socket_nonblocking(task, s); 2687 if (nxt_slow_path(ret != NXT_OK)) { 2688 goto fail; 2689 } 2690 2691 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2692 2693 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2694 if (nxt_slow_path(ret != NXT_OK)) { 2695 goto fail; 2696 } 2697 2698 rpc->socket_conf->listen->socket = s; 2699 2700 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2701 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2702 2703 return; 2704 2705 fail: 2706 2707 nxt_socket_close(task, s); 2708 2709 nxt_router_conf_error(task, rpc->temp_conf); 2710 } 2711 2712 2713 static void 2714 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2715 void *data) 2716 { 2717 nxt_socket_rpc_t *rpc; 2718 nxt_router_temp_conf_t *tmcf; 2719 2720 rpc = data; 2721 tmcf = rpc->temp_conf; 2722 2723 #if 0 2724 u_char *p; 2725 size_t size; 2726 uint8_t error; 2727 nxt_buf_t *in, *out; 2728 nxt_sockaddr_t *sa; 2729 2730 static nxt_str_t socket_errors[] = { 2731 nxt_string("ListenerSystem"), 2732 nxt_string("ListenerNoIPv6"), 2733 nxt_string("ListenerPort"), 2734 nxt_string("ListenerInUse"), 2735 nxt_string("ListenerNoAddress"), 2736 nxt_string("ListenerNoAccess"), 2737 nxt_string("ListenerPath"), 2738 }; 2739 2740 sa = rpc->socket_conf->listen->sockaddr; 2741 2742 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2743 2744 if (nxt_slow_path(in == NULL)) { 2745 return; 2746 } 2747 2748 p = in->mem.pos; 2749 2750 error = *p++; 2751 2752 size = nxt_length("listen socket error: ") 2753 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2754 + sa->length + socket_errors[error].length + (in->mem.free - p); 2755 2756 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2757 if (nxt_slow_path(out == NULL)) { 2758 return; 2759 } 2760 2761 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2762 "listen socket error: " 2763 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2764 (size_t) sa->length, nxt_sockaddr_start(sa), 2765 &socket_errors[error], in->mem.free - p, p); 2766 2767 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2768 #endif 2769 2770 nxt_router_conf_error(task, tmcf); 2771 } 2772 2773 2774 #if (NXT_TLS) 2775 2776 static void 2777 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2778 void *data) 2779 { 2780 nxt_mp_t *mp; 2781 nxt_int_t ret; 2782 nxt_tls_conf_t *tlscf; 2783 nxt_router_tlssock_t *tls; 2784 nxt_tls_bundle_conf_t *bundle; 2785 nxt_router_temp_conf_t *tmcf; 2786 2787 nxt_debug(task, "tls rpc handler"); 2788 2789 tls = data; 2790 tmcf = tls->temp_conf; 2791 2792 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2793 goto fail; 2794 } 2795 2796 mp = tmcf->router_conf->mem_pool; 2797 2798 if (tls->socket_conf->tls == NULL){ 2799 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2800 if (nxt_slow_path(tlscf == NULL)) { 2801 goto fail; 2802 } 2803 2804 tlscf->no_wait_shutdown = 1; 2805 tls->socket_conf->tls = tlscf; 2806 2807 } else { 2808 tlscf = tls->socket_conf->tls; 2809 } 2810 2811 tls->tls_init->conf = tlscf; 2812 2813 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2814 if (nxt_slow_path(bundle == NULL)) { 2815 goto fail; 2816 } 2817 2818 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2819 goto fail; 2820 } 2821 2822 bundle->chain_file = msg->fd[0]; 2823 bundle->next = tlscf->bundle; 2824 tlscf->bundle = bundle; 2825 2826 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2827 tls->last); 2828 if (nxt_slow_path(ret != NXT_OK)) { 2829 goto fail; 2830 } 2831 2832 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2833 nxt_router_conf_apply, task, tmcf, NULL); 2834 return; 2835 2836 fail: 2837 2838 nxt_router_conf_error(task, tmcf); 2839 } 2840 2841 #endif 2842 2843 2844 static void 2845 nxt_router_app_rpc_create(nxt_task_t *task, 2846 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2847 { 2848 size_t size; 2849 uint32_t stream; 2850 nxt_fd_t port_fd, queue_fd; 2851 nxt_int_t ret; 2852 nxt_buf_t *b; 2853 nxt_port_t *router_port, *dport; 2854 nxt_runtime_t *rt; 2855 nxt_app_rpc_t *rpc; 2856 2857 rt = task->thread->runtime; 2858 2859 dport = app->proto_port; 2860 2861 if (dport == NULL) { 2862 nxt_debug(task, "app '%V' prototype prefork", &app->name); 2863 2864 size = app->name.length + 1 + app->conf.length; 2865 2866 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2867 if (nxt_slow_path(b == NULL)) { 2868 goto fail; 2869 } 2870 2871 b->completion_handler = nxt_buf_dummy_completion; 2872 2873 nxt_buf_cpystr(b, &app->name); 2874 *b->mem.free++ = '\0'; 2875 nxt_buf_cpystr(b, &app->conf); 2876 2877 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 2878 2879 port_fd = app->shared_port->pair[0]; 2880 queue_fd = app->shared_port->queue_fd; 2881 2882 } else { 2883 nxt_debug(task, "app '%V' prefork", &app->name); 2884 2885 b = NULL; 2886 port_fd = -1; 2887 queue_fd = -1; 2888 } 2889 2890 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2891 2892 rpc = nxt_port_rpc_register_handler_ex(task, router_port, 2893 nxt_router_app_prefork_ready, 2894 nxt_router_app_prefork_error, 2895 sizeof(nxt_app_rpc_t)); 2896 if (nxt_slow_path(rpc == NULL)) { 2897 goto fail; 2898 } 2899 2900 rpc->app = app; 2901 rpc->temp_conf = tmcf; 2902 rpc->proto = (b != NULL); 2903 2904 stream = nxt_port_rpc_ex_stream(rpc); 2905 2906 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 2907 port_fd, queue_fd, stream, router_port->id, b); 2908 if (nxt_slow_path(ret != NXT_OK)) { 2909 nxt_port_rpc_cancel(task, router_port, stream); 2910 goto fail; 2911 } 2912 2913 if (b == NULL) { 2914 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); 2915 2916 app->pending_processes++; 2917 } 2918 2919 return; 2920 2921 fail: 2922 2923 nxt_router_conf_error(task, tmcf); 2924 } 2925 2926 2927 static void 2928 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2929 void *data) 2930 { 2931 nxt_app_t *app; 2932 nxt_port_t *port; 2933 nxt_app_rpc_t *rpc; 2934 nxt_event_engine_t *engine; 2935 2936 rpc = data; 2937 app = rpc->app; 2938 2939 port = msg->u.new_port; 2940 2941 nxt_assert(port != NULL); 2942 nxt_assert(port->id == 0); 2943 2944 if (rpc->proto) { 2945 nxt_assert(app->proto_port == NULL); 2946 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 2947 2948 nxt_port_inc_use(port); 2949 2950 app->proto_port = port; 2951 port->app = app; 2952 2953 nxt_router_app_rpc_create(task, rpc->temp_conf, app); 2954 2955 return; 2956 } 2957 2958 nxt_assert(port->type == NXT_PROCESS_APP); 2959 2960 port->app = app; 2961 port->main_app_port = port; 2962 2963 app->pending_processes--; 2964 app->processes++; 2965 app->idle_processes++; 2966 2967 engine = task->thread->engine; 2968 2969 nxt_queue_insert_tail(&app->ports, &port->app_link); 2970 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2971 2972 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2973 &app->name, port->pid, port->id); 2974 2975 nxt_port_hash_add(&app->port_hash, port); 2976 app->port_hash_count++; 2977 2978 port->idle_start = 0; 2979 2980 nxt_port_inc_use(port); 2981 2982 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 2983 2984 nxt_work_queue_add(&engine->fast_work_queue, 2985 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2986 } 2987 2988 2989 static void 2990 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2991 void *data) 2992 { 2993 nxt_app_t *app; 2994 nxt_app_rpc_t *rpc; 2995 nxt_router_temp_conf_t *tmcf; 2996 2997 rpc = data; 2998 app = rpc->app; 2999 tmcf = rpc->temp_conf; 3000 3001 if (rpc->proto) { 3002 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", 3003 &app->name); 3004 3005 } else { 3006 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 3007 &app->name); 3008 3009 app->pending_processes--; 3010 } 3011 3012 nxt_router_conf_error(task, tmcf); 3013 } 3014 3015 3016 static nxt_int_t 3017 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 3018 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 3019 { 3020 nxt_int_t ret; 3021 nxt_uint_t n, threads; 3022 nxt_queue_link_t *qlk; 3023 nxt_router_engine_conf_t *recf; 3024 3025 threads = tmcf->router_conf->threads; 3026 3027 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 3028 sizeof(nxt_router_engine_conf_t)); 3029 if (nxt_slow_path(tmcf->engines == NULL)) { 3030 return NXT_ERROR; 3031 } 3032 3033 n = 0; 3034 3035 for (qlk = nxt_queue_first(&router->engines); 3036 qlk != nxt_queue_tail(&router->engines); 3037 qlk = nxt_queue_next(qlk)) 3038 { 3039 recf = nxt_array_zero_add(tmcf->engines); 3040 if (nxt_slow_path(recf == NULL)) { 3041 return NXT_ERROR; 3042 } 3043 3044 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 3045 3046 if (n < threads) { 3047 recf->action = NXT_ROUTER_ENGINE_KEEP; 3048 ret = nxt_router_engine_conf_update(tmcf, recf); 3049 3050 } else { 3051 recf->action = NXT_ROUTER_ENGINE_DELETE; 3052 ret = nxt_router_engine_conf_delete(tmcf, recf); 3053 } 3054 3055 if (nxt_slow_path(ret != NXT_OK)) { 3056 return ret; 3057 } 3058 3059 n++; 3060 } 3061 3062 tmcf->new_threads = n; 3063 3064 while (n < threads) { 3065 recf = nxt_array_zero_add(tmcf->engines); 3066 if (nxt_slow_path(recf == NULL)) { 3067 return NXT_ERROR; 3068 } 3069 3070 recf->action = NXT_ROUTER_ENGINE_ADD; 3071 3072 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 3073 if (nxt_slow_path(recf->engine == NULL)) { 3074 return NXT_ERROR; 3075 } 3076 3077 ret = nxt_router_engine_conf_create(tmcf, recf); 3078 if (nxt_slow_path(ret != NXT_OK)) { 3079 return ret; 3080 } 3081 3082 n++; 3083 } 3084 3085 return NXT_OK; 3086 } 3087 3088 3089 static nxt_int_t 3090 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 3091 nxt_router_engine_conf_t *recf) 3092 { 3093 nxt_int_t ret; 3094 3095 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3096 nxt_router_listen_socket_create); 3097 if (nxt_slow_path(ret != NXT_OK)) { 3098 return ret; 3099 } 3100 3101 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3102 nxt_router_listen_socket_create); 3103 if (nxt_slow_path(ret != NXT_OK)) { 3104 return ret; 3105 } 3106 3107 return ret; 3108 } 3109 3110 3111 static nxt_int_t 3112 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 3113 nxt_router_engine_conf_t *recf) 3114 { 3115 nxt_int_t ret; 3116 3117 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3118 nxt_router_listen_socket_create); 3119 if (nxt_slow_path(ret != NXT_OK)) { 3120 return ret; 3121 } 3122 3123 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3124 nxt_router_listen_socket_update); 3125 if (nxt_slow_path(ret != NXT_OK)) { 3126 return ret; 3127 } 3128 3129 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3130 if (nxt_slow_path(ret != NXT_OK)) { 3131 return ret; 3132 } 3133 3134 return ret; 3135 } 3136 3137 3138 static nxt_int_t 3139 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 3140 nxt_router_engine_conf_t *recf) 3141 { 3142 nxt_int_t ret; 3143 3144 ret = nxt_router_engine_quit(tmcf, recf); 3145 if (nxt_slow_path(ret != NXT_OK)) { 3146 return ret; 3147 } 3148 3149 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 3150 if (nxt_slow_path(ret != NXT_OK)) { 3151 return ret; 3152 } 3153 3154 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3155 } 3156 3157 3158 static nxt_int_t 3159 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3160 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3161 nxt_work_handler_t handler) 3162 { 3163 nxt_int_t ret; 3164 nxt_joint_job_t *job; 3165 nxt_queue_link_t *qlk; 3166 nxt_socket_conf_t *skcf; 3167 nxt_socket_conf_joint_t *joint; 3168 3169 for (qlk = nxt_queue_first(sockets); 3170 qlk != nxt_queue_tail(sockets); 3171 qlk = nxt_queue_next(qlk)) 3172 { 3173 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3174 if (nxt_slow_path(job == NULL)) { 3175 return NXT_ERROR; 3176 } 3177 3178 job->work.next = recf->jobs; 3179 recf->jobs = &job->work; 3180 3181 job->task = tmcf->engine->task; 3182 job->work.handler = handler; 3183 job->work.task = &job->task; 3184 job->work.obj = job; 3185 job->tmcf = tmcf; 3186 3187 tmcf->count++; 3188 3189 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3190 sizeof(nxt_socket_conf_joint_t)); 3191 if (nxt_slow_path(joint == NULL)) { 3192 return NXT_ERROR; 3193 } 3194 3195 job->work.data = joint; 3196 3197 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3198 if (nxt_slow_path(ret != NXT_OK)) { 3199 return ret; 3200 } 3201 3202 joint->count = 1; 3203 3204 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3205 skcf->count++; 3206 joint->socket_conf = skcf; 3207 3208 joint->engine = recf->engine; 3209 } 3210 3211 return NXT_OK; 3212 } 3213 3214 3215 static nxt_int_t 3216 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3217 nxt_router_engine_conf_t *recf) 3218 { 3219 nxt_joint_job_t *job; 3220 3221 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3222 if (nxt_slow_path(job == NULL)) { 3223 return NXT_ERROR; 3224 } 3225 3226 job->work.next = recf->jobs; 3227 recf->jobs = &job->work; 3228 3229 job->task = tmcf->engine->task; 3230 job->work.handler = nxt_router_worker_thread_quit; 3231 job->work.task = &job->task; 3232 job->work.obj = NULL; 3233 job->work.data = NULL; 3234 job->tmcf = NULL; 3235 3236 return NXT_OK; 3237 } 3238 3239 3240 static nxt_int_t 3241 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 3242 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 3243 { 3244 nxt_joint_job_t *job; 3245 nxt_queue_link_t *qlk; 3246 3247 for (qlk = nxt_queue_first(sockets); 3248 qlk != nxt_queue_tail(sockets); 3249 qlk = nxt_queue_next(qlk)) 3250 { 3251 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3252 if (nxt_slow_path(job == NULL)) { 3253 return NXT_ERROR; 3254 } 3255 3256 job->work.next = recf->jobs; 3257 recf->jobs = &job->work; 3258 3259 job->task = tmcf->engine->task; 3260 job->work.handler = nxt_router_listen_socket_delete; 3261 job->work.task = &job->task; 3262 job->work.obj = job; 3263 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3264 job->tmcf = tmcf; 3265 3266 tmcf->count++; 3267 } 3268 3269 return NXT_OK; 3270 } 3271 3272 3273 static nxt_int_t 3274 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 3275 nxt_router_temp_conf_t *tmcf) 3276 { 3277 nxt_int_t ret; 3278 nxt_uint_t i, threads; 3279 nxt_router_engine_conf_t *recf; 3280 3281 recf = tmcf->engines->elts; 3282 threads = tmcf->router_conf->threads; 3283 3284 for (i = tmcf->new_threads; i < threads; i++) { 3285 ret = nxt_router_thread_create(task, rt, recf[i].engine); 3286 if (nxt_slow_path(ret != NXT_OK)) { 3287 return ret; 3288 } 3289 } 3290 3291 return NXT_OK; 3292 } 3293 3294 3295 static nxt_int_t 3296 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 3297 nxt_event_engine_t *engine) 3298 { 3299 nxt_int_t ret; 3300 nxt_thread_link_t *link; 3301 nxt_thread_handle_t handle; 3302 3303 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 3304 3305 if (nxt_slow_path(link == NULL)) { 3306 return NXT_ERROR; 3307 } 3308 3309 link->start = nxt_router_thread_start; 3310 link->engine = engine; 3311 link->work.handler = nxt_router_thread_exit_handler; 3312 link->work.task = task; 3313 link->work.data = link; 3314 3315 nxt_queue_insert_tail(&rt->engines, &engine->link); 3316 3317 ret = nxt_thread_create(&handle, link); 3318 3319 if (nxt_slow_path(ret != NXT_OK)) { 3320 nxt_queue_remove(&engine->link); 3321 } 3322 3323 return ret; 3324 } 3325 3326 3327 static void 3328 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 3329 nxt_router_temp_conf_t *tmcf) 3330 { 3331 nxt_app_t *app; 3332 3333 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 3334 3335 nxt_router_app_unlink(task, app); 3336 3337 } nxt_queue_loop; 3338 3339 nxt_queue_add(&router->apps, &tmcf->previous); 3340 nxt_queue_add(&router->apps, &tmcf->apps); 3341 } 3342 3343 3344 static void 3345 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 3346 { 3347 nxt_uint_t n; 3348 nxt_event_engine_t *engine; 3349 nxt_router_engine_conf_t *recf; 3350 3351 recf = tmcf->engines->elts; 3352 3353 for (n = tmcf->engines->nelts; n != 0; n--) { 3354 engine = recf->engine; 3355 3356 switch (recf->action) { 3357 3358 case NXT_ROUTER_ENGINE_KEEP: 3359 break; 3360 3361 case NXT_ROUTER_ENGINE_ADD: 3362 nxt_queue_insert_tail(&router->engines, &engine->link0); 3363 break; 3364 3365 case NXT_ROUTER_ENGINE_DELETE: 3366 nxt_queue_remove(&engine->link0); 3367 break; 3368 } 3369 3370 nxt_router_engine_post(engine, recf->jobs); 3371 3372 recf++; 3373 } 3374 } 3375 3376 3377 static void 3378 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3379 { 3380 nxt_work_t *work, *next; 3381 3382 for (work = jobs; work != NULL; work = next) { 3383 next = work->next; 3384 work->next = NULL; 3385 3386 nxt_event_engine_post(engine, work); 3387 } 3388 } 3389 3390 3391 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3392 .rpc_error = nxt_port_rpc_handler, 3393 .mmap = nxt_port_mmap_handler, 3394 .data = nxt_port_rpc_handler, 3395 .oosm = nxt_router_oosm_handler, 3396 .req_headers_ack = nxt_port_rpc_handler, 3397 }; 3398 3399 3400 static void 3401 nxt_router_thread_start(void *data) 3402 { 3403 nxt_int_t ret; 3404 nxt_port_t *port; 3405 nxt_task_t *task; 3406 nxt_work_t *work; 3407 nxt_thread_t *thread; 3408 nxt_thread_link_t *link; 3409 nxt_event_engine_t *engine; 3410 3411 link = data; 3412 engine = link->engine; 3413 task = &engine->task; 3414 3415 thread = nxt_thread(); 3416 3417 nxt_event_engine_thread_adopt(engine); 3418 3419 /* STUB */ 3420 thread->runtime = engine->task.thread->runtime; 3421 3422 engine->task.thread = thread; 3423 engine->task.log = thread->log; 3424 thread->engine = engine; 3425 thread->task = &engine->task; 3426 #if 0 3427 thread->fiber = &engine->fibers->fiber; 3428 #endif 3429 3430 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3431 if (nxt_slow_path(engine->mem_pool == NULL)) { 3432 return; 3433 } 3434 3435 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3436 NXT_PROCESS_ROUTER); 3437 if (nxt_slow_path(port == NULL)) { 3438 return; 3439 } 3440 3441 ret = nxt_port_socket_init(task, port, 0); 3442 if (nxt_slow_path(ret != NXT_OK)) { 3443 nxt_port_use(task, port, -1); 3444 return; 3445 } 3446 3447 ret = nxt_router_port_queue_init(task, port); 3448 if (nxt_slow_path(ret != NXT_OK)) { 3449 nxt_port_use(task, port, -1); 3450 return; 3451 } 3452 3453 engine->port = port; 3454 3455 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3456 3457 work = nxt_zalloc(sizeof(nxt_work_t)); 3458 if (nxt_slow_path(work == NULL)) { 3459 return; 3460 } 3461 3462 work->handler = nxt_router_rt_add_port; 3463 work->task = link->work.task; 3464 work->obj = work; 3465 work->data = port; 3466 3467 nxt_event_engine_post(link->work.task->thread->engine, work); 3468 3469 nxt_event_engine_start(engine); 3470 } 3471 3472 3473 static void 3474 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3475 { 3476 nxt_int_t res; 3477 nxt_port_t *port; 3478 nxt_runtime_t *rt; 3479 3480 rt = task->thread->runtime; 3481 port = data; 3482 3483 nxt_free(obj); 3484 3485 res = nxt_port_hash_add(&rt->ports, port); 3486 3487 if (nxt_fast_path(res == NXT_OK)) { 3488 nxt_port_use(task, port, 1); 3489 } 3490 } 3491 3492 3493 static void 3494 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3495 { 3496 nxt_joint_job_t *job; 3497 nxt_socket_conf_t *skcf; 3498 nxt_listen_event_t *lev; 3499 nxt_listen_socket_t *ls; 3500 nxt_thread_spinlock_t *lock; 3501 nxt_socket_conf_joint_t *joint; 3502 3503 job = obj; 3504 joint = data; 3505 3506 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3507 3508 skcf = joint->socket_conf; 3509 ls = skcf->listen; 3510 3511 lev = nxt_listen_event(task, ls); 3512 if (nxt_slow_path(lev == NULL)) { 3513 nxt_router_listen_socket_release(task, skcf); 3514 return; 3515 } 3516 3517 lev->socket.data = joint; 3518 3519 lock = &skcf->router_conf->router->lock; 3520 3521 nxt_thread_spin_lock(lock); 3522 ls->count++; 3523 nxt_thread_spin_unlock(lock); 3524 3525 job->work.next = NULL; 3526 job->work.handler = nxt_router_conf_wait; 3527 3528 nxt_event_engine_post(job->tmcf->engine, &job->work); 3529 } 3530 3531 3532 nxt_inline nxt_listen_event_t * 3533 nxt_router_listen_event(nxt_queue_t *listen_connections, 3534 nxt_socket_conf_t *skcf) 3535 { 3536 nxt_socket_t fd; 3537 nxt_queue_link_t *qlk; 3538 nxt_listen_event_t *lev; 3539 3540 fd = skcf->listen->socket; 3541 3542 for (qlk = nxt_queue_first(listen_connections); 3543 qlk != nxt_queue_tail(listen_connections); 3544 qlk = nxt_queue_next(qlk)) 3545 { 3546 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3547 3548 if (fd == lev->socket.fd) { 3549 return lev; 3550 } 3551 } 3552 3553 return NULL; 3554 } 3555 3556 3557 static void 3558 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3559 { 3560 nxt_joint_job_t *job; 3561 nxt_event_engine_t *engine; 3562 nxt_listen_event_t *lev; 3563 nxt_socket_conf_joint_t *joint, *old; 3564 3565 job = obj; 3566 joint = data; 3567 3568 engine = task->thread->engine; 3569 3570 nxt_queue_insert_tail(&engine->joints, &joint->link); 3571 3572 lev = nxt_router_listen_event(&engine->listen_connections, 3573 joint->socket_conf); 3574 3575 old = lev->socket.data; 3576 lev->socket.data = joint; 3577 lev->listen = joint->socket_conf->listen; 3578 3579 job->work.next = NULL; 3580 job->work.handler = nxt_router_conf_wait; 3581 3582 nxt_event_engine_post(job->tmcf->engine, &job->work); 3583 3584 /* 3585 * The task is allocated from configuration temporary 3586 * memory pool so it can be freed after engine post operation. 3587 */ 3588 3589 nxt_router_conf_release(&engine->task, old); 3590 } 3591 3592 3593 static void 3594 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3595 { 3596 nxt_socket_conf_t *skcf; 3597 nxt_listen_event_t *lev; 3598 nxt_event_engine_t *engine; 3599 nxt_socket_conf_joint_t *joint; 3600 3601 skcf = data; 3602 3603 engine = task->thread->engine; 3604 3605 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3606 3607 nxt_fd_event_delete(engine, &lev->socket); 3608 3609 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3610 lev->socket.fd); 3611 3612 joint = lev->socket.data; 3613 joint->close_job = obj; 3614 3615 lev->timer.handler = nxt_router_listen_socket_close; 3616 lev->timer.work_queue = &engine->fast_work_queue; 3617 3618 nxt_timer_add(engine, &lev->timer, 0); 3619 } 3620 3621 3622 static void 3623 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3624 { 3625 nxt_event_engine_t *engine; 3626 3627 nxt_debug(task, "router worker thread quit"); 3628 3629 engine = task->thread->engine; 3630 3631 engine->shutdown = 1; 3632 3633 if (nxt_queue_is_empty(&engine->joints)) { 3634 nxt_thread_exit(task->thread); 3635 } 3636 } 3637 3638 3639 static void 3640 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3641 { 3642 nxt_timer_t *timer; 3643 nxt_joint_job_t *job; 3644 nxt_listen_event_t *lev; 3645 nxt_socket_conf_joint_t *joint; 3646 3647 timer = obj; 3648 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3649 3650 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3651 lev->socket.fd); 3652 3653 nxt_queue_remove(&lev->link); 3654 3655 joint = lev->socket.data; 3656 lev->socket.data = NULL; 3657 3658 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3659 task = &task->thread->engine->task; 3660 3661 nxt_router_listen_socket_release(task, joint->socket_conf); 3662 3663 job = joint->close_job; 3664 job->work.next = NULL; 3665 job->work.handler = nxt_router_conf_wait; 3666 3667 nxt_event_engine_post(job->tmcf->engine, &job->work); 3668 3669 nxt_router_listen_event_release(task, lev, joint); 3670 } 3671 3672 3673 static void 3674 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3675 { 3676 nxt_listen_socket_t *ls; 3677 nxt_thread_spinlock_t *lock; 3678 3679 ls = skcf->listen; 3680 lock = &skcf->router_conf->router->lock; 3681 3682 nxt_thread_spin_lock(lock); 3683 3684 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3685 task->thread->engine, ls->count); 3686 3687 if (--ls->count != 0) { 3688 ls = NULL; 3689 } 3690 3691 nxt_thread_spin_unlock(lock); 3692 3693 if (ls != NULL) { 3694 nxt_socket_close(task, ls->socket); 3695 nxt_free(ls); 3696 } 3697 } 3698 3699 3700 void 3701 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3702 nxt_socket_conf_joint_t *joint) 3703 { 3704 nxt_event_engine_t *engine; 3705 3706 nxt_debug(task, "listen event count: %D", lev->count); 3707 3708 engine = task->thread->engine; 3709 3710 if (--lev->count == 0) { 3711 if (lev->next != NULL) { 3712 nxt_sockaddr_cache_free(engine, lev->next); 3713 3714 nxt_conn_free(task, lev->next); 3715 } 3716 3717 nxt_free(lev); 3718 } 3719 3720 if (joint != NULL) { 3721 nxt_router_conf_release(task, joint); 3722 } 3723 3724 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3725 nxt_thread_exit(task->thread); 3726 } 3727 } 3728 3729 3730 void 3731 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3732 { 3733 nxt_socket_conf_t *skcf; 3734 nxt_router_conf_t *rtcf; 3735 nxt_thread_spinlock_t *lock; 3736 3737 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3738 3739 if (--joint->count != 0) { 3740 return; 3741 } 3742 3743 nxt_queue_remove(&joint->link); 3744 3745 /* 3746 * The joint content can not be safely used after the critical 3747 * section protected by the spinlock because its memory pool may 3748 * be already destroyed by another thread. 3749 */ 3750 skcf = joint->socket_conf; 3751 rtcf = skcf->router_conf; 3752 lock = &rtcf->router->lock; 3753 3754 nxt_thread_spin_lock(lock); 3755 3756 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3757 rtcf, rtcf->count); 3758 3759 if (--skcf->count != 0) { 3760 skcf = NULL; 3761 rtcf = NULL; 3762 3763 } else { 3764 nxt_queue_remove(&skcf->link); 3765 3766 if (--rtcf->count != 0) { 3767 rtcf = NULL; 3768 } 3769 } 3770 3771 nxt_thread_spin_unlock(lock); 3772 3773 #if (NXT_TLS) 3774 if (skcf != NULL && skcf->tls != NULL) { 3775 task->thread->runtime->tls->server_free(task, skcf->tls); 3776 } 3777 #endif 3778 3779 /* TODO remove engine->port */ 3780 3781 if (rtcf != NULL) { 3782 nxt_debug(task, "old router conf is destroyed"); 3783 3784 nxt_router_apps_hash_use(task, rtcf, -1); 3785 3786 nxt_router_access_log_release(task, lock, rtcf->access_log); 3787 3788 nxt_mp_thread_adopt(rtcf->mem_pool); 3789 3790 nxt_mp_destroy(rtcf->mem_pool); 3791 } 3792 } 3793 3794 3795 static void 3796 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3797 { 3798 nxt_port_t *port; 3799 nxt_thread_link_t *link; 3800 nxt_event_engine_t *engine; 3801 nxt_thread_handle_t handle; 3802 3803 handle = (nxt_thread_handle_t) (uintptr_t) obj; 3804 link = data; 3805 3806 nxt_thread_wait(handle); 3807 3808 engine = link->engine; 3809 3810 nxt_queue_remove(&engine->link); 3811 3812 port = engine->port; 3813 3814 // TODO notify all apps 3815 3816 port->engine = task->thread->engine; 3817 nxt_mp_thread_adopt(port->mem_pool); 3818 nxt_port_use(task, port, -1); 3819 3820 nxt_mp_thread_adopt(engine->mem_pool); 3821 nxt_mp_destroy(engine->mem_pool); 3822 3823 nxt_event_engine_free(engine); 3824 3825 nxt_free(link); 3826 } 3827 3828 3829 static void 3830 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3831 void *data) 3832 { 3833 size_t b_size, count; 3834 nxt_int_t ret; 3835 nxt_app_t *app; 3836 nxt_buf_t *b, *next; 3837 nxt_port_t *app_port; 3838 nxt_unit_field_t *f; 3839 nxt_http_field_t *field; 3840 nxt_http_request_t *r; 3841 nxt_unit_response_t *resp; 3842 nxt_request_rpc_data_t *req_rpc_data; 3843 3844 req_rpc_data = data; 3845 3846 r = req_rpc_data->request; 3847 if (nxt_slow_path(r == NULL)) { 3848 return; 3849 } 3850 3851 if (r->error) { 3852 nxt_request_rpc_data_unlink(task, req_rpc_data); 3853 return; 3854 } 3855 3856 app = req_rpc_data->app; 3857 nxt_assert(app != NULL); 3858 3859 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { 3860 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); 3861 3862 return; 3863 } 3864 3865 b = (msg->size == 0) ? NULL : msg->buf; 3866 3867 if (msg->port_msg.last != 0) { 3868 nxt_debug(task, "router data create last buf"); 3869 3870 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 3871 3872 req_rpc_data->rpc_cancel = 0; 3873 3874 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { 3875 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; 3876 } 3877 3878 nxt_request_rpc_data_unlink(task, req_rpc_data); 3879 3880 } else { 3881 if (app->timeout != 0) { 3882 r->timer.handler = nxt_router_app_timeout; 3883 r->timer_data = req_rpc_data; 3884 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 3885 } 3886 } 3887 3888 if (b == NULL) { 3889 return; 3890 } 3891 3892 if (msg->buf == b) { 3893 /* Disable instant buffer completion/re-using by port. */ 3894 msg->buf = NULL; 3895 } 3896 3897 if (r->header_sent) { 3898 nxt_buf_chain_add(&r->out, b); 3899 nxt_http_request_send_body(task, r, NULL); 3900 3901 } else { 3902 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0; 3903 3904 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) { 3905 nxt_alert(task, "response buffer too small: %z", b_size); 3906 goto fail; 3907 } 3908 3909 resp = (void *) b->mem.pos; 3910 count = (b_size - sizeof(nxt_unit_response_t)) 3911 / sizeof(nxt_unit_field_t); 3912 3913 if (nxt_slow_path(count < resp->fields_count)) { 3914 nxt_alert(task, "response buffer too small for fields count: %D", 3915 resp->fields_count); 3916 goto fail; 3917 } 3918 3919 field = NULL; 3920 3921 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 3922 if (f->skip) { 3923 continue; 3924 } 3925 3926 field = nxt_list_add(r->resp.fields); 3927 3928 if (nxt_slow_path(field == NULL)) { 3929 goto fail; 3930 } 3931 3932 field->hash = f->hash; 3933 field->skip = 0; 3934 field->hopbyhop = 0; 3935 3936 field->name_length = f->name_length; 3937 field->value_length = f->value_length; 3938 field->name = nxt_unit_sptr_get(&f->name); 3939 field->value = nxt_unit_sptr_get(&f->value); 3940 3941 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 3942 if (nxt_slow_path(ret != NXT_OK)) { 3943 goto fail; 3944 } 3945 3946 nxt_debug(task, "header%s: %*s: %*s", 3947 (field->skip ? " skipped" : ""), 3948 (size_t) field->name_length, field->name, 3949 (size_t) field->value_length, field->value); 3950 3951 if (field->skip) { 3952 r->resp.fields->last->nelts--; 3953 } 3954 } 3955 3956 r->status = resp->status; 3957 3958 if (resp->piggyback_content_length != 0) { 3959 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 3960 b->mem.free = b->mem.pos + resp->piggyback_content_length; 3961 3962 } else { 3963 b->mem.pos = b->mem.free; 3964 } 3965 3966 if (nxt_buf_mem_used_size(&b->mem) == 0) { 3967 next = b->next; 3968 b->next = NULL; 3969 3970 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3971 b->completion_handler, task, b, b->parent); 3972 3973 b = next; 3974 } 3975 3976 if (b != NULL) { 3977 nxt_buf_chain_add(&r->out, b); 3978 } 3979 3980 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 3981 3982 if (r->websocket_handshake 3983 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 3984 { 3985 app_port = req_rpc_data->app_port; 3986 if (nxt_slow_path(app_port == NULL)) { 3987 goto fail; 3988 } 3989 3990 nxt_thread_mutex_lock(&app->mutex); 3991 3992 app_port->main_app_port->active_websockets++; 3993 3994 nxt_thread_mutex_unlock(&app->mutex); 3995 3996 nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE); 3997 req_rpc_data->apr_action = NXT_APR_CLOSE; 3998 3999 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); 4000 4001 r->state = &nxt_http_websocket; 4002 4003 } else { 4004 r->state = &nxt_http_request_send_state; 4005 } 4006 } 4007 4008 return; 4009 4010 fail: 4011 4012 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4013 4014 nxt_request_rpc_data_unlink(task, req_rpc_data); 4015 } 4016 4017 4018 static void 4019 nxt_router_req_headers_ack_handler(nxt_task_t *task, 4020 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 4021 { 4022 int res; 4023 nxt_app_t *app; 4024 nxt_buf_t *b; 4025 nxt_bool_t start_process, unlinked; 4026 nxt_port_t *app_port, *main_app_port, *idle_port; 4027 nxt_queue_link_t *idle_lnk; 4028 nxt_http_request_t *r; 4029 4030 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4031 req_rpc_data->stream, 4032 msg->port_msg.pid, msg->port_msg.reply_port); 4033 4034 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4035 msg->port_msg.pid); 4036 4037 app = req_rpc_data->app; 4038 r = req_rpc_data->request; 4039 4040 start_process = 0; 4041 unlinked = 0; 4042 4043 nxt_thread_mutex_lock(&app->mutex); 4044 4045 if (r->app_link.next != NULL) { 4046 nxt_queue_remove(&r->app_link); 4047 r->app_link.next = NULL; 4048 4049 unlinked = 1; 4050 } 4051 4052 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4053 msg->port_msg.reply_port); 4054 if (nxt_slow_path(app_port == NULL)) { 4055 nxt_thread_mutex_unlock(&app->mutex); 4056 4057 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4058 4059 if (unlinked) { 4060 nxt_mp_release(r->mem_pool); 4061 } 4062 4063 return; 4064 } 4065 4066 main_app_port = app_port->main_app_port; 4067 4068 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4069 app->idle_processes--; 4070 4071 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4072 &app->name, main_app_port->pid, main_app_port->id, 4073 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4074 4075 /* Check port was in 'spare_ports' using idle_start field. */ 4076 if (main_app_port->idle_start == 0 4077 && app->idle_processes >= app->spare_processes) 4078 { 4079 /* 4080 * If there is a vacant space in spare ports, 4081 * move the last idle to spare_ports. 4082 */ 4083 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4084 4085 idle_lnk = nxt_queue_last(&app->idle_ports); 4086 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4087 nxt_queue_remove(idle_lnk); 4088 4089 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4090 4091 idle_port->idle_start = 0; 4092 4093 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4094 "to spare_ports", 4095 &app->name, idle_port->pid, idle_port->id); 4096 } 4097 4098 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4099 app->pending_processes++; 4100 start_process = 1; 4101 } 4102 } 4103 4104 main_app_port->active_requests++; 4105 4106 nxt_port_inc_use(app_port); 4107 4108 nxt_thread_mutex_unlock(&app->mutex); 4109 4110 if (unlinked) { 4111 nxt_mp_release(r->mem_pool); 4112 } 4113 4114 if (start_process) { 4115 nxt_router_start_app_process(task, app); 4116 } 4117 4118 nxt_port_use(task, req_rpc_data->app_port, -1); 4119 4120 req_rpc_data->app_port = app_port; 4121 4122 b = req_rpc_data->msg_info.buf; 4123 4124 if (b != NULL) { 4125 /* First buffer is already sent. Start from second. */ 4126 b = b->next; 4127 4128 req_rpc_data->msg_info.buf->next = NULL; 4129 } 4130 4131 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4132 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4133 req_rpc_data->msg_info.body_fd); 4134 4135 if (req_rpc_data->msg_info.body_fd != -1) { 4136 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4137 } 4138 4139 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4140 req_rpc_data->msg_info.body_fd, 4141 req_rpc_data->stream, 4142 task->thread->engine->port->id, b); 4143 4144 if (nxt_slow_path(res != NXT_OK)) { 4145 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4146 } 4147 } 4148 4149 if (app->timeout != 0) { 4150 r->timer.handler = nxt_router_app_timeout; 4151 r->timer_data = req_rpc_data; 4152 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4153 } 4154 } 4155 4156 4157 static const nxt_http_request_state_t nxt_http_request_send_state 4158 nxt_aligned(64) = 4159 { 4160 .error_handler = nxt_http_request_error_handler, 4161 }; 4162 4163 4164 static void 4165 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4166 { 4167 nxt_buf_t *out; 4168 nxt_http_request_t *r; 4169 4170 r = obj; 4171 4172 out = r->out; 4173 4174 if (out != NULL) { 4175 r->out = NULL; 4176 nxt_http_request_send(task, r, out); 4177 } 4178 } 4179 4180 4181 static void 4182 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4183 void *data) 4184 { 4185 nxt_request_rpc_data_t *req_rpc_data; 4186 4187 req_rpc_data = data; 4188 4189 req_rpc_data->rpc_cancel = 0; 4190 4191 /* TODO cancel message and return if cancelled. */ 4192 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4193 4194 if (req_rpc_data->request != NULL) { 4195 nxt_http_request_error(task, req_rpc_data->request, 4196 NXT_HTTP_SERVICE_UNAVAILABLE); 4197 } 4198 4199 nxt_request_rpc_data_unlink(task, req_rpc_data); 4200 } 4201 4202 4203 static void 4204 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4205 void *data) 4206 { 4207 uint32_t n; 4208 nxt_app_t *app; 4209 nxt_bool_t start_process, restarted; 4210 nxt_port_t *port; 4211 nxt_app_joint_t *app_joint; 4212 nxt_app_joint_rpc_t *app_joint_rpc; 4213 4214 nxt_assert(data != NULL); 4215 4216 app_joint_rpc = data; 4217 app_joint = app_joint_rpc->app_joint; 4218 port = msg->u.new_port; 4219 4220 nxt_assert(app_joint != NULL); 4221 nxt_assert(port != NULL); 4222 nxt_assert(port->id == 0); 4223 4224 app = app_joint->app; 4225 4226 nxt_router_app_joint_use(task, app_joint, -1); 4227 4228 if (nxt_slow_path(app == NULL)) { 4229 nxt_debug(task, "new port ready for released app, send QUIT"); 4230 4231 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4232 4233 return; 4234 } 4235 4236 nxt_thread_mutex_lock(&app->mutex); 4237 4238 restarted = (app->generation != app_joint_rpc->generation); 4239 4240 if (app_joint_rpc->proto) { 4241 nxt_assert(app->proto_port == NULL); 4242 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 4243 4244 n = app->proto_port_requests; 4245 app->proto_port_requests = 0; 4246 4247 if (nxt_slow_path(restarted)) { 4248 nxt_thread_mutex_unlock(&app->mutex); 4249 4250 nxt_debug(task, "proto port ready for restarted app, send QUIT"); 4251 4252 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, 4253 NULL); 4254 4255 } else { 4256 port->app = app; 4257 app->proto_port = port; 4258 4259 nxt_thread_mutex_unlock(&app->mutex); 4260 4261 nxt_port_use(task, port, 1); 4262 } 4263 4264 port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER]; 4265 4266 while (n > 0) { 4267 nxt_router_app_use(task, app, 1); 4268 4269 nxt_router_start_app_process_handler(task, port, app); 4270 4271 n--; 4272 } 4273 4274 return; 4275 } 4276 4277 nxt_assert(port->type == NXT_PROCESS_APP); 4278 nxt_assert(app->pending_processes != 0); 4279 4280 app->pending_processes--; 4281 4282 if (nxt_slow_path(restarted)) { 4283 nxt_debug(task, "new port ready for restarted app, send QUIT"); 4284 4285 start_process = !task->thread->engine->shutdown 4286 && nxt_router_app_can_start(app) 4287 && nxt_router_app_need_start(app); 4288 4289 if (start_process) { 4290 app->pending_processes++; 4291 } 4292 4293 nxt_thread_mutex_unlock(&app->mutex); 4294 4295 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4296 4297 if (start_process) { 4298 nxt_router_start_app_process(task, app); 4299 } 4300 4301 return; 4302 } 4303 4304 port->app = app; 4305 port->main_app_port = port; 4306 4307 app->processes++; 4308 nxt_port_hash_add(&app->port_hash, port); 4309 app->port_hash_count++; 4310 4311 nxt_thread_mutex_unlock(&app->mutex); 4312 4313 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4314 &app->name, port->pid, app->processes, app->pending_processes); 4315 4316 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 4317 4318 nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); 4319 } 4320 4321 4322 static void 4323 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4324 void *data) 4325 { 4326 nxt_app_t *app; 4327 nxt_app_joint_t *app_joint; 4328 nxt_queue_link_t *link; 4329 nxt_http_request_t *r; 4330 nxt_app_joint_rpc_t *app_joint_rpc; 4331 4332 nxt_assert(data != NULL); 4333 4334 app_joint_rpc = data; 4335 app_joint = app_joint_rpc->app_joint; 4336 4337 nxt_assert(app_joint != NULL); 4338 4339 app = app_joint->app; 4340 4341 nxt_router_app_joint_use(task, app_joint, -1); 4342 4343 if (nxt_slow_path(app == NULL)) { 4344 nxt_debug(task, "start error for released app"); 4345 4346 return; 4347 } 4348 4349 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4350 4351 link = NULL; 4352 4353 nxt_thread_mutex_lock(&app->mutex); 4354 4355 nxt_assert(app->pending_processes != 0); 4356 4357 app->pending_processes--; 4358 4359 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4360 link = nxt_queue_first(&app->ack_waiting_req); 4361 4362 nxt_queue_remove(link); 4363 link->next = NULL; 4364 } 4365 4366 nxt_thread_mutex_unlock(&app->mutex); 4367 4368 while (link != NULL) { 4369 r = nxt_container_of(link, nxt_http_request_t, app_link); 4370 4371 nxt_event_engine_post(r->engine, &r->err_work); 4372 4373 link = NULL; 4374 4375 nxt_thread_mutex_lock(&app->mutex); 4376 4377 if (app->processes == 0 && app->pending_processes == 0 4378 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4379 { 4380 link = nxt_queue_first(&app->ack_waiting_req); 4381 4382 nxt_queue_remove(link); 4383 link->next = NULL; 4384 } 4385 4386 nxt_thread_mutex_unlock(&app->mutex); 4387 } 4388 } 4389 4390 4391 nxt_inline nxt_port_t * 4392 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) 4393 { 4394 nxt_port_t *port; 4395 4396 port = NULL; 4397 4398 nxt_thread_mutex_lock(&app->mutex); 4399 4400 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 4401 4402 /* Caller is responsible to decrease port use count. */ 4403 nxt_queue_chk_remove(&port->app_link); 4404 4405 if (nxt_queue_chk_remove(&port->idle_link)) { 4406 app->idle_processes--; 4407 4408 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", 4409 &app->name, port->pid, port->id, 4410 (port->idle_start ? "idle_ports" : "spare_ports")); 4411 } 4412 4413 nxt_port_hash_remove(&app->port_hash, port); 4414 app->port_hash_count--; 4415 4416 port->app = NULL; 4417 app->processes--; 4418 4419 break; 4420 4421 } nxt_queue_loop; 4422 4423 nxt_thread_mutex_unlock(&app->mutex); 4424 4425 return port; 4426 } 4427 4428 4429 static void 4430 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4431 { 4432 int c; 4433 4434 c = nxt_atomic_fetch_add(&app->use_count, i); 4435 4436 if (i < 0 && c == -i) { 4437 4438 if (task->thread->engine != app->engine) { 4439 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 4440 4441 } else { 4442 nxt_router_free_app(task, app->joint, NULL); 4443 } 4444 } 4445 } 4446 4447 4448 static void 4449 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 4450 { 4451 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 4452 4453 nxt_queue_remove(&app->link); 4454 4455 nxt_router_app_use(task, app, -1); 4456 } 4457 4458 4459 static void 4460 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, 4461 nxt_apr_action_t action) 4462 { 4463 int inc_use; 4464 uint32_t got_response, dec_requests; 4465 nxt_bool_t adjust_idle_timer; 4466 nxt_port_t *main_app_port; 4467 4468 nxt_assert(port != NULL); 4469 4470 inc_use = 0; 4471 got_response = 0; 4472 dec_requests = 0; 4473 4474 switch (action) { 4475 case NXT_APR_NEW_PORT: 4476 break; 4477 case NXT_APR_REQUEST_FAILED: 4478 dec_requests = 1; 4479 inc_use = -1; 4480 break; 4481 case NXT_APR_GOT_RESPONSE: 4482 got_response = 1; 4483 inc_use = -1; 4484 break; 4485 case NXT_APR_UPGRADE: 4486 got_response = 1; 4487 break; 4488 case NXT_APR_CLOSE: 4489 inc_use = -1; 4490 break; 4491 } 4492 4493 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4494 port->pid, port->id, 4495 (int) inc_use, (int) got_response); 4496 4497 if (port->id == NXT_SHARED_PORT_ID) { 4498 nxt_thread_mutex_lock(&app->mutex); 4499 4500 app->active_requests -= got_response + dec_requests; 4501 4502 nxt_thread_mutex_unlock(&app->mutex); 4503 4504 goto adjust_use; 4505 } 4506 4507 main_app_port = port->main_app_port; 4508 4509 nxt_thread_mutex_lock(&app->mutex); 4510 4511 main_app_port->active_requests -= got_response + dec_requests; 4512 app->active_requests -= got_response + dec_requests; 4513 4514 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { 4515 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4516 4517 nxt_port_inc_use(main_app_port); 4518 } 4519 4520 adjust_idle_timer = 0; 4521 4522 if (main_app_port->pair[1] != -1 4523 && main_app_port->active_requests == 0 4524 && main_app_port->active_websockets == 0 4525 && main_app_port->idle_link.next == NULL) 4526 { 4527 if (app->idle_processes == app->spare_processes 4528 && app->adjust_idle_work.data == NULL) 4529 { 4530 adjust_idle_timer = 1; 4531 app->adjust_idle_work.data = app; 4532 app->adjust_idle_work.next = NULL; 4533 } 4534 4535 if (app->idle_processes < app->spare_processes) { 4536 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4537 4538 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4539 &app->name, main_app_port->pid, main_app_port->id); 4540 } else { 4541 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4542 4543 main_app_port->idle_start = task->thread->engine->timers.now; 4544 4545 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4546 &app->name, main_app_port->pid, main_app_port->id); 4547 } 4548 4549 app->idle_processes++; 4550 } 4551 4552 nxt_thread_mutex_unlock(&app->mutex); 4553 4554 if (adjust_idle_timer) { 4555 nxt_router_app_use(task, app, 1); 4556 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4557 } 4558 4559 /* ? */ 4560 if (main_app_port->pair[1] == -1) { 4561 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4562 &app->name, app, main_app_port, main_app_port->pid); 4563 4564 goto adjust_use; 4565 } 4566 4567 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4568 &app->name, app); 4569 4570 adjust_use: 4571 4572 nxt_port_use(task, port, inc_use); 4573 } 4574 4575 4576 void 4577 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4578 { 4579 nxt_app_t *app; 4580 nxt_bool_t unchain, start_process; 4581 nxt_port_t *idle_port; 4582 nxt_queue_link_t *idle_lnk; 4583 4584 app = port->app; 4585 4586 nxt_assert(app != NULL); 4587 4588 nxt_thread_mutex_lock(&app->mutex); 4589 4590 if (port == app->proto_port) { 4591 app->proto_port = NULL; 4592 port->app = NULL; 4593 4594 nxt_thread_mutex_unlock(&app->mutex); 4595 4596 nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name, 4597 port->pid); 4598 4599 nxt_port_use(task, port, -1); 4600 4601 return; 4602 } 4603 4604 nxt_port_hash_remove(&app->port_hash, port); 4605 app->port_hash_count--; 4606 4607 if (port->id != 0) { 4608 nxt_thread_mutex_unlock(&app->mutex); 4609 4610 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, 4611 port->pid, port->id); 4612 4613 return; 4614 } 4615 4616 unchain = nxt_queue_chk_remove(&port->app_link); 4617 4618 if (nxt_queue_chk_remove(&port->idle_link)) { 4619 app->idle_processes--; 4620 4621 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", 4622 &app->name, port->pid, port->id, 4623 (port->idle_start ? "idle_ports" : "spare_ports")); 4624 4625 if (port->idle_start == 0 4626 && app->idle_processes >= app->spare_processes) 4627 { 4628 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4629 4630 idle_lnk = nxt_queue_last(&app->idle_ports); 4631 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4632 nxt_queue_remove(idle_lnk); 4633 4634 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4635 4636 idle_port->idle_start = 0; 4637 4638 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4639 "to spare_ports", 4640 &app->name, idle_port->pid, idle_port->id); 4641 } 4642 } 4643 4644 app->processes--; 4645 4646 start_process = !task->thread->engine->shutdown 4647 && nxt_router_app_can_start(app) 4648 && nxt_router_app_need_start(app); 4649 4650 if (start_process) { 4651 app->pending_processes++; 4652 } 4653 4654 nxt_thread_mutex_unlock(&app->mutex); 4655 4656 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4657 4658 if (unchain) { 4659 nxt_port_use(task, port, -1); 4660 } 4661 4662 if (start_process) { 4663 nxt_router_start_app_process(task, app); 4664 } 4665 } 4666 4667 4668 static void 4669 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4670 { 4671 nxt_app_t *app; 4672 nxt_bool_t queued; 4673 nxt_port_t *port; 4674 nxt_msec_t timeout, threshold; 4675 nxt_queue_link_t *lnk; 4676 nxt_event_engine_t *engine; 4677 4678 app = obj; 4679 queued = (data == app); 4680 4681 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4682 &app->name, queued); 4683 4684 engine = task->thread->engine; 4685 4686 nxt_assert(app->engine == engine); 4687 4688 threshold = engine->timers.now + app->joint->idle_timer.bias; 4689 timeout = 0; 4690 4691 nxt_thread_mutex_lock(&app->mutex); 4692 4693 if (queued) { 4694 app->adjust_idle_work.data = NULL; 4695 } 4696 4697 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 4698 &app->name, 4699 (int) app->idle_processes, (int) app->spare_processes); 4700 4701 while (app->idle_processes > app->spare_processes) { 4702 4703 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4704 4705 lnk = nxt_queue_first(&app->idle_ports); 4706 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4707 4708 timeout = port->idle_start + app->idle_timeout; 4709 4710 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 4711 &app->name, port->pid, 4712 port->idle_start, timeout, threshold); 4713 4714 if (timeout > threshold) { 4715 break; 4716 } 4717 4718 nxt_queue_remove(lnk); 4719 lnk->next = NULL; 4720 4721 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 4722 &app->name, port->pid, port->id); 4723 4724 nxt_queue_chk_remove(&port->app_link); 4725 4726 nxt_port_hash_remove(&app->port_hash, port); 4727 app->port_hash_count--; 4728 4729 app->idle_processes--; 4730 app->processes--; 4731 port->app = NULL; 4732 4733 nxt_thread_mutex_unlock(&app->mutex); 4734 4735 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4736 &app->name, port->pid); 4737 4738 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4739 4740 nxt_port_use(task, port, -1); 4741 4742 nxt_thread_mutex_lock(&app->mutex); 4743 } 4744 4745 nxt_thread_mutex_unlock(&app->mutex); 4746 4747 if (timeout > threshold) { 4748 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4749 4750 } else { 4751 nxt_timer_disable(engine, &app->joint->idle_timer); 4752 } 4753 4754 if (queued) { 4755 nxt_router_app_use(task, app, -1); 4756 } 4757 } 4758 4759 4760 static void 4761 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4762 { 4763 nxt_timer_t *timer; 4764 nxt_app_joint_t *app_joint; 4765 4766 timer = obj; 4767 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4768 4769 if (nxt_fast_path(app_joint->app != NULL)) { 4770 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 4771 } 4772 } 4773 4774 4775 static void 4776 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) 4777 { 4778 nxt_timer_t *timer; 4779 nxt_app_joint_t *app_joint; 4780 4781 timer = obj; 4782 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4783 4784 nxt_router_app_joint_use(task, app_joint, -1); 4785 } 4786 4787 4788 static void 4789 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 4790 { 4791 nxt_app_t *app; 4792 nxt_port_t *port, *proto_port; 4793 nxt_app_joint_t *app_joint; 4794 4795 app_joint = obj; 4796 app = app_joint->app; 4797 4798 for ( ;; ) { 4799 port = nxt_router_app_get_port_for_quit(task, app); 4800 if (port == NULL) { 4801 break; 4802 } 4803 4804 nxt_port_use(task, port, -1); 4805 } 4806 4807 nxt_thread_mutex_lock(&app->mutex); 4808 4809 for ( ;; ) { 4810 port = nxt_port_hash_retrieve(&app->port_hash); 4811 if (port == NULL) { 4812 break; 4813 } 4814 4815 app->port_hash_count--; 4816 4817 port->app = NULL; 4818 4819 nxt_port_close(task, port); 4820 4821 nxt_port_use(task, port, -1); 4822 } 4823 4824 proto_port = app->proto_port; 4825 4826 if (proto_port != NULL) { 4827 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 4828 proto_port->pid); 4829 4830 app->proto_port = NULL; 4831 proto_port->app = NULL; 4832 } 4833 4834 nxt_thread_mutex_unlock(&app->mutex); 4835 4836 if (proto_port != NULL) { 4837 nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 4838 -1, 0, 0, NULL); 4839 4840 nxt_port_close(task, proto_port); 4841 4842 nxt_port_use(task, proto_port, -1); 4843 } 4844 4845 nxt_assert(app->proto_port == NULL); 4846 nxt_assert(app->processes == 0); 4847 nxt_assert(app->active_requests == 0); 4848 nxt_assert(app->port_hash_count == 0); 4849 nxt_assert(app->idle_processes == 0); 4850 nxt_assert(nxt_queue_is_empty(&app->ports)); 4851 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 4852 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 4853 4854 nxt_port_mmaps_destroy(&app->outgoing, 1); 4855 4856 nxt_thread_mutex_destroy(&app->outgoing.mutex); 4857 4858 if (app->shared_port != NULL) { 4859 app->shared_port->app = NULL; 4860 nxt_port_close(task, app->shared_port); 4861 nxt_port_use(task, app->shared_port, -1); 4862 4863 app->shared_port = NULL; 4864 } 4865 4866 nxt_thread_mutex_destroy(&app->mutex); 4867 nxt_mp_destroy(app->mem_pool); 4868 4869 app_joint->app = NULL; 4870 4871 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 4872 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 4873 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 4874 4875 } else { 4876 nxt_router_app_joint_use(task, app_joint, -1); 4877 } 4878 } 4879 4880 4881 static void 4882 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 4883 nxt_request_rpc_data_t *req_rpc_data) 4884 { 4885 nxt_bool_t start_process; 4886 nxt_port_t *port; 4887 nxt_http_request_t *r; 4888 4889 start_process = 0; 4890 4891 nxt_thread_mutex_lock(&app->mutex); 4892 4893 port = app->shared_port; 4894 nxt_port_inc_use(port); 4895 4896 app->active_requests++; 4897 4898 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4899 app->pending_processes++; 4900 start_process = 1; 4901 } 4902 4903 r = req_rpc_data->request; 4904 4905 /* 4906 * Put request into application-wide list to be able to cancel request 4907 * if something goes wrong with application processes. 4908 */ 4909 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 4910 4911 nxt_thread_mutex_unlock(&app->mutex); 4912 4913 /* 4914 * Retain request memory pool while request is linked in ack_waiting_req 4915 * to guarantee request structure memory is accessble. 4916 */ 4917 nxt_mp_retain(r->mem_pool); 4918 4919 req_rpc_data->app_port = port; 4920 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4921 4922 if (start_process) { 4923 nxt_router_start_app_process(task, app); 4924 } 4925 } 4926 4927 4928 void 4929 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 4930 nxt_http_action_t *action) 4931 { 4932 nxt_event_engine_t *engine; 4933 nxt_http_app_conf_t *conf; 4934 nxt_request_rpc_data_t *req_rpc_data; 4935 4936 conf = action->u.conf; 4937 engine = task->thread->engine; 4938 4939 r->app_target = conf->target; 4940 4941 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 4942 nxt_router_response_ready_handler, 4943 nxt_router_response_error_handler, 4944 sizeof(nxt_request_rpc_data_t)); 4945 if (nxt_slow_path(req_rpc_data == NULL)) { 4946 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4947 return; 4948 } 4949 4950 /* 4951 * At this point we have request req_rpc_data allocated and registered 4952 * in port handlers. Need to fixup request memory pool. Counterpart 4953 * release will be called via following call chain: 4954 * nxt_request_rpc_data_unlink() -> 4955 * nxt_router_http_request_release_post() -> 4956 * nxt_router_http_request_release() 4957 */ 4958 nxt_mp_retain(r->mem_pool); 4959 4960 r->timer.task = &engine->task; 4961 r->timer.work_queue = &engine->fast_work_queue; 4962 r->timer.log = engine->task.log; 4963 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 4964 4965 r->engine = engine; 4966 r->err_work.handler = nxt_router_http_request_error; 4967 r->err_work.task = task; 4968 r->err_work.obj = r; 4969 4970 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 4971 req_rpc_data->app = conf->app; 4972 req_rpc_data->msg_info.body_fd = -1; 4973 req_rpc_data->rpc_cancel = 1; 4974 4975 nxt_router_app_use(task, conf->app, 1); 4976 4977 req_rpc_data->request = r; 4978 r->req_rpc_data = req_rpc_data; 4979 4980 if (r->last != NULL) { 4981 r->last->completion_handler = nxt_router_http_request_done; 4982 } 4983 4984 nxt_router_app_port_get(task, conf->app, req_rpc_data); 4985 nxt_router_app_prepare_request(task, req_rpc_data); 4986 } 4987 4988 4989 static void 4990 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 4991 { 4992 nxt_http_request_t *r; 4993 4994 r = obj; 4995 4996 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 4997 4998 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4999 5000 if (r->req_rpc_data != NULL) { 5001 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5002 } 5003 5004 nxt_mp_release(r->mem_pool); 5005 } 5006 5007 5008 static void 5009 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 5010 { 5011 nxt_http_request_t *r; 5012 5013 r = data; 5014 5015 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 5016 5017 if (r->req_rpc_data != NULL) { 5018 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5019 } 5020 5021 nxt_http_request_close_handler(task, r, r->proto.any); 5022 } 5023 5024 5025 static void 5026 nxt_router_app_prepare_request(nxt_task_t *task, 5027 nxt_request_rpc_data_t *req_rpc_data) 5028 { 5029 nxt_app_t *app; 5030 nxt_buf_t *buf, *body; 5031 nxt_int_t res; 5032 nxt_port_t *port, *reply_port; 5033 5034 int notify; 5035 struct { 5036 nxt_port_msg_t pm; 5037 nxt_port_mmap_msg_t mm; 5038 } msg; 5039 5040 5041 app = req_rpc_data->app; 5042 5043 nxt_assert(app != NULL); 5044 5045 port = req_rpc_data->app_port; 5046 5047 nxt_assert(port != NULL); 5048 nxt_assert(port->queue != NULL); 5049 5050 reply_port = task->thread->engine->port; 5051 5052 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5053 nxt_app_msg_prefix[app->type]); 5054 if (nxt_slow_path(buf == NULL)) { 5055 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5056 req_rpc_data->stream, &app->name); 5057 5058 nxt_http_request_error(task, req_rpc_data->request, 5059 NXT_HTTP_INTERNAL_SERVER_ERROR); 5060 5061 return; 5062 } 5063 5064 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5065 nxt_buf_used_size(buf), 5066 port->socket.fd); 5067 5068 req_rpc_data->msg_info.buf = buf; 5069 5070 body = req_rpc_data->request->body; 5071 5072 if (body != NULL && nxt_buf_is_file(body)) { 5073 req_rpc_data->msg_info.body_fd = body->file->fd; 5074 5075 body->file->fd = -1; 5076 5077 } else { 5078 req_rpc_data->msg_info.body_fd = -1; 5079 } 5080 5081 msg.pm.stream = req_rpc_data->stream; 5082 msg.pm.pid = reply_port->pid; 5083 msg.pm.reply_port = reply_port->id; 5084 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5085 msg.pm.last = 0; 5086 msg.pm.mmap = 1; 5087 msg.pm.nf = 0; 5088 msg.pm.mf = 0; 5089 5090 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5091 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5092 5093 msg.mm.mmap_id = hdr->id; 5094 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5095 msg.mm.size = nxt_buf_used_size(buf); 5096 5097 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5098 req_rpc_data->stream, ¬ify, 5099 &req_rpc_data->msg_info.tracking_cookie); 5100 if (nxt_fast_path(res == NXT_OK)) { 5101 if (notify != 0) { 5102 (void) nxt_port_socket_write(task, port, 5103 NXT_PORT_MSG_READ_QUEUE, 5104 -1, req_rpc_data->stream, 5105 reply_port->id, NULL); 5106 5107 } else { 5108 nxt_debug(task, "queue is not empty"); 5109 } 5110 5111 buf->is_port_mmap_sent = 1; 5112 buf->mem.pos = buf->mem.free; 5113 5114 } else { 5115 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5116 req_rpc_data->stream, &app->name); 5117 5118 nxt_http_request_error(task, req_rpc_data->request, 5119 NXT_HTTP_INTERNAL_SERVER_ERROR); 5120 } 5121 } 5122 5123 5124 struct nxt_fields_iter_s { 5125 nxt_list_part_t *part; 5126 nxt_http_field_t *field; 5127 }; 5128 5129 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5130 5131 5132 static nxt_http_field_t * 5133 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5134 { 5135 if (part == NULL) { 5136 return NULL; 5137 } 5138 5139 while (part->nelts == 0) { 5140 part = part->next; 5141 if (part == NULL) { 5142 return NULL; 5143 } 5144 } 5145 5146 i->part = part; 5147 i->field = nxt_list_data(i->part); 5148 5149 return i->field; 5150 } 5151 5152 5153 static nxt_http_field_t * 5154 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5155 { 5156 return nxt_fields_part_first(nxt_list_part(fields), i); 5157 } 5158 5159 5160 static nxt_http_field_t * 5161 nxt_fields_next(nxt_fields_iter_t *i) 5162 { 5163 nxt_http_field_t *end = nxt_list_data(i->part); 5164 5165 end += i->part->nelts; 5166 i->field++; 5167 5168 if (i->field < end) { 5169 return i->field; 5170 } 5171 5172 return nxt_fields_part_first(i->part->next, i); 5173 } 5174 5175 5176 static nxt_buf_t * 5177 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5178 nxt_app_t *app, const nxt_str_t *prefix) 5179 { 5180 void *target_pos, *query_pos; 5181 u_char *pos, *end, *p, c; 5182 size_t fields_count, req_size, size, free_size; 5183 size_t copy_size; 5184 nxt_off_t content_length; 5185 nxt_buf_t *b, *buf, *out, **tail; 5186 nxt_http_field_t *field, *dup; 5187 nxt_unit_field_t *dst_field; 5188 nxt_fields_iter_t iter, dup_iter; 5189 nxt_unit_request_t *req; 5190 5191 req_size = sizeof(nxt_unit_request_t) 5192 + r->method->length + 1 5193 + r->version.length + 1 5194 + r->remote->length + 1 5195 + r->local->length + 1 5196 + r->server_name.length + 1 5197 + r->target.length + 1 5198 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5199 5200 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5201 fields_count = 0; 5202 5203 nxt_list_each(field, r->fields) { 5204 fields_count++; 5205 5206 req_size += field->name_length + prefix->length + 1 5207 + field->value_length + 1; 5208 } nxt_list_loop; 5209 5210 req_size += fields_count * sizeof(nxt_unit_field_t); 5211 5212 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5213 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5214 (int) req_size); 5215 5216 return NULL; 5217 } 5218 5219 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5220 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5221 if (nxt_slow_path(out == NULL)) { 5222 return NULL; 5223 } 5224 5225 req = (nxt_unit_request_t *) out->mem.free; 5226 out->mem.free += req_size; 5227 5228 req->app_target = r->app_target; 5229 5230 req->content_length = content_length; 5231 5232 p = (u_char *) (req->fields + fields_count); 5233 5234 nxt_debug(task, "fields_count=%d", (int) fields_count); 5235 5236 req->method_length = r->method->length; 5237 nxt_unit_sptr_set(&req->method, p); 5238 p = nxt_cpymem(p, r->method->start, r->method->length); 5239 *p++ = '\0'; 5240 5241 req->version_length = r->version.length; 5242 nxt_unit_sptr_set(&req->version, p); 5243 p = nxt_cpymem(p, r->version.start, r->version.length); 5244 *p++ = '\0'; 5245 5246 req->remote_length = r->remote->address_length; 5247 nxt_unit_sptr_set(&req->remote, p); 5248 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5249 r->remote->address_length); 5250 *p++ = '\0'; 5251 5252 req->local_length = r->local->address_length; 5253 nxt_unit_sptr_set(&req->local, p); 5254 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5255 *p++ = '\0'; 5256 5257 req->tls = r->tls; 5258 req->websocket_handshake = r->websocket_handshake; 5259 5260 req->server_name_length = r->server_name.length; 5261 nxt_unit_sptr_set(&req->server_name, p); 5262 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5263 *p++ = '\0'; 5264 5265 target_pos = p; 5266 req->target_length = (uint32_t) r->target.length; 5267 nxt_unit_sptr_set(&req->target, p); 5268 p = nxt_cpymem(p, r->target.start, r->target.length); 5269 *p++ = '\0'; 5270 5271 req->path_length = (uint32_t) r->path->length; 5272 if (r->path->start == r->target.start) { 5273 nxt_unit_sptr_set(&req->path, target_pos); 5274 5275 } else { 5276 nxt_unit_sptr_set(&req->path, p); 5277 p = nxt_cpymem(p, r->path->start, r->path->length); 5278 *p++ = '\0'; 5279 } 5280 5281 req->query_length = (uint32_t) r->args->length; 5282 if (r->args->start != NULL) { 5283 query_pos = nxt_pointer_to(target_pos, 5284 r->args->start - r->target.start); 5285 5286 nxt_unit_sptr_set(&req->query, query_pos); 5287 5288 } else { 5289 req->query.offset = 0; 5290 } 5291 5292 req->content_length_field = NXT_UNIT_NONE_FIELD; 5293 req->content_type_field = NXT_UNIT_NONE_FIELD; 5294 req->cookie_field = NXT_UNIT_NONE_FIELD; 5295 req->authorization_field = NXT_UNIT_NONE_FIELD; 5296 5297 dst_field = req->fields; 5298 5299 for (field = nxt_fields_first(r->fields, &iter); 5300 field != NULL; 5301 field = nxt_fields_next(&iter)) 5302 { 5303 if (field->skip) { 5304 continue; 5305 } 5306 5307 dst_field->hash = field->hash; 5308 dst_field->skip = 0; 5309 dst_field->name_length = field->name_length + prefix->length; 5310 dst_field->value_length = field->value_length; 5311 5312 if (field == r->content_length) { 5313 req->content_length_field = dst_field - req->fields; 5314 5315 } else if (field == r->content_type) { 5316 req->content_type_field = dst_field - req->fields; 5317 5318 } else if (field == r->cookie) { 5319 req->cookie_field = dst_field - req->fields; 5320 5321 } else if (field == r->authorization) { 5322 req->authorization_field = dst_field - req->fields; 5323 } 5324 5325 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5326 (int) field->hash, (int) field->skip, 5327 (int) field->name_length, field->name, 5328 (int) field->value_length, field->value); 5329 5330 if (prefix->length != 0) { 5331 nxt_unit_sptr_set(&dst_field->name, p); 5332 p = nxt_cpymem(p, prefix->start, prefix->length); 5333 5334 end = field->name + field->name_length; 5335 for (pos = field->name; pos < end; pos++) { 5336 c = *pos; 5337 5338 if (c >= 'a' && c <= 'z') { 5339 *p++ = (c & ~0x20); 5340 continue; 5341 } 5342 5343 if (c == '-') { 5344 *p++ = '_'; 5345 continue; 5346 } 5347 5348 *p++ = c; 5349 } 5350 5351 } else { 5352 nxt_unit_sptr_set(&dst_field->name, p); 5353 p = nxt_cpymem(p, field->name, field->name_length); 5354 } 5355 5356 *p++ = '\0'; 5357 5358 nxt_unit_sptr_set(&dst_field->value, p); 5359 p = nxt_cpymem(p, field->value, field->value_length); 5360 5361 if (prefix->length != 0) { 5362 dup_iter = iter; 5363 5364 for (dup = nxt_fields_next(&dup_iter); 5365 dup != NULL; 5366 dup = nxt_fields_next(&dup_iter)) 5367 { 5368 if (dup->name_length != field->name_length 5369 || dup->skip 5370 || dup->hash != field->hash 5371 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5372 { 5373 continue; 5374 } 5375 5376 p = nxt_cpymem(p, ", ", 2); 5377 p = nxt_cpymem(p, dup->value, dup->value_length); 5378 5379 dst_field->value_length += 2 + dup->value_length; 5380 5381 dup->skip = 1; 5382 } 5383 } 5384 5385 *p++ = '\0'; 5386 5387 dst_field++; 5388 } 5389 5390 req->fields_count = (uint32_t) (dst_field - req->fields); 5391 5392 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5393 5394 buf = out; 5395 tail = &buf->next; 5396 5397 for (b = r->body; b != NULL; b = b->next) { 5398 size = nxt_buf_mem_used_size(&b->mem); 5399 pos = b->mem.pos; 5400 5401 while (size > 0) { 5402 if (buf == NULL) { 5403 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5404 5405 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5406 if (nxt_slow_path(buf == NULL)) { 5407 while (out != NULL) { 5408 buf = out->next; 5409 out->next = NULL; 5410 out->completion_handler(task, out, out->parent); 5411 out = buf; 5412 } 5413 return NULL; 5414 } 5415 5416 *tail = buf; 5417 tail = &buf->next; 5418 5419 } else { 5420 free_size = nxt_buf_mem_free_size(&buf->mem); 5421 if (free_size < size 5422 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5423 == NXT_OK) 5424 { 5425 free_size = nxt_buf_mem_free_size(&buf->mem); 5426 } 5427 } 5428 5429 if (free_size > 0) { 5430 copy_size = nxt_min(free_size, size); 5431 5432 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5433 5434 size -= copy_size; 5435 pos += copy_size; 5436 5437 if (size == 0) { 5438 break; 5439 } 5440 } 5441 5442 buf = NULL; 5443 } 5444 } 5445 5446 return out; 5447 } 5448 5449 5450 static void 5451 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5452 { 5453 nxt_timer_t *timer; 5454 nxt_http_request_t *r; 5455 nxt_request_rpc_data_t *req_rpc_data; 5456 5457 timer = obj; 5458 5459 nxt_debug(task, "router app timeout"); 5460 5461 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5462 req_rpc_data = r->timer_data; 5463 5464 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5465 5466 nxt_request_rpc_data_unlink(task, req_rpc_data); 5467 } 5468 5469 5470 static void 5471 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5472 { 5473 r->timer.handler = nxt_router_http_request_release; 5474 nxt_timer_add(task->thread->engine, &r->timer, 0); 5475 } 5476 5477 5478 static void 5479 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5480 { 5481 nxt_http_request_t *r; 5482 5483 nxt_debug(task, "http request pool release"); 5484 5485 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5486 5487 nxt_mp_release(r->mem_pool); 5488 } 5489 5490 5491 static void 5492 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5493 { 5494 size_t mi; 5495 uint32_t i; 5496 nxt_bool_t ack; 5497 nxt_process_t *process; 5498 nxt_free_map_t *m; 5499 nxt_port_mmap_handler_t *mmap_handler; 5500 5501 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5502 5503 process = nxt_runtime_process_find(task->thread->runtime, 5504 msg->port_msg.pid); 5505 if (nxt_slow_path(process == NULL)) { 5506 return; 5507 } 5508 5509 ack = 0; 5510 5511 /* 5512 * To mitigate possible racing condition (when OOSM message received 5513 * after some of the memory was already freed), need to try to find 5514 * first free segment in shared memory and send ACK if found. 5515 */ 5516 5517 nxt_thread_mutex_lock(&process->incoming.mutex); 5518 5519 for (i = 0; i < process->incoming.size; i++) { 5520 mmap_handler = process->incoming.elts[i].mmap_handler; 5521 5522 if (nxt_slow_path(mmap_handler == NULL)) { 5523 continue; 5524 } 5525 5526 m = mmap_handler->hdr->free_map; 5527 5528 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5529 if (m[mi] != 0) { 5530 ack = 1; 5531 5532 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5533 i, mi, m[mi]); 5534 5535 break; 5536 } 5537 } 5538 } 5539 5540 nxt_thread_mutex_unlock(&process->incoming.mutex); 5541 5542 if (ack) { 5543 nxt_process_broadcast_shm_ack(task, process); 5544 } 5545 } 5546 5547 5548 static void 5549 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5550 { 5551 nxt_fd_t fd; 5552 nxt_port_t *port; 5553 nxt_runtime_t *rt; 5554 nxt_port_mmaps_t *mmaps; 5555 nxt_port_msg_get_mmap_t *get_mmap_msg; 5556 nxt_port_mmap_handler_t *mmap_handler; 5557 5558 rt = task->thread->runtime; 5559 5560 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5561 msg->port_msg.reply_port); 5562 if (nxt_slow_path(port == NULL)) { 5563 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5564 msg->port_msg.pid, msg->port_msg.reply_port); 5565 5566 return; 5567 } 5568 5569 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5570 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5571 { 5572 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5573 (int) nxt_buf_used_size(msg->buf)); 5574 5575 return; 5576 } 5577 5578 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5579 5580 nxt_assert(port->type == NXT_PROCESS_APP); 5581 5582 if (nxt_slow_path(port->app == NULL)) { 5583 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5584 port->pid, port->id); 5585 5586 // FIXME 5587 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5588 -1, msg->port_msg.stream, 0, NULL); 5589 5590 return; 5591 } 5592 5593 mmaps = &port->app->outgoing; 5594 nxt_thread_mutex_lock(&mmaps->mutex); 5595 5596 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5597 nxt_thread_mutex_unlock(&mmaps->mutex); 5598 5599 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5600 (int) get_mmap_msg->id); 5601 5602 // FIXME 5603 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5604 -1, msg->port_msg.stream, 0, NULL); 5605 return; 5606 } 5607 5608 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5609 5610 fd = mmap_handler->fd; 5611 5612 nxt_thread_mutex_unlock(&mmaps->mutex); 5613 5614 nxt_debug(task, "get mmap %PI:%d found", 5615 msg->port_msg.pid, (int) get_mmap_msg->id); 5616 5617 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5618 } 5619 5620 5621 static void 5622 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5623 { 5624 nxt_port_t *port, *reply_port; 5625 nxt_runtime_t *rt; 5626 nxt_port_msg_get_port_t *get_port_msg; 5627 5628 rt = task->thread->runtime; 5629 5630 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5631 msg->port_msg.reply_port); 5632 if (nxt_slow_path(reply_port == NULL)) { 5633 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5634 msg->port_msg.pid, msg->port_msg.reply_port); 5635 5636 return; 5637 } 5638 5639 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5640 < (int) sizeof(nxt_port_msg_get_port_t))) 5641 { 5642 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5643 (int) nxt_buf_used_size(msg->buf)); 5644 5645 return; 5646 } 5647 5648 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5649 5650 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5651 if (nxt_slow_path(port == NULL)) { 5652 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5653 get_port_msg->pid, get_port_msg->id); 5654 5655 return; 5656 } 5657 5658 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5659 get_port_msg->id); 5660 5661 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5662 } 5663