1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 nxt_conf_value_t *limits_value; 31 nxt_conf_value_t *processes_value; 32 nxt_conf_value_t *targets_value; 33 } nxt_router_app_conf_t; 34 35 36 typedef struct { 37 nxt_str_t pass; 38 nxt_str_t application; 39 } nxt_router_listener_conf_t; 40 41 42 #if (NXT_TLS) 43 44 typedef struct { 45 nxt_str_t name; 46 nxt_socket_conf_t *socket_conf; 47 nxt_router_temp_conf_t *temp_conf; 48 nxt_tls_init_t *tls_init; 49 nxt_bool_t last; 50 51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 52 } nxt_router_tlssock_t; 53 54 #endif 55 56 57 typedef struct { 58 nxt_str_t *name; 59 nxt_socket_conf_t *socket_conf; 60 nxt_router_temp_conf_t *temp_conf; 61 nxt_bool_t last; 62 } nxt_socket_rpc_t; 63 64 65 typedef struct { 66 nxt_app_t *app; 67 nxt_router_temp_conf_t *temp_conf; 68 uint8_t proto; /* 1 bit */ 69 } nxt_app_rpc_t; 70 71 72 typedef struct { 73 nxt_app_joint_t *app_joint; 74 uint32_t generation; 75 uint8_t proto; /* 1 bit */ 76 } nxt_app_joint_rpc_t; 77 78 79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 80 nxt_mp_t *mp); 81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 82 static void nxt_router_greet_controller(nxt_task_t *task, 83 nxt_port_t *controller_port); 84 85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 86 87 static void 
nxt_router_new_port_handler(nxt_task_t *task, 88 nxt_port_recv_msg_t *msg); 89 static void nxt_router_conf_data_handler(nxt_task_t *task, 90 nxt_port_recv_msg_t *msg); 91 static void nxt_router_app_restart_handler(nxt_task_t *task, 92 nxt_port_recv_msg_t *msg); 93 static void nxt_router_remove_pid_handler(nxt_task_t *task, 94 nxt_port_recv_msg_t *msg); 95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 96 nxt_port_recv_msg_t *msg); 97 98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 100 static void nxt_router_conf_ready(nxt_task_t *task, 101 nxt_router_temp_conf_t *tmcf); 102 static void nxt_router_conf_error(nxt_task_t *task, 103 nxt_router_temp_conf_t *tmcf); 104 static void nxt_router_conf_send(nxt_task_t *task, 105 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 106 107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 108 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 110 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 111 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task, 112 nxt_mp_t *mp, nxt_conf_value_t *conf); 113 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp, 114 nxt_conf_value_t *conf, nxt_http_forward_header_t *fh); 115 116 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 117 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 118 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 119 nxt_app_t *app); 120 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 121 nxt_str_t *name); 122 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 123 int i); 124 125 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 126 nxt_port_t *port); 127 static nxt_int_t 
nxt_router_port_queue_init(nxt_task_t *task, 128 nxt_port_t *port); 129 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 130 nxt_port_t *port, nxt_fd_t fd); 131 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 132 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 133 static void nxt_router_listen_socket_ready(nxt_task_t *task, 134 nxt_port_recv_msg_t *msg, void *data); 135 static void nxt_router_listen_socket_error(nxt_task_t *task, 136 nxt_port_recv_msg_t *msg, void *data); 137 #if (NXT_TLS) 138 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 139 nxt_port_recv_msg_t *msg, void *data); 140 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 141 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 142 nxt_bool_t last); 143 #endif 144 static void nxt_router_app_rpc_create(nxt_task_t *task, 145 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 146 static void nxt_router_app_prefork_ready(nxt_task_t *task, 147 nxt_port_recv_msg_t *msg, void *data); 148 static void nxt_router_app_prefork_error(nxt_task_t *task, 149 nxt_port_recv_msg_t *msg, void *data); 150 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 151 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 152 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 153 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 154 155 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 156 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 157 const nxt_event_interface_t *interface); 158 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 159 nxt_router_engine_conf_t *recf); 160 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 161 nxt_router_engine_conf_t *recf); 162 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 163 nxt_router_engine_conf_t *recf); 164 static nxt_int_t 
nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 165 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 166 nxt_work_handler_t handler); 167 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 168 nxt_router_engine_conf_t *recf); 169 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 170 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 171 172 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 173 nxt_router_temp_conf_t *tmcf); 174 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 175 nxt_event_engine_t *engine); 176 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 177 nxt_router_temp_conf_t *tmcf); 178 179 static void nxt_router_engines_post(nxt_router_t *router, 180 nxt_router_temp_conf_t *tmcf); 181 static void nxt_router_engine_post(nxt_event_engine_t *engine, 182 nxt_work_t *jobs); 183 184 static void nxt_router_thread_start(void *data); 185 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 186 void *data); 187 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 188 void *data); 189 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 190 void *data); 191 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 192 void *data); 193 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 194 void *data); 195 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 196 void *data); 197 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 198 void *data); 199 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 200 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 201 static void nxt_router_listen_socket_release(nxt_task_t *task, 202 nxt_socket_conf_t *skcf); 203 204 static void nxt_router_access_log_writer(nxt_task_t *task, 205 nxt_http_request_t *r, 
nxt_router_access_log_t *access_log); 206 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 207 struct tm *tm, size_t size, const char *format); 208 static void nxt_router_access_log_open(nxt_task_t *task, 209 nxt_router_temp_conf_t *tmcf); 210 static void nxt_router_access_log_ready(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212 static void nxt_router_access_log_error(nxt_task_t *task, 213 nxt_port_recv_msg_t *msg, void *data); 214 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 215 nxt_router_access_log_t *access_log); 216 static void nxt_router_access_log_release(nxt_task_t *task, 217 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 218 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 219 void *data); 220 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 221 nxt_port_recv_msg_t *msg, void *data); 222 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 223 nxt_port_recv_msg_t *msg, void *data); 224 225 static void nxt_router_app_port_ready(nxt_task_t *task, 226 nxt_port_recv_msg_t *msg, void *data); 227 static void nxt_router_app_port_error(nxt_task_t *task, 228 nxt_port_recv_msg_t *msg, void *data); 229 230 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 231 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 232 233 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, 234 nxt_port_t *port, nxt_apr_action_t action); 235 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 236 nxt_request_rpc_data_t *req_rpc_data); 237 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 238 void *data); 239 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 240 void *data); 241 242 static void nxt_router_app_prepare_request(nxt_task_t *task, 243 nxt_request_rpc_data_t *req_rpc_data); 244 static nxt_buf_t 
*nxt_router_prepare_msg(nxt_task_t *task, 245 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 246 247 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 248 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 249 void *data); 250 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 251 void *data); 252 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 253 void *data); 254 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 255 256 static const nxt_http_request_state_t nxt_http_request_send_state; 257 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 258 259 static void nxt_router_app_joint_use(nxt_task_t *task, 260 nxt_app_joint_t *app_joint, int i); 261 262 static void nxt_router_http_request_release_post(nxt_task_t *task, 263 nxt_http_request_t *r); 264 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 265 void *data); 266 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 267 static void nxt_router_get_port_handler(nxt_task_t *task, 268 nxt_port_recv_msg_t *msg); 269 static void nxt_router_get_mmap_handler(nxt_task_t *task, 270 nxt_port_recv_msg_t *msg); 271 272 extern const nxt_http_request_state_t nxt_http_websocket; 273 274 static nxt_router_t *nxt_router; 275 276 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 277 static const nxt_str_t empty_prefix = nxt_string(""); 278 279 static const nxt_str_t *nxt_app_msg_prefix[] = { 280 &empty_prefix, 281 &empty_prefix, 282 &http_prefix, 283 &http_prefix, 284 &http_prefix, 285 &empty_prefix, 286 }; 287 288 289 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 290 .quit = nxt_signal_quit_handler, 291 .new_port = nxt_router_new_port_handler, 292 .get_port = nxt_router_get_port_handler, 293 .change_file = nxt_port_change_log_file_handler, 294 .mmap = nxt_port_mmap_handler, 295 
.get_mmap = nxt_router_get_mmap_handler, 296 .data = nxt_router_conf_data_handler, 297 .app_restart = nxt_router_app_restart_handler, 298 .remove_pid = nxt_router_remove_pid_handler, 299 .access_log = nxt_router_access_log_reopen_handler, 300 .rpc_ready = nxt_port_rpc_handler, 301 .rpc_error = nxt_port_rpc_handler, 302 .oosm = nxt_router_oosm_handler, 303 }; 304 305 306 const nxt_process_init_t nxt_router_process = { 307 .name = "router", 308 .type = NXT_PROCESS_ROUTER, 309 .prefork = nxt_router_prefork, 310 .restart = 1, 311 .setup = nxt_process_core_setup, 312 .start = nxt_router_start, 313 .port_handlers = &nxt_router_process_port_handlers, 314 .signals = nxt_process_signals, 315 }; 316 317 318 /* Queues of nxt_socket_conf_t */ 319 nxt_queue_t creating_sockets; 320 nxt_queue_t pending_sockets; 321 nxt_queue_t updating_sockets; 322 nxt_queue_t keeping_sockets; 323 nxt_queue_t deleting_sockets; 324 325 326 static nxt_int_t 327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 328 { 329 nxt_runtime_stop_app_processes(task, task->thread->runtime); 330 331 return NXT_OK; 332 } 333 334 335 static nxt_int_t 336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 337 { 338 nxt_int_t ret; 339 nxt_port_t *controller_port; 340 nxt_router_t *router; 341 nxt_runtime_t *rt; 342 343 rt = task->thread->runtime; 344 345 nxt_log(task, NXT_LOG_INFO, "router started"); 346 347 #if (NXT_TLS) 348 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 349 if (nxt_slow_path(rt->tls == NULL)) { 350 return NXT_ERROR; 351 } 352 353 ret = rt->tls->library_init(task); 354 if (nxt_slow_path(ret != NXT_OK)) { 355 return ret; 356 } 357 #endif 358 359 ret = nxt_http_init(task); 360 if (nxt_slow_path(ret != NXT_OK)) { 361 return ret; 362 } 363 364 router = nxt_zalloc(sizeof(nxt_router_t)); 365 if (nxt_slow_path(router == NULL)) { 366 return NXT_ERROR; 367 } 368 369 nxt_queue_init(&router->engines); 370 nxt_queue_init(&router->sockets); 371 
nxt_queue_init(&router->apps); 372 373 nxt_router = router; 374 375 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 376 if (controller_port != NULL) { 377 nxt_router_greet_controller(task, controller_port); 378 } 379 380 return NXT_OK; 381 } 382 383 384 static void 385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 386 { 387 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 388 -1, 0, 0, NULL); 389 } 390 391 392 static void 393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 394 void *data) 395 { 396 size_t size; 397 uint32_t stream; 398 nxt_fd_t port_fd, queue_fd; 399 nxt_int_t ret; 400 nxt_app_t *app; 401 nxt_buf_t *b; 402 nxt_port_t *dport; 403 nxt_runtime_t *rt; 404 nxt_app_joint_rpc_t *app_joint_rpc; 405 406 app = data; 407 408 nxt_thread_mutex_lock(&app->mutex); 409 410 dport = app->proto_port; 411 412 nxt_thread_mutex_unlock(&app->mutex); 413 414 if (dport != NULL) { 415 nxt_debug(task, "app '%V' %p start process", &app->name, app); 416 417 b = NULL; 418 port_fd = -1; 419 queue_fd = -1; 420 421 } else { 422 if (app->proto_port_requests > 0) { 423 nxt_debug(task, "app '%V' %p wait for prototype process", 424 &app->name, app); 425 426 app->proto_port_requests++; 427 428 goto skip; 429 } 430 431 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); 432 433 rt = task->thread->runtime; 434 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 435 436 size = app->name.length + 1 + app->conf.length; 437 438 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); 439 if (nxt_slow_path(b == NULL)) { 440 goto failed; 441 } 442 443 nxt_buf_cpystr(b, &app->name); 444 *b->mem.free++ = '\0'; 445 nxt_buf_cpystr(b, &app->conf); 446 447 port_fd = app->shared_port->pair[0]; 448 queue_fd = app->shared_port->queue_fd; 449 } 450 451 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 452 nxt_router_app_port_ready, 453 nxt_router_app_port_error, 454 
sizeof(nxt_app_joint_rpc_t)); 455 if (nxt_slow_path(app_joint_rpc == NULL)) { 456 goto failed; 457 } 458 459 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 460 461 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 462 port_fd, queue_fd, stream, port->id, b); 463 if (nxt_slow_path(ret != NXT_OK)) { 464 nxt_port_rpc_cancel(task, port, stream); 465 466 goto failed; 467 } 468 469 app_joint_rpc->app_joint = app->joint; 470 app_joint_rpc->generation = app->generation; 471 app_joint_rpc->proto = (b != NULL); 472 473 if (b != NULL) { 474 app->proto_port_requests++; 475 476 b = NULL; 477 } 478 479 nxt_router_app_joint_use(task, app->joint, 1); 480 481 failed: 482 483 if (b != NULL) { 484 nxt_mp_free(b->data, b); 485 } 486 487 skip: 488 489 nxt_router_app_use(task, app, -1); 490 } 491 492 493 static void 494 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 495 { 496 app_joint->use_count += i; 497 498 if (app_joint->use_count == 0) { 499 nxt_assert(app_joint->app == NULL); 500 501 nxt_free(app_joint); 502 } 503 } 504 505 506 static nxt_int_t 507 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 508 { 509 nxt_int_t res; 510 nxt_port_t *router_port; 511 nxt_runtime_t *rt; 512 513 nxt_debug(task, "app '%V' start process", &app->name); 514 515 rt = task->thread->runtime; 516 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 517 518 nxt_router_app_use(task, app, 1); 519 520 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 521 app); 522 523 if (res == NXT_OK) { 524 return res; 525 } 526 527 nxt_thread_mutex_lock(&app->mutex); 528 529 app->pending_processes--; 530 531 nxt_thread_mutex_unlock(&app->mutex); 532 533 nxt_router_app_use(task, app, -1); 534 535 return NXT_ERROR; 536 } 537 538 539 nxt_inline nxt_bool_t 540 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 541 { 542 nxt_buf_t *b, *next; 543 nxt_bool_t cancelled; 544 nxt_port_t *app_port; 545 
nxt_msg_info_t *msg_info; 546 547 msg_info = &req_rpc_data->msg_info; 548 549 if (msg_info->buf == NULL) { 550 return 0; 551 } 552 553 app_port = req_rpc_data->app_port; 554 555 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 556 cancelled = nxt_app_queue_cancel(app_port->queue, 557 msg_info->tracking_cookie, 558 req_rpc_data->stream); 559 560 if (cancelled) { 561 nxt_debug(task, "stream #%uD: cancelled by router", 562 req_rpc_data->stream); 563 } 564 565 } else { 566 cancelled = 0; 567 } 568 569 for (b = msg_info->buf; b != NULL; b = next) { 570 next = b->next; 571 b->next = NULL; 572 573 if (b->is_port_mmap_sent) { 574 b->is_port_mmap_sent = cancelled == 0; 575 } 576 577 b->completion_handler(task, b, b->parent); 578 } 579 580 msg_info->buf = NULL; 581 582 return cancelled; 583 } 584 585 586 nxt_inline nxt_bool_t 587 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 588 { 589 if (lnk->next != NULL) { 590 nxt_queue_remove(lnk); 591 592 lnk->next = NULL; 593 594 return 1; 595 } 596 597 return 0; 598 } 599 600 601 nxt_inline void 602 nxt_request_rpc_data_unlink(nxt_task_t *task, 603 nxt_request_rpc_data_t *req_rpc_data) 604 { 605 nxt_app_t *app; 606 nxt_bool_t unlinked; 607 nxt_http_request_t *r; 608 609 nxt_router_msg_cancel(task, req_rpc_data); 610 611 app = req_rpc_data->app; 612 613 if (req_rpc_data->app_port != NULL) { 614 nxt_router_app_port_release(task, app, req_rpc_data->app_port, 615 req_rpc_data->apr_action); 616 617 req_rpc_data->app_port = NULL; 618 } 619 620 r = req_rpc_data->request; 621 622 if (r != NULL) { 623 r->timer_data = NULL; 624 625 nxt_router_http_request_release_post(task, r); 626 627 r->req_rpc_data = NULL; 628 req_rpc_data->request = NULL; 629 630 if (app != NULL) { 631 unlinked = 0; 632 633 nxt_thread_mutex_lock(&app->mutex); 634 635 if (r->app_link.next != NULL) { 636 nxt_queue_remove(&r->app_link); 637 r->app_link.next = NULL; 638 639 unlinked = 1; 640 } 641 642 nxt_thread_mutex_unlock(&app->mutex); 643 644 if (unlinked) { 645 
nxt_mp_release(r->mem_pool); 646 } 647 } 648 } 649 650 if (app != NULL) { 651 nxt_router_app_use(task, app, -1); 652 653 req_rpc_data->app = NULL; 654 } 655 656 if (req_rpc_data->msg_info.body_fd != -1) { 657 nxt_fd_close(req_rpc_data->msg_info.body_fd); 658 659 req_rpc_data->msg_info.body_fd = -1; 660 } 661 662 if (req_rpc_data->rpc_cancel) { 663 req_rpc_data->rpc_cancel = 0; 664 665 nxt_port_rpc_cancel(task, task->thread->engine->port, 666 req_rpc_data->stream); 667 } 668 } 669 670 671 static void 672 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 673 { 674 nxt_int_t res; 675 nxt_app_t *app; 676 nxt_port_t *port, *main_app_port; 677 nxt_runtime_t *rt; 678 679 nxt_port_new_port_handler(task, msg); 680 681 port = msg->u.new_port; 682 683 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 684 nxt_router_greet_controller(task, msg->u.new_port); 685 } 686 687 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { 688 nxt_port_rpc_handler(task, msg); 689 690 return; 691 } 692 693 if (port == NULL || port->type != NXT_PROCESS_APP) { 694 695 if (msg->port_msg.stream == 0) { 696 return; 697 } 698 699 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 700 701 } else { 702 if (msg->fd[1] != -1) { 703 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 704 if (nxt_slow_path(res != NXT_OK)) { 705 return; 706 } 707 708 nxt_fd_close(msg->fd[1]); 709 msg->fd[1] = -1; 710 } 711 } 712 713 if (msg->port_msg.stream != 0) { 714 nxt_port_rpc_handler(task, msg); 715 return; 716 } 717 718 nxt_debug(task, "new port id %d (%d)", port->id, port->type); 719 720 /* 721 * Port with "id == 0" is application 'main' port and it always 722 * should come with non-zero stream. 723 */ 724 nxt_assert(port->id != 0); 725 726 /* Find 'main' app port and get app reference. */ 727 rt = task->thread->runtime; 728 729 /* 730 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 731 * sent to main port (with id == 0) and processed in main thread. 
732 */ 733 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 734 nxt_assert(main_app_port != NULL); 735 736 app = main_app_port->app; 737 738 if (nxt_fast_path(app != NULL)) { 739 nxt_thread_mutex_lock(&app->mutex); 740 741 /* TODO here should be find-and-add code because there can be 742 port waiters in port_hash */ 743 nxt_port_hash_add(&app->port_hash, port); 744 app->port_hash_count++; 745 746 nxt_thread_mutex_unlock(&app->mutex); 747 748 port->app = app; 749 } 750 751 port->main_app_port = main_app_port; 752 753 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 754 } 755 756 757 static void 758 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 759 { 760 void *p; 761 size_t size; 762 nxt_int_t ret; 763 nxt_port_t *port; 764 nxt_router_temp_conf_t *tmcf; 765 766 port = nxt_runtime_port_find(task->thread->runtime, 767 msg->port_msg.pid, 768 msg->port_msg.reply_port); 769 if (nxt_slow_path(port == NULL)) { 770 nxt_alert(task, "conf_data_handler: reply port not found"); 771 return; 772 } 773 774 p = MAP_FAILED; 775 776 /* 777 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 778 * initialized in 'cleanup' section. 
779 */ 780 size = 0; 781 782 tmcf = nxt_router_temp_conf(task); 783 if (nxt_slow_path(tmcf == NULL)) { 784 goto fail; 785 } 786 787 if (nxt_slow_path(msg->fd[0] == -1)) { 788 nxt_alert(task, "conf_data_handler: invalid shm fd"); 789 goto fail; 790 } 791 792 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 793 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 794 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 795 goto fail; 796 } 797 798 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 799 800 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 801 802 nxt_fd_close(msg->fd[0]); 803 msg->fd[0] = -1; 804 805 if (nxt_slow_path(p == MAP_FAILED)) { 806 goto fail; 807 } 808 809 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 810 811 tmcf->router_conf->router = nxt_router; 812 tmcf->stream = msg->port_msg.stream; 813 tmcf->port = port; 814 815 nxt_port_use(task, tmcf->port, 1); 816 817 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 818 819 if (nxt_fast_path(ret == NXT_OK)) { 820 nxt_router_conf_apply(task, tmcf, NULL); 821 822 } else { 823 nxt_router_conf_error(task, tmcf); 824 } 825 826 goto cleanup; 827 828 fail: 829 830 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 831 msg->port_msg.stream, 0, NULL); 832 833 if (tmcf != NULL) { 834 nxt_mp_release(tmcf->mem_pool); 835 } 836 837 cleanup: 838 839 if (p != MAP_FAILED) { 840 nxt_mem_munmap(p, size); 841 } 842 843 if (msg->fd[0] != -1) { 844 nxt_fd_close(msg->fd[0]); 845 msg->fd[0] = -1; 846 } 847 } 848 849 850 static void 851 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 852 { 853 nxt_app_t *app; 854 nxt_int_t ret; 855 nxt_str_t app_name; 856 nxt_port_t *reply_port, *shared_port, *old_shared_port; 857 nxt_port_t *proto_port; 858 nxt_port_msg_type_t reply; 859 860 reply_port = nxt_runtime_port_find(task->thread->runtime, 861 msg->port_msg.pid, 862 msg->port_msg.reply_port); 863 if 
(nxt_slow_path(reply_port == NULL)) { 864 nxt_alert(task, "app_restart_handler: reply port not found"); 865 return; 866 } 867 868 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 869 app_name.start = msg->buf->mem.pos; 870 871 nxt_debug(task, "app_restart_handler: %V", &app_name); 872 873 app = nxt_router_app_find(&nxt_router->apps, &app_name); 874 875 if (nxt_fast_path(app != NULL)) { 876 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 877 NXT_PROCESS_APP); 878 if (nxt_slow_path(shared_port == NULL)) { 879 goto fail; 880 } 881 882 ret = nxt_port_socket_init(task, shared_port, 0); 883 if (nxt_slow_path(ret != NXT_OK)) { 884 nxt_port_use(task, shared_port, -1); 885 goto fail; 886 } 887 888 ret = nxt_router_app_queue_init(task, shared_port); 889 if (nxt_slow_path(ret != NXT_OK)) { 890 nxt_port_write_close(shared_port); 891 nxt_port_read_close(shared_port); 892 nxt_port_use(task, shared_port, -1); 893 goto fail; 894 } 895 896 nxt_port_write_enable(task, shared_port); 897 898 nxt_thread_mutex_lock(&app->mutex); 899 900 proto_port = app->proto_port; 901 902 if (proto_port != NULL) { 903 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 904 proto_port->pid); 905 906 app->proto_port = NULL; 907 proto_port->app = NULL; 908 } 909 910 app->generation++; 911 912 shared_port->app = app; 913 914 old_shared_port = app->shared_port; 915 old_shared_port->app = NULL; 916 917 app->shared_port = shared_port; 918 919 nxt_thread_mutex_unlock(&app->mutex); 920 921 nxt_port_close(task, old_shared_port); 922 nxt_port_use(task, old_shared_port, -1); 923 924 if (proto_port != NULL) { 925 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 926 -1, 0, 0, NULL); 927 928 nxt_port_close(task, proto_port); 929 930 nxt_port_use(task, proto_port, -1); 931 } 932 933 reply = NXT_PORT_MSG_RPC_READY_LAST; 934 935 } else { 936 937 fail: 938 939 reply = NXT_PORT_MSG_RPC_ERROR; 940 } 941 942 nxt_port_socket_write(task, reply_port, reply, -1, 
msg->port_msg.stream, 943 0, NULL); 944 } 945 946 947 static void 948 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 949 void *data) 950 { 951 union { 952 nxt_pid_t removed_pid; 953 void *data; 954 } u; 955 956 u.data = data; 957 958 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 959 } 960 961 962 static void 963 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 964 { 965 nxt_event_engine_t *engine; 966 967 nxt_port_remove_pid_handler(task, msg); 968 969 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 970 { 971 if (nxt_fast_path(engine->port != NULL)) { 972 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 973 msg->u.data); 974 } 975 } 976 nxt_queue_loop; 977 978 if (msg->port_msg.stream == 0) { 979 return; 980 } 981 982 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 983 984 nxt_port_rpc_handler(task, msg); 985 } 986 987 988 static nxt_router_temp_conf_t * 989 nxt_router_temp_conf(nxt_task_t *task) 990 { 991 nxt_mp_t *mp, *tmp; 992 nxt_router_conf_t *rtcf; 993 nxt_router_temp_conf_t *tmcf; 994 995 mp = nxt_mp_create(1024, 128, 256, 32); 996 if (nxt_slow_path(mp == NULL)) { 997 return NULL; 998 } 999 1000 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 1001 if (nxt_slow_path(rtcf == NULL)) { 1002 goto fail; 1003 } 1004 1005 rtcf->mem_pool = mp; 1006 1007 rtcf->var_fields = nxt_array_create(mp, 4, sizeof(nxt_var_field_t)); 1008 if (nxt_slow_path(rtcf->var_fields == NULL)) { 1009 goto fail; 1010 } 1011 1012 tmp = nxt_mp_create(1024, 128, 256, 32); 1013 if (nxt_slow_path(tmp == NULL)) { 1014 goto fail; 1015 } 1016 1017 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 1018 if (nxt_slow_path(tmcf == NULL)) { 1019 goto temp_fail; 1020 } 1021 1022 tmcf->mem_pool = tmp; 1023 tmcf->router_conf = rtcf; 1024 tmcf->count = 1; 1025 tmcf->engine = task->thread->engine; 1026 1027 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 1028 sizeof(nxt_router_engine_conf_t)); 1029 
if (nxt_slow_path(tmcf->engines == NULL)) { 1030 goto temp_fail; 1031 } 1032 1033 nxt_queue_init(&creating_sockets); 1034 nxt_queue_init(&pending_sockets); 1035 nxt_queue_init(&updating_sockets); 1036 nxt_queue_init(&keeping_sockets); 1037 nxt_queue_init(&deleting_sockets); 1038 1039 #if (NXT_TLS) 1040 nxt_queue_init(&tmcf->tls); 1041 #endif 1042 1043 nxt_queue_init(&tmcf->apps); 1044 nxt_queue_init(&tmcf->previous); 1045 1046 return tmcf; 1047 1048 temp_fail: 1049 1050 nxt_mp_destroy(tmp); 1051 1052 fail: 1053 1054 nxt_mp_destroy(mp); 1055 1056 return NULL; 1057 } 1058 1059 1060 nxt_inline nxt_bool_t 1061 nxt_router_app_can_start(nxt_app_t *app) 1062 { 1063 return app->processes + app->pending_processes < app->max_processes 1064 && app->pending_processes < app->max_pending_processes; 1065 } 1066 1067 1068 nxt_inline nxt_bool_t 1069 nxt_router_app_need_start(nxt_app_t *app) 1070 { 1071 return (app->active_requests 1072 > app->port_hash_count + app->pending_processes) 1073 || (app->spare_processes 1074 > app->idle_processes + app->pending_processes); 1075 } 1076 1077 1078 static void 1079 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1080 { 1081 nxt_int_t ret; 1082 nxt_app_t *app; 1083 nxt_router_t *router; 1084 nxt_runtime_t *rt; 1085 nxt_queue_link_t *qlk; 1086 nxt_socket_conf_t *skcf; 1087 nxt_router_conf_t *rtcf; 1088 nxt_router_temp_conf_t *tmcf; 1089 const nxt_event_interface_t *interface; 1090 #if (NXT_TLS) 1091 nxt_router_tlssock_t *tls; 1092 #endif 1093 1094 tmcf = obj; 1095 1096 qlk = nxt_queue_first(&pending_sockets); 1097 1098 if (qlk != nxt_queue_tail(&pending_sockets)) { 1099 nxt_queue_remove(qlk); 1100 nxt_queue_insert_tail(&creating_sockets, qlk); 1101 1102 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1103 1104 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1105 1106 return; 1107 } 1108 1109 #if (NXT_TLS) 1110 qlk = nxt_queue_last(&tmcf->tls); 1111 1112 if (qlk != nxt_queue_head(&tmcf->tls)) { 1113 
nxt_queue_remove(qlk); 1114 1115 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1116 1117 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1118 nxt_router_tls_rpc_handler, tls); 1119 return; 1120 } 1121 #endif 1122 1123 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1124 1125 if (nxt_router_app_need_start(app)) { 1126 nxt_router_app_rpc_create(task, tmcf, app); 1127 return; 1128 } 1129 1130 } nxt_queue_loop; 1131 1132 rtcf = tmcf->router_conf; 1133 1134 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1135 nxt_router_access_log_open(task, tmcf); 1136 return; 1137 } 1138 1139 rt = task->thread->runtime; 1140 1141 interface = nxt_service_get(rt->services, "engine", NULL); 1142 1143 router = rtcf->router; 1144 1145 ret = nxt_router_engines_create(task, router, tmcf, interface); 1146 if (nxt_slow_path(ret != NXT_OK)) { 1147 goto fail; 1148 } 1149 1150 ret = nxt_router_threads_create(task, rt, tmcf); 1151 if (nxt_slow_path(ret != NXT_OK)) { 1152 goto fail; 1153 } 1154 1155 nxt_router_apps_sort(task, router, tmcf); 1156 1157 nxt_router_apps_hash_use(task, rtcf, 1); 1158 1159 nxt_router_engines_post(router, tmcf); 1160 1161 nxt_queue_add(&router->sockets, &updating_sockets); 1162 nxt_queue_add(&router->sockets, &creating_sockets); 1163 1164 if (router->access_log != rtcf->access_log) { 1165 nxt_router_access_log_use(&router->lock, rtcf->access_log); 1166 1167 nxt_router_access_log_release(task, &router->lock, router->access_log); 1168 1169 router->access_log = rtcf->access_log; 1170 } 1171 1172 nxt_router_conf_ready(task, tmcf); 1173 1174 return; 1175 1176 fail: 1177 1178 nxt_router_conf_error(task, tmcf); 1179 1180 return; 1181 } 1182 1183 1184 static void 1185 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1186 { 1187 nxt_joint_job_t *job; 1188 1189 job = obj; 1190 1191 nxt_router_conf_ready(task, job->tmcf); 1192 } 1193 1194 1195 static void 1196 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1197 
{ 1198 uint32_t count; 1199 nxt_router_conf_t *rtcf; 1200 nxt_thread_spinlock_t *lock; 1201 1202 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1203 1204 if (--tmcf->count > 0) { 1205 return; 1206 } 1207 1208 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1209 1210 rtcf = tmcf->router_conf; 1211 1212 lock = &rtcf->router->lock; 1213 1214 nxt_thread_spin_lock(lock); 1215 1216 count = rtcf->count; 1217 1218 nxt_thread_spin_unlock(lock); 1219 1220 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1221 1222 if (count == 0) { 1223 nxt_router_apps_hash_use(task, rtcf, -1); 1224 1225 nxt_router_access_log_release(task, lock, rtcf->access_log); 1226 1227 nxt_mp_destroy(rtcf->mem_pool); 1228 } 1229 1230 nxt_mp_release(tmcf->mem_pool); 1231 } 1232 1233 1234 static void 1235 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1236 { 1237 nxt_app_t *app; 1238 nxt_socket_t s; 1239 nxt_router_t *router; 1240 nxt_queue_link_t *qlk; 1241 nxt_socket_conf_t *skcf; 1242 nxt_router_conf_t *rtcf; 1243 1244 nxt_alert(task, "failed to apply new conf"); 1245 1246 for (qlk = nxt_queue_first(&creating_sockets); 1247 qlk != nxt_queue_tail(&creating_sockets); 1248 qlk = nxt_queue_next(qlk)) 1249 { 1250 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1251 s = skcf->listen->socket; 1252 1253 if (s != -1) { 1254 nxt_socket_close(task, s); 1255 } 1256 1257 nxt_free(skcf->listen); 1258 } 1259 1260 rtcf = tmcf->router_conf; 1261 1262 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1263 1264 nxt_router_app_unlink(task, app); 1265 1266 } nxt_queue_loop; 1267 1268 router = rtcf->router; 1269 1270 nxt_queue_add(&router->sockets, &keeping_sockets); 1271 nxt_queue_add(&router->sockets, &deleting_sockets); 1272 1273 nxt_queue_add(&router->apps, &tmcf->previous); 1274 1275 // TODO: new engines and threads 1276 1277 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1278 1279 nxt_mp_destroy(rtcf->mem_pool); 1280 1281 
/*
 * Sends a reply message of the given type for stream tmcf->stream to
 * tmcf->port, without a buffer, then drops the port reference held by the
 * temporary configuration and clears the pointer so the port is not used
 * again through "tmcf".
 *
 * NOTE(review): the -1 argument is presumably "no file descriptor" --
 * confirm against nxt_port_socket_write().
 */
static void
nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
    nxt_port_msg_type_t type)
{
    nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);

    nxt_port_use(task, tmcf->port, -1);

    tmcf->port = NULL;
}


/* Root-level members mapped into nxt_router_conf_t. */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};


/*
 * Per-application members mapped into nxt_router_app_conf_t.
 *
 * "processes" appears twice: once as an integer and once as a raw value
 * pointer.  nxt_router_conf_create() inspects processes_value to tell the
 * object form from the plain form.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};


/* Members of an application's "limits" object. */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },
};


/* Members of an application's "processes" object (object form only). */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};


/*
 * Per-listener members mapped into nxt_router_listener_conf_t.  Both
 * strings are mapped with NXT_CONF_MAP_STR_COPY.
 */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};
/*
 * Members of "/settings/http" mapped into nxt_socket_conf_t.  The map is
 * applied to every listener after nxt_router_conf_create() has filled in
 * its built-in defaults, so only members present in the configuration
 * override them.
 */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },
};


/*
 * Members of "/settings/http/websocket" mapped into nxt_websocket_conf_t
 * (the websocket_conf member of each listener's nxt_socket_conf_t).
 */
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
    {
        nxt_string("max_frame_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_websocket_conf_t, max_frame_size),
    },

    {
        nxt_string("read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, read_timeout),
    },

    {
        nxt_string("keepalive_interval"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, keepalive_interval),
    },

};
1462 { 1463 nxt_string("read_timeout"), 1464 NXT_CONF_MAP_MSEC, 1465 offsetof(nxt_websocket_conf_t, read_timeout), 1466 }, 1467 1468 { 1469 nxt_string("keepalive_interval"), 1470 NXT_CONF_MAP_MSEC, 1471 offsetof(nxt_websocket_conf_t, keepalive_interval), 1472 }, 1473 1474 }; 1475 1476 1477 static nxt_int_t 1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1479 u_char *start, u_char *end) 1480 { 1481 u_char *p; 1482 size_t size; 1483 nxt_mp_t *mp, *app_mp; 1484 uint32_t next, next_target; 1485 nxt_int_t ret; 1486 nxt_str_t name, path, target; 1487 nxt_app_t *app, *prev; 1488 nxt_str_t *t, *s, *targets; 1489 nxt_uint_t n, i; 1490 nxt_port_t *port; 1491 nxt_router_t *router; 1492 nxt_app_joint_t *app_joint; 1493 #if (NXT_TLS) 1494 nxt_tls_init_t *tls_init; 1495 nxt_conf_value_t *certificate; 1496 #endif 1497 nxt_conf_value_t *root, *conf, *http, *value, *websocket; 1498 nxt_conf_value_t *applications, *application; 1499 nxt_conf_value_t *listeners, *listener; 1500 nxt_socket_conf_t *skcf; 1501 nxt_router_conf_t *rtcf; 1502 nxt_http_routes_t *routes; 1503 nxt_event_engine_t *engine; 1504 nxt_app_lang_module_t *lang; 1505 nxt_router_app_conf_t apcf; 1506 nxt_router_access_log_t *access_log; 1507 nxt_router_listener_conf_t lscf; 1508 1509 static nxt_str_t http_path = nxt_string("/settings/http"); 1510 static nxt_str_t applications_path = nxt_string("/applications"); 1511 static nxt_str_t listeners_path = nxt_string("/listeners"); 1512 static nxt_str_t routes_path = nxt_string("/routes"); 1513 static nxt_str_t access_log_path = nxt_string("/access_log"); 1514 #if (NXT_TLS) 1515 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1516 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1517 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1518 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1519 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); 
1520 #endif 1521 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1522 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1523 static nxt_str_t forwarded_path = nxt_string("/forwarded"); 1524 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1525 1526 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1527 if (root == NULL) { 1528 nxt_alert(task, "configuration parsing error"); 1529 return NXT_ERROR; 1530 } 1531 1532 rtcf = tmcf->router_conf; 1533 mp = rtcf->mem_pool; 1534 1535 ret = nxt_conf_map_object(mp, root, nxt_router_conf, 1536 nxt_nitems(nxt_router_conf), rtcf); 1537 if (ret != NXT_OK) { 1538 nxt_alert(task, "root map error"); 1539 return NXT_ERROR; 1540 } 1541 1542 if (rtcf->threads == 0) { 1543 rtcf->threads = nxt_ncpu; 1544 } 1545 1546 conf = nxt_conf_get_path(root, &static_path); 1547 1548 ret = nxt_router_conf_process_static(task, rtcf, conf); 1549 if (nxt_slow_path(ret != NXT_OK)) { 1550 return NXT_ERROR; 1551 } 1552 1553 router = rtcf->router; 1554 1555 applications = nxt_conf_get_path(root, &applications_path); 1556 1557 if (applications != NULL) { 1558 next = 0; 1559 1560 for ( ;; ) { 1561 application = nxt_conf_next_object_member(applications, 1562 &name, &next); 1563 if (application == NULL) { 1564 break; 1565 } 1566 1567 nxt_debug(task, "application \"%V\"", &name); 1568 1569 size = nxt_conf_json_length(application, NULL); 1570 1571 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1572 if (nxt_slow_path(app_mp == NULL)) { 1573 goto fail; 1574 } 1575 1576 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1577 if (app == NULL) { 1578 goto app_fail; 1579 } 1580 1581 nxt_memzero(app, sizeof(nxt_app_t)); 1582 1583 app->mem_pool = app_mp; 1584 1585 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1586 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1587 + name.length); 1588 1589 p = nxt_conf_json_print(app->conf.start, application, NULL); 1590 app->conf.length 
= p - app->conf.start; 1591 1592 nxt_assert(app->conf.length <= size); 1593 1594 nxt_debug(task, "application conf \"%V\"", &app->conf); 1595 1596 prev = nxt_router_app_find(&router->apps, &name); 1597 1598 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1599 nxt_mp_destroy(app_mp); 1600 1601 nxt_queue_remove(&prev->link); 1602 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1603 1604 ret = nxt_router_apps_hash_add(rtcf, prev); 1605 if (nxt_slow_path(ret != NXT_OK)) { 1606 goto fail; 1607 } 1608 1609 continue; 1610 } 1611 1612 apcf.processes = 1; 1613 apcf.max_processes = 1; 1614 apcf.spare_processes = 0; 1615 apcf.timeout = 0; 1616 apcf.idle_timeout = 15000; 1617 apcf.limits_value = NULL; 1618 apcf.processes_value = NULL; 1619 apcf.targets_value = NULL; 1620 1621 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1622 if (nxt_slow_path(app_joint == NULL)) { 1623 goto app_fail; 1624 } 1625 1626 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1627 1628 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1629 nxt_nitems(nxt_router_app_conf), &apcf); 1630 if (ret != NXT_OK) { 1631 nxt_alert(task, "application map error"); 1632 goto app_fail; 1633 } 1634 1635 if (apcf.limits_value != NULL) { 1636 1637 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1638 nxt_alert(task, "application limits is not object"); 1639 goto app_fail; 1640 } 1641 1642 ret = nxt_conf_map_object(mp, apcf.limits_value, 1643 nxt_router_app_limits_conf, 1644 nxt_nitems(nxt_router_app_limits_conf), 1645 &apcf); 1646 if (ret != NXT_OK) { 1647 nxt_alert(task, "application limits map error"); 1648 goto app_fail; 1649 } 1650 } 1651 1652 if (apcf.processes_value != NULL 1653 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1654 { 1655 ret = nxt_conf_map_object(mp, apcf.processes_value, 1656 nxt_router_app_processes_conf, 1657 nxt_nitems(nxt_router_app_processes_conf), 1658 &apcf); 1659 if (ret != NXT_OK) { 1660 nxt_alert(task, "application processes map 
error"); 1661 goto app_fail; 1662 } 1663 1664 } else { 1665 apcf.max_processes = apcf.processes; 1666 apcf.spare_processes = apcf.processes; 1667 } 1668 1669 if (apcf.targets_value != NULL) { 1670 n = nxt_conf_object_members_count(apcf.targets_value); 1671 1672 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1673 if (nxt_slow_path(targets == NULL)) { 1674 goto app_fail; 1675 } 1676 1677 next_target = 0; 1678 1679 for (i = 0; i < n; i++) { 1680 (void) nxt_conf_next_object_member(apcf.targets_value, 1681 &target, &next_target); 1682 1683 s = nxt_str_dup(app_mp, &targets[i], &target); 1684 if (nxt_slow_path(s == NULL)) { 1685 goto app_fail; 1686 } 1687 } 1688 1689 } else { 1690 targets = NULL; 1691 } 1692 1693 nxt_debug(task, "application type: %V", &apcf.type); 1694 nxt_debug(task, "application processes: %D", apcf.processes); 1695 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1696 1697 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1698 1699 if (lang == NULL) { 1700 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1701 goto app_fail; 1702 } 1703 1704 nxt_debug(task, "application language module: \"%s\"", lang->file); 1705 1706 ret = nxt_thread_mutex_create(&app->mutex); 1707 if (ret != NXT_OK) { 1708 goto app_fail; 1709 } 1710 1711 nxt_queue_init(&app->ports); 1712 nxt_queue_init(&app->spare_ports); 1713 nxt_queue_init(&app->idle_ports); 1714 nxt_queue_init(&app->ack_waiting_req); 1715 1716 app->name.length = name.length; 1717 nxt_memcpy(app->name.start, name.start, name.length); 1718 1719 app->type = lang->type; 1720 app->max_processes = apcf.max_processes; 1721 app->spare_processes = apcf.spare_processes; 1722 app->max_pending_processes = apcf.spare_processes 1723 ? 
apcf.spare_processes : 1; 1724 app->timeout = apcf.timeout; 1725 app->idle_timeout = apcf.idle_timeout; 1726 1727 app->targets = targets; 1728 1729 engine = task->thread->engine; 1730 1731 app->engine = engine; 1732 1733 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1734 app->adjust_idle_work.task = &engine->task; 1735 app->adjust_idle_work.obj = app; 1736 1737 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1738 1739 ret = nxt_router_apps_hash_add(rtcf, app); 1740 if (nxt_slow_path(ret != NXT_OK)) { 1741 goto app_fail; 1742 } 1743 1744 nxt_router_app_use(task, app, 1); 1745 1746 app->joint = app_joint; 1747 1748 app_joint->use_count = 1; 1749 app_joint->app = app; 1750 1751 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1752 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1753 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1754 app_joint->idle_timer.task = &engine->task; 1755 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1756 1757 app_joint->free_app_work.handler = nxt_router_free_app; 1758 app_joint->free_app_work.task = &engine->task; 1759 app_joint->free_app_work.obj = app_joint; 1760 1761 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1762 NXT_PROCESS_APP); 1763 if (nxt_slow_path(port == NULL)) { 1764 return NXT_ERROR; 1765 } 1766 1767 ret = nxt_port_socket_init(task, port, 0); 1768 if (nxt_slow_path(ret != NXT_OK)) { 1769 nxt_port_use(task, port, -1); 1770 return NXT_ERROR; 1771 } 1772 1773 ret = nxt_router_app_queue_init(task, port); 1774 if (nxt_slow_path(ret != NXT_OK)) { 1775 nxt_port_write_close(port); 1776 nxt_port_read_close(port); 1777 nxt_port_use(task, port, -1); 1778 return NXT_ERROR; 1779 } 1780 1781 nxt_port_write_enable(task, port); 1782 port->app = app; 1783 1784 app->shared_port = port; 1785 1786 nxt_thread_mutex_create(&app->outgoing.mutex); 1787 } 1788 } 1789 1790 conf = nxt_conf_get_path(root, &routes_path); 1791 if (nxt_fast_path(conf != NULL)) { 1792 routes = 
nxt_http_routes_create(task, tmcf, conf); 1793 if (nxt_slow_path(routes == NULL)) { 1794 return NXT_ERROR; 1795 } 1796 rtcf->routes = routes; 1797 } 1798 1799 ret = nxt_upstreams_create(task, tmcf, root); 1800 if (nxt_slow_path(ret != NXT_OK)) { 1801 return ret; 1802 } 1803 1804 http = nxt_conf_get_path(root, &http_path); 1805 #if 0 1806 if (http == NULL) { 1807 nxt_alert(task, "no \"http\" block"); 1808 return NXT_ERROR; 1809 } 1810 #endif 1811 1812 websocket = nxt_conf_get_path(root, &websocket_path); 1813 1814 listeners = nxt_conf_get_path(root, &listeners_path); 1815 1816 if (listeners != NULL) { 1817 next = 0; 1818 1819 for ( ;; ) { 1820 listener = nxt_conf_next_object_member(listeners, &name, &next); 1821 if (listener == NULL) { 1822 break; 1823 } 1824 1825 skcf = nxt_router_socket_conf(task, tmcf, &name); 1826 if (skcf == NULL) { 1827 goto fail; 1828 } 1829 1830 nxt_memzero(&lscf, sizeof(lscf)); 1831 1832 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1833 nxt_nitems(nxt_router_listener_conf), 1834 &lscf); 1835 if (ret != NXT_OK) { 1836 nxt_alert(task, "listener map error"); 1837 goto fail; 1838 } 1839 1840 nxt_debug(task, "application: %V", &lscf.application); 1841 1842 // STUB, default values if http block is not defined. 
1843 skcf->header_buffer_size = 2048; 1844 skcf->large_header_buffer_size = 8192; 1845 skcf->large_header_buffers = 4; 1846 skcf->discard_unsafe_fields = 1; 1847 skcf->body_buffer_size = 16 * 1024; 1848 skcf->max_body_size = 8 * 1024 * 1024; 1849 skcf->proxy_header_buffer_size = 64 * 1024; 1850 skcf->proxy_buffer_size = 4096; 1851 skcf->proxy_buffers = 256; 1852 skcf->idle_timeout = 180 * 1000; 1853 skcf->header_read_timeout = 30 * 1000; 1854 skcf->body_read_timeout = 30 * 1000; 1855 skcf->send_timeout = 30 * 1000; 1856 skcf->proxy_timeout = 60 * 1000; 1857 skcf->proxy_send_timeout = 30 * 1000; 1858 skcf->proxy_read_timeout = 30 * 1000; 1859 1860 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1861 skcf->websocket_conf.read_timeout = 60 * 1000; 1862 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1863 1864 nxt_str_null(&skcf->body_temp_path); 1865 1866 if (http != NULL) { 1867 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1868 nxt_nitems(nxt_router_http_conf), 1869 skcf); 1870 if (ret != NXT_OK) { 1871 nxt_alert(task, "http map error"); 1872 goto fail; 1873 } 1874 } 1875 1876 if (websocket != NULL) { 1877 ret = nxt_conf_map_object(mp, websocket, 1878 nxt_router_websocket_conf, 1879 nxt_nitems(nxt_router_websocket_conf), 1880 &skcf->websocket_conf); 1881 if (ret != NXT_OK) { 1882 nxt_alert(task, "websocket map error"); 1883 goto fail; 1884 } 1885 } 1886 1887 t = &skcf->body_temp_path; 1888 1889 if (t->length == 0) { 1890 t->start = (u_char *) task->thread->runtime->tmp; 1891 t->length = nxt_strlen(t->start); 1892 } 1893 1894 conf = nxt_conf_get_path(listener, &forwarded_path); 1895 1896 if (conf != NULL) { 1897 skcf->forwarded = nxt_router_conf_forward(task, mp, conf); 1898 if (nxt_slow_path(skcf->forwarded == NULL)) { 1899 return NXT_ERROR; 1900 } 1901 } 1902 1903 conf = nxt_conf_get_path(listener, &client_ip_path); 1904 1905 if (conf != NULL) { 1906 skcf->client_ip = nxt_router_conf_forward(task, mp, conf); 1907 if 
(nxt_slow_path(skcf->client_ip == NULL)) { 1908 return NXT_ERROR; 1909 } 1910 } 1911 1912 #if (NXT_TLS) 1913 certificate = nxt_conf_get_path(listener, &certificate_path); 1914 1915 if (certificate != NULL) { 1916 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1917 if (nxt_slow_path(tls_init == NULL)) { 1918 return NXT_ERROR; 1919 } 1920 1921 tls_init->cache_size = 0; 1922 tls_init->timeout = 300; 1923 1924 value = nxt_conf_get_path(listener, &conf_cache_path); 1925 if (value != NULL) { 1926 tls_init->cache_size = nxt_conf_get_number(value); 1927 } 1928 1929 value = nxt_conf_get_path(listener, &conf_timeout_path); 1930 if (value != NULL) { 1931 tls_init->timeout = nxt_conf_get_number(value); 1932 } 1933 1934 tls_init->conf_cmds = nxt_conf_get_path(listener, 1935 &conf_commands_path); 1936 1937 tls_init->tickets_conf = nxt_conf_get_path(listener, 1938 &conf_tickets); 1939 1940 n = nxt_conf_array_elements_count_or_1(certificate); 1941 1942 for (i = 0; i < n; i++) { 1943 value = nxt_conf_get_array_element_or_itself(certificate, 1944 i); 1945 nxt_assert(value != NULL); 1946 1947 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1948 tls_init, i == 0); 1949 if (nxt_slow_path(ret != NXT_OK)) { 1950 goto fail; 1951 } 1952 } 1953 } 1954 #endif 1955 1956 skcf->listen->handler = nxt_http_conn_init; 1957 skcf->router_conf = rtcf; 1958 skcf->router_conf->count++; 1959 1960 if (lscf.pass.length != 0) { 1961 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1962 1963 /* COMPATIBILITY: listener application. 
*/ 1964 } else if (lscf.application.length > 0) { 1965 skcf->action = nxt_http_pass_application(task, rtcf, 1966 &lscf.application); 1967 } 1968 1969 if (nxt_slow_path(skcf->action == NULL)) { 1970 goto fail; 1971 } 1972 } 1973 } 1974 1975 ret = nxt_http_routes_resolve(task, tmcf); 1976 if (nxt_slow_path(ret != NXT_OK)) { 1977 goto fail; 1978 } 1979 1980 value = nxt_conf_get_path(root, &access_log_path); 1981 1982 if (value != NULL) { 1983 nxt_conf_get_string(value, &path); 1984 1985 access_log = router->access_log; 1986 1987 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1988 nxt_router_access_log_use(&router->lock, access_log); 1989 1990 } else { 1991 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1992 + path.length); 1993 if (access_log == NULL) { 1994 nxt_alert(task, "failed to allocate access log structure"); 1995 goto fail; 1996 } 1997 1998 access_log->fd = -1; 1999 access_log->handler = &nxt_router_access_log_writer; 2000 access_log->count = 1; 2001 2002 access_log->path.length = path.length; 2003 access_log->path.start = (u_char *) access_log 2004 + sizeof(nxt_router_access_log_t); 2005 2006 nxt_memcpy(access_log->path.start, path.start, path.length); 2007 } 2008 2009 rtcf->access_log = access_log; 2010 } 2011 2012 nxt_queue_add(&deleting_sockets, &router->sockets); 2013 nxt_queue_init(&router->sockets); 2014 2015 return NXT_OK; 2016 2017 app_fail: 2018 2019 nxt_mp_destroy(app_mp); 2020 2021 fail: 2022 2023 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 2024 2025 nxt_queue_remove(&app->link); 2026 nxt_thread_mutex_destroy(&app->mutex); 2027 nxt_mp_destroy(app->mem_pool); 2028 2029 } nxt_queue_loop; 2030 2031 return NXT_ERROR; 2032 } 2033 2034 2035 #if (NXT_TLS) 2036 2037 static nxt_int_t 2038 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 2039 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 2040 nxt_tls_init_t *tls_init, nxt_bool_t last) 2041 { 2042 nxt_router_tlssock_t *tls; 2043 2044 tls = 
nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 2045 if (nxt_slow_path(tls == NULL)) { 2046 return NXT_ERROR; 2047 } 2048 2049 tls->tls_init = tls_init; 2050 tls->socket_conf = skcf; 2051 tls->temp_conf = tmcf; 2052 tls->last = last; 2053 nxt_conf_get_string(value, &tls->name); 2054 2055 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2056 2057 return NXT_OK; 2058 } 2059 2060 #endif 2061 2062 2063 static nxt_int_t 2064 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2065 nxt_conf_value_t *conf) 2066 { 2067 uint32_t next, i; 2068 nxt_mp_t *mp; 2069 nxt_str_t *type, exten, str; 2070 nxt_int_t ret; 2071 nxt_uint_t exts; 2072 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2073 2074 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2075 2076 mp = rtcf->mem_pool; 2077 2078 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2079 if (nxt_slow_path(ret != NXT_OK)) { 2080 return NXT_ERROR; 2081 } 2082 2083 if (conf == NULL) { 2084 return NXT_OK; 2085 } 2086 2087 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2088 2089 if (mtypes_conf != NULL) { 2090 next = 0; 2091 2092 for ( ;; ) { 2093 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2094 2095 if (ext_conf == NULL) { 2096 break; 2097 } 2098 2099 type = nxt_str_dup(mp, NULL, &str); 2100 if (nxt_slow_path(type == NULL)) { 2101 return NXT_ERROR; 2102 } 2103 2104 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2105 nxt_conf_get_string(ext_conf, &str); 2106 2107 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2108 return NXT_ERROR; 2109 } 2110 2111 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2112 &exten, type); 2113 if (nxt_slow_path(ret != NXT_OK)) { 2114 return NXT_ERROR; 2115 } 2116 2117 continue; 2118 } 2119 2120 exts = nxt_conf_array_elements_count(ext_conf); 2121 2122 for (i = 0; i < exts; i++) { 2123 value = nxt_conf_get_array_element(ext_conf, i); 2124 2125 nxt_conf_get_string(value, &str); 2126 2127 if 
(nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2128 return NXT_ERROR; 2129 } 2130 2131 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2132 &exten, type); 2133 if (nxt_slow_path(ret != NXT_OK)) { 2134 return NXT_ERROR; 2135 } 2136 } 2137 } 2138 } 2139 2140 return NXT_OK; 2141 } 2142 2143 2144 static nxt_http_forward_t * 2145 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf) 2146 { 2147 nxt_int_t ret; 2148 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf; 2149 nxt_conf_value_t *source_conf, *recursive_conf; 2150 nxt_http_forward_t *forward; 2151 nxt_http_route_addr_rule_t *source; 2152 2153 static nxt_str_t header_path = nxt_string("/header"); 2154 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 2155 static nxt_str_t protocol_path = nxt_string("/protocol"); 2156 static nxt_str_t source_path = nxt_string("/source"); 2157 static nxt_str_t recursive_path = nxt_string("/recursive"); 2158 2159 header_conf = nxt_conf_get_path(conf, &header_path); 2160 2161 if (header_conf != NULL) { 2162 client_ip_conf = nxt_conf_get_path(conf, &header_path); 2163 protocol_conf = NULL; 2164 2165 } else { 2166 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path); 2167 protocol_conf = nxt_conf_get_path(conf, &protocol_path); 2168 } 2169 2170 source_conf = nxt_conf_get_path(conf, &source_path); 2171 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2172 2173 if (source_conf == NULL 2174 || (protocol_conf == NULL && client_ip_conf == NULL)) 2175 { 2176 return NULL; 2177 } 2178 2179 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t)); 2180 if (nxt_slow_path(forward == NULL)) { 2181 return NULL; 2182 } 2183 2184 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2185 if (nxt_slow_path(source == NULL)) { 2186 return NULL; 2187 } 2188 2189 forward->source = source; 2190 2191 if (recursive_conf != NULL) { 2192 forward->recursive = nxt_conf_get_boolean(recursive_conf); 2193 } 2194 2195 if 
(client_ip_conf != NULL) { 2196 ret = nxt_router_conf_forward_header(mp, client_ip_conf, 2197 &forward->client_ip); 2198 if (nxt_slow_path(ret != NXT_OK)) { 2199 return NULL; 2200 } 2201 } 2202 2203 if (protocol_conf != NULL) { 2204 ret = nxt_router_conf_forward_header(mp, protocol_conf, 2205 &forward->protocol); 2206 if (nxt_slow_path(ret != NXT_OK)) { 2207 return NULL; 2208 } 2209 } 2210 2211 return forward; 2212 } 2213 2214 2215 static nxt_int_t 2216 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf, 2217 nxt_http_forward_header_t *fh) 2218 { 2219 char c; 2220 size_t i; 2221 uint32_t hash; 2222 nxt_str_t header; 2223 2224 nxt_conf_get_string(conf, &header); 2225 2226 fh->header = nxt_str_dup(mp, NULL, &header); 2227 if (nxt_slow_path(fh->header == NULL)) { 2228 return NXT_ERROR; 2229 } 2230 2231 hash = NXT_HTTP_FIELD_HASH_INIT; 2232 2233 for (i = 0; i < fh->header->length; i++) { 2234 c = fh->header->start[i]; 2235 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2236 } 2237 2238 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2239 2240 fh->header_hash = hash; 2241 2242 return NXT_OK; 2243 } 2244 2245 2246 static nxt_app_t * 2247 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2248 { 2249 nxt_app_t *app; 2250 2251 nxt_queue_each(app, queue, nxt_app_t, link) { 2252 2253 if (nxt_strstr_eq(name, &app->name)) { 2254 return app; 2255 } 2256 2257 } nxt_queue_loop; 2258 2259 return NULL; 2260 } 2261 2262 2263 static nxt_int_t 2264 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2265 { 2266 void *mem; 2267 nxt_int_t fd; 2268 2269 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2270 if (nxt_slow_path(fd == -1)) { 2271 return NXT_ERROR; 2272 } 2273 2274 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2275 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2276 if (nxt_slow_path(mem == MAP_FAILED)) { 2277 nxt_fd_close(fd); 2278 2279 return NXT_ERROR; 2280 } 2281 2282 nxt_app_queue_init(mem); 2283 2284 port->queue_fd = fd; 2285 
port->queue = mem; 2286 2287 return NXT_OK; 2288 } 2289 2290 2291 static nxt_int_t 2292 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2293 { 2294 void *mem; 2295 nxt_int_t fd; 2296 2297 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2298 if (nxt_slow_path(fd == -1)) { 2299 return NXT_ERROR; 2300 } 2301 2302 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2303 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2304 if (nxt_slow_path(mem == MAP_FAILED)) { 2305 nxt_fd_close(fd); 2306 2307 return NXT_ERROR; 2308 } 2309 2310 nxt_port_queue_init(mem); 2311 2312 port->queue_fd = fd; 2313 port->queue = mem; 2314 2315 return NXT_OK; 2316 } 2317 2318 2319 static nxt_int_t 2320 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2321 { 2322 void *mem; 2323 2324 nxt_assert(fd != -1); 2325 2326 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2327 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2328 if (nxt_slow_path(mem == MAP_FAILED)) { 2329 2330 return NXT_ERROR; 2331 } 2332 2333 port->queue = mem; 2334 2335 return NXT_OK; 2336 } 2337 2338 2339 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2340 NXT_LVLHSH_DEFAULT, 2341 nxt_router_apps_hash_test, 2342 nxt_mp_lvlhsh_alloc, 2343 nxt_mp_lvlhsh_free, 2344 }; 2345 2346 2347 static nxt_int_t 2348 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2349 { 2350 nxt_app_t *app; 2351 2352 app = data; 2353 2354 return nxt_strstr_eq(&lhq->key, &app->name) ? 
NXT_OK : NXT_DECLINED; 2355 } 2356 2357 2358 static nxt_int_t 2359 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2360 { 2361 nxt_lvlhsh_query_t lhq; 2362 2363 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2364 lhq.replace = 0; 2365 lhq.key = app->name; 2366 lhq.value = app; 2367 lhq.proto = &nxt_router_apps_hash_proto; 2368 lhq.pool = rtcf->mem_pool; 2369 2370 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2371 2372 case NXT_OK: 2373 return NXT_OK; 2374 2375 case NXT_DECLINED: 2376 nxt_thread_log_alert("router app hash adding failed: " 2377 "\"%V\" is already in hash", &lhq.key); 2378 /* Fall through. */ 2379 default: 2380 return NXT_ERROR; 2381 } 2382 } 2383 2384 2385 static nxt_app_t * 2386 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2387 { 2388 nxt_lvlhsh_query_t lhq; 2389 2390 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2391 lhq.key = *name; 2392 lhq.proto = &nxt_router_apps_hash_proto; 2393 2394 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2395 return NULL; 2396 } 2397 2398 return lhq.value; 2399 } 2400 2401 2402 static void 2403 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2404 { 2405 nxt_app_t *app; 2406 nxt_lvlhsh_each_t lhe; 2407 2408 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2409 2410 for ( ;; ) { 2411 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2412 2413 if (app == NULL) { 2414 break; 2415 } 2416 2417 nxt_router_app_use(task, app, i); 2418 } 2419 } 2420 2421 2422 typedef struct { 2423 nxt_app_t *app; 2424 nxt_int_t target; 2425 } nxt_http_app_conf_t; 2426 2427 2428 nxt_int_t 2429 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2430 nxt_str_t *target, nxt_http_action_t *action) 2431 { 2432 nxt_app_t *app; 2433 nxt_str_t *targets; 2434 nxt_uint_t i; 2435 nxt_http_app_conf_t *conf; 2436 2437 app = nxt_router_apps_hash_get(rtcf, name); 2438 if (app == NULL) { 2439 return NXT_DECLINED; 2440 } 2441 
2442 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2443 if (nxt_slow_path(conf == NULL)) { 2444 return NXT_ERROR; 2445 } 2446 2447 action->handler = nxt_http_application_handler; 2448 action->u.conf = conf; 2449 2450 conf->app = app; 2451 2452 if (target != NULL && target->length != 0) { 2453 targets = app->targets; 2454 2455 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2456 2457 conf->target = i; 2458 2459 } else { 2460 conf->target = 0; 2461 } 2462 2463 return NXT_OK; 2464 } 2465 2466 2467 static nxt_socket_conf_t * 2468 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2469 nxt_str_t *name) 2470 { 2471 size_t size; 2472 nxt_int_t ret; 2473 nxt_bool_t wildcard; 2474 nxt_sockaddr_t *sa; 2475 nxt_socket_conf_t *skcf; 2476 nxt_listen_socket_t *ls; 2477 2478 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2479 if (nxt_slow_path(sa == NULL)) { 2480 nxt_alert(task, "invalid listener \"%V\"", name); 2481 return NULL; 2482 } 2483 2484 sa->type = SOCK_STREAM; 2485 2486 nxt_debug(task, "router listener: \"%*s\"", 2487 (size_t) sa->length, nxt_sockaddr_start(sa)); 2488 2489 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2490 if (nxt_slow_path(skcf == NULL)) { 2491 return NULL; 2492 } 2493 2494 size = nxt_sockaddr_size(sa); 2495 2496 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2497 2498 if (ret != NXT_OK) { 2499 2500 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2501 if (nxt_slow_path(ls == NULL)) { 2502 return NULL; 2503 } 2504 2505 skcf->listen = ls; 2506 2507 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2508 nxt_memcpy(ls->sockaddr, sa, size); 2509 2510 nxt_listen_socket_remote_size(ls); 2511 2512 ls->socket = -1; 2513 ls->backlog = NXT_LISTEN_BACKLOG; 2514 ls->flags = NXT_NONBLOCK; 2515 ls->read_after_accept = 1; 2516 } 2517 2518 switch (sa->u.sockaddr.sa_family) { 2519 #if (NXT_HAVE_UNIX_DOMAIN) 2520 case AF_UNIX: 2521 wildcard = 0; 2522 break; 2523 #endif 
2524 #if (NXT_INET6) 2525 case AF_INET6: 2526 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2527 break; 2528 #endif 2529 case AF_INET: 2530 default: 2531 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2532 break; 2533 } 2534 2535 if (!wildcard) { 2536 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2537 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2538 return NULL; 2539 } 2540 2541 nxt_memcpy(skcf->sockaddr, sa, size); 2542 } 2543 2544 return skcf; 2545 } 2546 2547 2548 static nxt_int_t 2549 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2550 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2551 { 2552 nxt_router_t *router; 2553 nxt_queue_link_t *qlk; 2554 nxt_socket_conf_t *skcf; 2555 2556 router = tmcf->router_conf->router; 2557 2558 for (qlk = nxt_queue_first(&router->sockets); 2559 qlk != nxt_queue_tail(&router->sockets); 2560 qlk = nxt_queue_next(qlk)) 2561 { 2562 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2563 2564 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2565 nskcf->listen = skcf->listen; 2566 2567 nxt_queue_remove(qlk); 2568 nxt_queue_insert_tail(&keeping_sockets, qlk); 2569 2570 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2571 2572 return NXT_OK; 2573 } 2574 } 2575 2576 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2577 2578 return NXT_DECLINED; 2579 } 2580 2581 2582 static void 2583 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2584 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2585 { 2586 size_t size; 2587 uint32_t stream; 2588 nxt_int_t ret; 2589 nxt_buf_t *b; 2590 nxt_port_t *main_port, *router_port; 2591 nxt_runtime_t *rt; 2592 nxt_socket_rpc_t *rpc; 2593 2594 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2595 if (rpc == NULL) { 2596 goto fail; 2597 } 2598 2599 rpc->socket_conf = skcf; 2600 rpc->temp_conf = tmcf; 2601 2602 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2603 2604 b = 
nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2605 if (b == NULL) { 2606 goto fail; 2607 } 2608 2609 b->completion_handler = nxt_buf_dummy_completion; 2610 2611 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2612 2613 rt = task->thread->runtime; 2614 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2615 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2616 2617 stream = nxt_port_rpc_register_handler(task, router_port, 2618 nxt_router_listen_socket_ready, 2619 nxt_router_listen_socket_error, 2620 main_port->pid, rpc); 2621 if (nxt_slow_path(stream == 0)) { 2622 goto fail; 2623 } 2624 2625 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2626 stream, router_port->id, b); 2627 2628 if (nxt_slow_path(ret != NXT_OK)) { 2629 nxt_port_rpc_cancel(task, router_port, stream); 2630 goto fail; 2631 } 2632 2633 return; 2634 2635 fail: 2636 2637 nxt_router_conf_error(task, tmcf); 2638 } 2639 2640 2641 static void 2642 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2643 void *data) 2644 { 2645 nxt_int_t ret; 2646 nxt_socket_t s; 2647 nxt_socket_rpc_t *rpc; 2648 2649 rpc = data; 2650 2651 s = msg->fd[0]; 2652 2653 ret = nxt_socket_nonblocking(task, s); 2654 if (nxt_slow_path(ret != NXT_OK)) { 2655 goto fail; 2656 } 2657 2658 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2659 2660 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2661 if (nxt_slow_path(ret != NXT_OK)) { 2662 goto fail; 2663 } 2664 2665 rpc->socket_conf->listen->socket = s; 2666 2667 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2668 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2669 2670 return; 2671 2672 fail: 2673 2674 nxt_socket_close(task, s); 2675 2676 nxt_router_conf_error(task, rpc->temp_conf); 2677 } 2678 2679 2680 static void 2681 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2682 void *data) 2683 { 2684 nxt_socket_rpc_t *rpc; 2685 nxt_router_temp_conf_t *tmcf; 
2686 2687 rpc = data; 2688 tmcf = rpc->temp_conf; 2689 2690 #if 0 2691 u_char *p; 2692 size_t size; 2693 uint8_t error; 2694 nxt_buf_t *in, *out; 2695 nxt_sockaddr_t *sa; 2696 2697 static nxt_str_t socket_errors[] = { 2698 nxt_string("ListenerSystem"), 2699 nxt_string("ListenerNoIPv6"), 2700 nxt_string("ListenerPort"), 2701 nxt_string("ListenerInUse"), 2702 nxt_string("ListenerNoAddress"), 2703 nxt_string("ListenerNoAccess"), 2704 nxt_string("ListenerPath"), 2705 }; 2706 2707 sa = rpc->socket_conf->listen->sockaddr; 2708 2709 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2710 2711 if (nxt_slow_path(in == NULL)) { 2712 return; 2713 } 2714 2715 p = in->mem.pos; 2716 2717 error = *p++; 2718 2719 size = nxt_length("listen socket error: ") 2720 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2721 + sa->length + socket_errors[error].length + (in->mem.free - p); 2722 2723 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2724 if (nxt_slow_path(out == NULL)) { 2725 return; 2726 } 2727 2728 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2729 "listen socket error: " 2730 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2731 (size_t) sa->length, nxt_sockaddr_start(sa), 2732 &socket_errors[error], in->mem.free - p, p); 2733 2734 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2735 #endif 2736 2737 nxt_router_conf_error(task, tmcf); 2738 } 2739 2740 2741 #if (NXT_TLS) 2742 2743 static void 2744 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2745 void *data) 2746 { 2747 nxt_mp_t *mp; 2748 nxt_int_t ret; 2749 nxt_tls_conf_t *tlscf; 2750 nxt_router_tlssock_t *tls; 2751 nxt_tls_bundle_conf_t *bundle; 2752 nxt_router_temp_conf_t *tmcf; 2753 2754 nxt_debug(task, "tls rpc handler"); 2755 2756 tls = data; 2757 tmcf = tls->temp_conf; 2758 2759 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2760 goto fail; 2761 } 2762 2763 mp = tmcf->router_conf->mem_pool; 2764 2765 if 
(tls->socket_conf->tls == NULL){ 2766 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2767 if (nxt_slow_path(tlscf == NULL)) { 2768 goto fail; 2769 } 2770 2771 tlscf->no_wait_shutdown = 1; 2772 tls->socket_conf->tls = tlscf; 2773 2774 } else { 2775 tlscf = tls->socket_conf->tls; 2776 } 2777 2778 tls->tls_init->conf = tlscf; 2779 2780 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2781 if (nxt_slow_path(bundle == NULL)) { 2782 goto fail; 2783 } 2784 2785 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2786 goto fail; 2787 } 2788 2789 bundle->chain_file = msg->fd[0]; 2790 bundle->next = tlscf->bundle; 2791 tlscf->bundle = bundle; 2792 2793 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2794 tls->last); 2795 if (nxt_slow_path(ret != NXT_OK)) { 2796 goto fail; 2797 } 2798 2799 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2800 nxt_router_conf_apply, task, tmcf, NULL); 2801 return; 2802 2803 fail: 2804 2805 nxt_router_conf_error(task, tmcf); 2806 } 2807 2808 #endif 2809 2810 2811 static void 2812 nxt_router_app_rpc_create(nxt_task_t *task, 2813 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2814 { 2815 size_t size; 2816 uint32_t stream; 2817 nxt_fd_t port_fd, queue_fd; 2818 nxt_int_t ret; 2819 nxt_buf_t *b; 2820 nxt_port_t *router_port, *dport; 2821 nxt_runtime_t *rt; 2822 nxt_app_rpc_t *rpc; 2823 2824 rt = task->thread->runtime; 2825 2826 dport = app->proto_port; 2827 2828 if (dport == NULL) { 2829 nxt_debug(task, "app '%V' prototype prefork", &app->name); 2830 2831 size = app->name.length + 1 + app->conf.length; 2832 2833 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2834 if (nxt_slow_path(b == NULL)) { 2835 goto fail; 2836 } 2837 2838 b->completion_handler = nxt_buf_dummy_completion; 2839 2840 nxt_buf_cpystr(b, &app->name); 2841 *b->mem.free++ = '\0'; 2842 nxt_buf_cpystr(b, &app->conf); 2843 2844 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 2845 2846 port_fd = app->shared_port->pair[0]; 2847 
queue_fd = app->shared_port->queue_fd; 2848 2849 } else { 2850 nxt_debug(task, "app '%V' prefork", &app->name); 2851 2852 b = NULL; 2853 port_fd = -1; 2854 queue_fd = -1; 2855 } 2856 2857 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2858 2859 rpc = nxt_port_rpc_register_handler_ex(task, router_port, 2860 nxt_router_app_prefork_ready, 2861 nxt_router_app_prefork_error, 2862 sizeof(nxt_app_rpc_t)); 2863 if (nxt_slow_path(rpc == NULL)) { 2864 goto fail; 2865 } 2866 2867 rpc->app = app; 2868 rpc->temp_conf = tmcf; 2869 rpc->proto = (b != NULL); 2870 2871 stream = nxt_port_rpc_ex_stream(rpc); 2872 2873 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 2874 port_fd, queue_fd, stream, router_port->id, b); 2875 if (nxt_slow_path(ret != NXT_OK)) { 2876 nxt_port_rpc_cancel(task, router_port, stream); 2877 goto fail; 2878 } 2879 2880 if (b == NULL) { 2881 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); 2882 2883 app->pending_processes++; 2884 } 2885 2886 return; 2887 2888 fail: 2889 2890 nxt_router_conf_error(task, tmcf); 2891 } 2892 2893 2894 static void 2895 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2896 void *data) 2897 { 2898 nxt_app_t *app; 2899 nxt_port_t *port; 2900 nxt_app_rpc_t *rpc; 2901 nxt_event_engine_t *engine; 2902 2903 rpc = data; 2904 app = rpc->app; 2905 2906 port = msg->u.new_port; 2907 2908 nxt_assert(port != NULL); 2909 nxt_assert(port->id == 0); 2910 2911 if (rpc->proto) { 2912 nxt_assert(app->proto_port == NULL); 2913 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 2914 2915 nxt_port_inc_use(port); 2916 2917 app->proto_port = port; 2918 port->app = app; 2919 2920 nxt_router_app_rpc_create(task, rpc->temp_conf, app); 2921 2922 return; 2923 } 2924 2925 nxt_assert(port->type == NXT_PROCESS_APP); 2926 2927 port->app = app; 2928 port->main_app_port = port; 2929 2930 app->pending_processes--; 2931 app->processes++; 2932 app->idle_processes++; 2933 2934 engine = task->thread->engine; 2935 
2936 nxt_queue_insert_tail(&app->ports, &port->app_link); 2937 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2938 2939 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2940 &app->name, port->pid, port->id); 2941 2942 nxt_port_hash_add(&app->port_hash, port); 2943 app->port_hash_count++; 2944 2945 port->idle_start = 0; 2946 2947 nxt_port_inc_use(port); 2948 2949 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 2950 2951 nxt_work_queue_add(&engine->fast_work_queue, 2952 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2953 } 2954 2955 2956 static void 2957 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2958 void *data) 2959 { 2960 nxt_app_t *app; 2961 nxt_app_rpc_t *rpc; 2962 nxt_router_temp_conf_t *tmcf; 2963 2964 rpc = data; 2965 app = rpc->app; 2966 tmcf = rpc->temp_conf; 2967 2968 if (rpc->proto) { 2969 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", 2970 &app->name); 2971 2972 } else { 2973 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2974 &app->name); 2975 2976 app->pending_processes--; 2977 } 2978 2979 nxt_router_conf_error(task, tmcf); 2980 } 2981 2982 2983 static nxt_int_t 2984 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2985 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2986 { 2987 nxt_int_t ret; 2988 nxt_uint_t n, threads; 2989 nxt_queue_link_t *qlk; 2990 nxt_router_engine_conf_t *recf; 2991 2992 threads = tmcf->router_conf->threads; 2993 2994 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2995 sizeof(nxt_router_engine_conf_t)); 2996 if (nxt_slow_path(tmcf->engines == NULL)) { 2997 return NXT_ERROR; 2998 } 2999 3000 n = 0; 3001 3002 for (qlk = nxt_queue_first(&router->engines); 3003 qlk != nxt_queue_tail(&router->engines); 3004 qlk = nxt_queue_next(qlk)) 3005 { 3006 recf = nxt_array_zero_add(tmcf->engines); 3007 if (nxt_slow_path(recf == NULL)) { 3008 return NXT_ERROR; 3009 } 3010 
3011 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 3012 3013 if (n < threads) { 3014 recf->action = NXT_ROUTER_ENGINE_KEEP; 3015 ret = nxt_router_engine_conf_update(tmcf, recf); 3016 3017 } else { 3018 recf->action = NXT_ROUTER_ENGINE_DELETE; 3019 ret = nxt_router_engine_conf_delete(tmcf, recf); 3020 } 3021 3022 if (nxt_slow_path(ret != NXT_OK)) { 3023 return ret; 3024 } 3025 3026 n++; 3027 } 3028 3029 tmcf->new_threads = n; 3030 3031 while (n < threads) { 3032 recf = nxt_array_zero_add(tmcf->engines); 3033 if (nxt_slow_path(recf == NULL)) { 3034 return NXT_ERROR; 3035 } 3036 3037 recf->action = NXT_ROUTER_ENGINE_ADD; 3038 3039 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 3040 if (nxt_slow_path(recf->engine == NULL)) { 3041 return NXT_ERROR; 3042 } 3043 3044 ret = nxt_router_engine_conf_create(tmcf, recf); 3045 if (nxt_slow_path(ret != NXT_OK)) { 3046 return ret; 3047 } 3048 3049 n++; 3050 } 3051 3052 return NXT_OK; 3053 } 3054 3055 3056 static nxt_int_t 3057 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 3058 nxt_router_engine_conf_t *recf) 3059 { 3060 nxt_int_t ret; 3061 3062 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3063 nxt_router_listen_socket_create); 3064 if (nxt_slow_path(ret != NXT_OK)) { 3065 return ret; 3066 } 3067 3068 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3069 nxt_router_listen_socket_create); 3070 if (nxt_slow_path(ret != NXT_OK)) { 3071 return ret; 3072 } 3073 3074 return ret; 3075 } 3076 3077 3078 static nxt_int_t 3079 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 3080 nxt_router_engine_conf_t *recf) 3081 { 3082 nxt_int_t ret; 3083 3084 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3085 nxt_router_listen_socket_create); 3086 if (nxt_slow_path(ret != NXT_OK)) { 3087 return ret; 3088 } 3089 3090 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3091 
nxt_router_listen_socket_update); 3092 if (nxt_slow_path(ret != NXT_OK)) { 3093 return ret; 3094 } 3095 3096 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3097 if (nxt_slow_path(ret != NXT_OK)) { 3098 return ret; 3099 } 3100 3101 return ret; 3102 } 3103 3104 3105 static nxt_int_t 3106 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 3107 nxt_router_engine_conf_t *recf) 3108 { 3109 nxt_int_t ret; 3110 3111 ret = nxt_router_engine_quit(tmcf, recf); 3112 if (nxt_slow_path(ret != NXT_OK)) { 3113 return ret; 3114 } 3115 3116 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 3117 if (nxt_slow_path(ret != NXT_OK)) { 3118 return ret; 3119 } 3120 3121 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3122 } 3123 3124 3125 static nxt_int_t 3126 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3127 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3128 nxt_work_handler_t handler) 3129 { 3130 nxt_int_t ret; 3131 nxt_joint_job_t *job; 3132 nxt_queue_link_t *qlk; 3133 nxt_socket_conf_t *skcf; 3134 nxt_socket_conf_joint_t *joint; 3135 3136 for (qlk = nxt_queue_first(sockets); 3137 qlk != nxt_queue_tail(sockets); 3138 qlk = nxt_queue_next(qlk)) 3139 { 3140 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3141 if (nxt_slow_path(job == NULL)) { 3142 return NXT_ERROR; 3143 } 3144 3145 job->work.next = recf->jobs; 3146 recf->jobs = &job->work; 3147 3148 job->task = tmcf->engine->task; 3149 job->work.handler = handler; 3150 job->work.task = &job->task; 3151 job->work.obj = job; 3152 job->tmcf = tmcf; 3153 3154 tmcf->count++; 3155 3156 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3157 sizeof(nxt_socket_conf_joint_t)); 3158 if (nxt_slow_path(joint == NULL)) { 3159 return NXT_ERROR; 3160 } 3161 3162 job->work.data = joint; 3163 3164 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3165 if (nxt_slow_path(ret != NXT_OK)) { 3166 return ret; 3167 } 3168 3169 joint->count = 1; 
3170 3171 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3172 skcf->count++; 3173 joint->socket_conf = skcf; 3174 3175 joint->engine = recf->engine; 3176 } 3177 3178 return NXT_OK; 3179 } 3180 3181 3182 static nxt_int_t 3183 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3184 nxt_router_engine_conf_t *recf) 3185 { 3186 nxt_joint_job_t *job; 3187 3188 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3189 if (nxt_slow_path(job == NULL)) { 3190 return NXT_ERROR; 3191 } 3192 3193 job->work.next = recf->jobs; 3194 recf->jobs = &job->work; 3195 3196 job->task = tmcf->engine->task; 3197 job->work.handler = nxt_router_worker_thread_quit; 3198 job->work.task = &job->task; 3199 job->work.obj = NULL; 3200 job->work.data = NULL; 3201 job->tmcf = NULL; 3202 3203 return NXT_OK; 3204 } 3205 3206 3207 static nxt_int_t 3208 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 3209 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 3210 { 3211 nxt_joint_job_t *job; 3212 nxt_queue_link_t *qlk; 3213 3214 for (qlk = nxt_queue_first(sockets); 3215 qlk != nxt_queue_tail(sockets); 3216 qlk = nxt_queue_next(qlk)) 3217 { 3218 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3219 if (nxt_slow_path(job == NULL)) { 3220 return NXT_ERROR; 3221 } 3222 3223 job->work.next = recf->jobs; 3224 recf->jobs = &job->work; 3225 3226 job->task = tmcf->engine->task; 3227 job->work.handler = nxt_router_listen_socket_delete; 3228 job->work.task = &job->task; 3229 job->work.obj = job; 3230 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3231 job->tmcf = tmcf; 3232 3233 tmcf->count++; 3234 } 3235 3236 return NXT_OK; 3237 } 3238 3239 3240 static nxt_int_t 3241 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 3242 nxt_router_temp_conf_t *tmcf) 3243 { 3244 nxt_int_t ret; 3245 nxt_uint_t i, threads; 3246 nxt_router_engine_conf_t *recf; 3247 3248 recf = tmcf->engines->elts; 3249 threads = tmcf->router_conf->threads; 3250 3251 for (i 
= tmcf->new_threads; i < threads; i++) { 3252 ret = nxt_router_thread_create(task, rt, recf[i].engine); 3253 if (nxt_slow_path(ret != NXT_OK)) { 3254 return ret; 3255 } 3256 } 3257 3258 return NXT_OK; 3259 } 3260 3261 3262 static nxt_int_t 3263 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 3264 nxt_event_engine_t *engine) 3265 { 3266 nxt_int_t ret; 3267 nxt_thread_link_t *link; 3268 nxt_thread_handle_t handle; 3269 3270 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 3271 3272 if (nxt_slow_path(link == NULL)) { 3273 return NXT_ERROR; 3274 } 3275 3276 link->start = nxt_router_thread_start; 3277 link->engine = engine; 3278 link->work.handler = nxt_router_thread_exit_handler; 3279 link->work.task = task; 3280 link->work.data = link; 3281 3282 nxt_queue_insert_tail(&rt->engines, &engine->link); 3283 3284 ret = nxt_thread_create(&handle, link); 3285 3286 if (nxt_slow_path(ret != NXT_OK)) { 3287 nxt_queue_remove(&engine->link); 3288 } 3289 3290 return ret; 3291 } 3292 3293 3294 static void 3295 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 3296 nxt_router_temp_conf_t *tmcf) 3297 { 3298 nxt_app_t *app; 3299 3300 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 3301 3302 nxt_router_app_unlink(task, app); 3303 3304 } nxt_queue_loop; 3305 3306 nxt_queue_add(&router->apps, &tmcf->previous); 3307 nxt_queue_add(&router->apps, &tmcf->apps); 3308 } 3309 3310 3311 static void 3312 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 3313 { 3314 nxt_uint_t n; 3315 nxt_event_engine_t *engine; 3316 nxt_router_engine_conf_t *recf; 3317 3318 recf = tmcf->engines->elts; 3319 3320 for (n = tmcf->engines->nelts; n != 0; n--) { 3321 engine = recf->engine; 3322 3323 switch (recf->action) { 3324 3325 case NXT_ROUTER_ENGINE_KEEP: 3326 break; 3327 3328 case NXT_ROUTER_ENGINE_ADD: 3329 nxt_queue_insert_tail(&router->engines, &engine->link0); 3330 break; 3331 3332 case NXT_ROUTER_ENGINE_DELETE: 3333 nxt_queue_remove(&engine->link0); 3334 
break; 3335 } 3336 3337 nxt_router_engine_post(engine, recf->jobs); 3338 3339 recf++; 3340 } 3341 } 3342 3343 3344 static void 3345 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3346 { 3347 nxt_work_t *work, *next; 3348 3349 for (work = jobs; work != NULL; work = next) { 3350 next = work->next; 3351 work->next = NULL; 3352 3353 nxt_event_engine_post(engine, work); 3354 } 3355 } 3356 3357 3358 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3359 .rpc_error = nxt_port_rpc_handler, 3360 .mmap = nxt_port_mmap_handler, 3361 .data = nxt_port_rpc_handler, 3362 .oosm = nxt_router_oosm_handler, 3363 .req_headers_ack = nxt_port_rpc_handler, 3364 }; 3365 3366 3367 static void 3368 nxt_router_thread_start(void *data) 3369 { 3370 nxt_int_t ret; 3371 nxt_port_t *port; 3372 nxt_task_t *task; 3373 nxt_work_t *work; 3374 nxt_thread_t *thread; 3375 nxt_thread_link_t *link; 3376 nxt_event_engine_t *engine; 3377 3378 link = data; 3379 engine = link->engine; 3380 task = &engine->task; 3381 3382 thread = nxt_thread(); 3383 3384 nxt_event_engine_thread_adopt(engine); 3385 3386 /* STUB */ 3387 thread->runtime = engine->task.thread->runtime; 3388 3389 engine->task.thread = thread; 3390 engine->task.log = thread->log; 3391 thread->engine = engine; 3392 thread->task = &engine->task; 3393 #if 0 3394 thread->fiber = &engine->fibers->fiber; 3395 #endif 3396 3397 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3398 if (nxt_slow_path(engine->mem_pool == NULL)) { 3399 return; 3400 } 3401 3402 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3403 NXT_PROCESS_ROUTER); 3404 if (nxt_slow_path(port == NULL)) { 3405 return; 3406 } 3407 3408 ret = nxt_port_socket_init(task, port, 0); 3409 if (nxt_slow_path(ret != NXT_OK)) { 3410 nxt_port_use(task, port, -1); 3411 return; 3412 } 3413 3414 ret = nxt_router_port_queue_init(task, port); 3415 if (nxt_slow_path(ret != NXT_OK)) { 3416 nxt_port_use(task, port, -1); 3417 return; 3418 } 3419 3420 engine->port = 
port; 3421 3422 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3423 3424 work = nxt_zalloc(sizeof(nxt_work_t)); 3425 if (nxt_slow_path(work == NULL)) { 3426 return; 3427 } 3428 3429 work->handler = nxt_router_rt_add_port; 3430 work->task = link->work.task; 3431 work->obj = work; 3432 work->data = port; 3433 3434 nxt_event_engine_post(link->work.task->thread->engine, work); 3435 3436 nxt_event_engine_start(engine); 3437 } 3438 3439 3440 static void 3441 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3442 { 3443 nxt_int_t res; 3444 nxt_port_t *port; 3445 nxt_runtime_t *rt; 3446 3447 rt = task->thread->runtime; 3448 port = data; 3449 3450 nxt_free(obj); 3451 3452 res = nxt_port_hash_add(&rt->ports, port); 3453 3454 if (nxt_fast_path(res == NXT_OK)) { 3455 nxt_port_use(task, port, 1); 3456 } 3457 } 3458 3459 3460 static void 3461 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3462 { 3463 nxt_joint_job_t *job; 3464 nxt_socket_conf_t *skcf; 3465 nxt_listen_event_t *lev; 3466 nxt_listen_socket_t *ls; 3467 nxt_thread_spinlock_t *lock; 3468 nxt_socket_conf_joint_t *joint; 3469 3470 job = obj; 3471 joint = data; 3472 3473 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3474 3475 skcf = joint->socket_conf; 3476 ls = skcf->listen; 3477 3478 lev = nxt_listen_event(task, ls); 3479 if (nxt_slow_path(lev == NULL)) { 3480 nxt_router_listen_socket_release(task, skcf); 3481 return; 3482 } 3483 3484 lev->socket.data = joint; 3485 3486 lock = &skcf->router_conf->router->lock; 3487 3488 nxt_thread_spin_lock(lock); 3489 ls->count++; 3490 nxt_thread_spin_unlock(lock); 3491 3492 job->work.next = NULL; 3493 job->work.handler = nxt_router_conf_wait; 3494 3495 nxt_event_engine_post(job->tmcf->engine, &job->work); 3496 } 3497 3498 3499 nxt_inline nxt_listen_event_t * 3500 nxt_router_listen_event(nxt_queue_t *listen_connections, 3501 nxt_socket_conf_t *skcf) 3502 { 3503 nxt_socket_t fd; 3504 nxt_queue_link_t *qlk; 3505 
nxt_listen_event_t *lev; 3506 3507 fd = skcf->listen->socket; 3508 3509 for (qlk = nxt_queue_first(listen_connections); 3510 qlk != nxt_queue_tail(listen_connections); 3511 qlk = nxt_queue_next(qlk)) 3512 { 3513 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3514 3515 if (fd == lev->socket.fd) { 3516 return lev; 3517 } 3518 } 3519 3520 return NULL; 3521 } 3522 3523 3524 static void 3525 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3526 { 3527 nxt_joint_job_t *job; 3528 nxt_event_engine_t *engine; 3529 nxt_listen_event_t *lev; 3530 nxt_socket_conf_joint_t *joint, *old; 3531 3532 job = obj; 3533 joint = data; 3534 3535 engine = task->thread->engine; 3536 3537 nxt_queue_insert_tail(&engine->joints, &joint->link); 3538 3539 lev = nxt_router_listen_event(&engine->listen_connections, 3540 joint->socket_conf); 3541 3542 old = lev->socket.data; 3543 lev->socket.data = joint; 3544 lev->listen = joint->socket_conf->listen; 3545 3546 job->work.next = NULL; 3547 job->work.handler = nxt_router_conf_wait; 3548 3549 nxt_event_engine_post(job->tmcf->engine, &job->work); 3550 3551 /* 3552 * The task is allocated from configuration temporary 3553 * memory pool so it can be freed after engine post operation. 
3554 */ 3555 3556 nxt_router_conf_release(&engine->task, old); 3557 } 3558 3559 3560 static void 3561 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3562 { 3563 nxt_socket_conf_t *skcf; 3564 nxt_listen_event_t *lev; 3565 nxt_event_engine_t *engine; 3566 nxt_socket_conf_joint_t *joint; 3567 3568 skcf = data; 3569 3570 engine = task->thread->engine; 3571 3572 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3573 3574 nxt_fd_event_delete(engine, &lev->socket); 3575 3576 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3577 lev->socket.fd); 3578 3579 joint = lev->socket.data; 3580 joint->close_job = obj; 3581 3582 lev->timer.handler = nxt_router_listen_socket_close; 3583 lev->timer.work_queue = &engine->fast_work_queue; 3584 3585 nxt_timer_add(engine, &lev->timer, 0); 3586 } 3587 3588 3589 static void 3590 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3591 { 3592 nxt_event_engine_t *engine; 3593 3594 nxt_debug(task, "router worker thread quit"); 3595 3596 engine = task->thread->engine; 3597 3598 engine->shutdown = 1; 3599 3600 if (nxt_queue_is_empty(&engine->joints)) { 3601 nxt_thread_exit(task->thread); 3602 } 3603 } 3604 3605 3606 static void 3607 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3608 { 3609 nxt_timer_t *timer; 3610 nxt_joint_job_t *job; 3611 nxt_listen_event_t *lev; 3612 nxt_socket_conf_joint_t *joint; 3613 3614 timer = obj; 3615 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3616 3617 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3618 lev->socket.fd); 3619 3620 nxt_queue_remove(&lev->link); 3621 3622 joint = lev->socket.data; 3623 lev->socket.data = NULL; 3624 3625 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3626 task = &task->thread->engine->task; 3627 3628 nxt_router_listen_socket_release(task, joint->socket_conf); 3629 3630 job = joint->close_job; 3631 job->work.next = NULL; 3632 
job->work.handler = nxt_router_conf_wait; 3633 3634 nxt_event_engine_post(job->tmcf->engine, &job->work); 3635 3636 nxt_router_listen_event_release(task, lev, joint); 3637 } 3638 3639 3640 static void 3641 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3642 { 3643 nxt_listen_socket_t *ls; 3644 nxt_thread_spinlock_t *lock; 3645 3646 ls = skcf->listen; 3647 lock = &skcf->router_conf->router->lock; 3648 3649 nxt_thread_spin_lock(lock); 3650 3651 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3652 task->thread->engine, ls->count); 3653 3654 if (--ls->count != 0) { 3655 ls = NULL; 3656 } 3657 3658 nxt_thread_spin_unlock(lock); 3659 3660 if (ls != NULL) { 3661 nxt_socket_close(task, ls->socket); 3662 nxt_free(ls); 3663 } 3664 } 3665 3666 3667 void 3668 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3669 nxt_socket_conf_joint_t *joint) 3670 { 3671 nxt_event_engine_t *engine; 3672 3673 nxt_debug(task, "listen event count: %D", lev->count); 3674 3675 engine = task->thread->engine; 3676 3677 if (--lev->count == 0) { 3678 if (lev->next != NULL) { 3679 nxt_sockaddr_cache_free(engine, lev->next); 3680 3681 nxt_conn_free(task, lev->next); 3682 } 3683 3684 nxt_free(lev); 3685 } 3686 3687 if (joint != NULL) { 3688 nxt_router_conf_release(task, joint); 3689 } 3690 3691 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3692 nxt_thread_exit(task->thread); 3693 } 3694 } 3695 3696 3697 void 3698 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3699 { 3700 nxt_socket_conf_t *skcf; 3701 nxt_router_conf_t *rtcf; 3702 nxt_thread_spinlock_t *lock; 3703 3704 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3705 3706 if (--joint->count != 0) { 3707 return; 3708 } 3709 3710 nxt_queue_remove(&joint->link); 3711 3712 /* 3713 * The joint content can not be safely used after the critical 3714 * section protected by the spinlock because its memory pool may 3715 * be 
already destroyed by another thread. 3716 */ 3717 skcf = joint->socket_conf; 3718 rtcf = skcf->router_conf; 3719 lock = &rtcf->router->lock; 3720 3721 nxt_thread_spin_lock(lock); 3722 3723 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3724 rtcf, rtcf->count); 3725 3726 if (--skcf->count != 0) { 3727 skcf = NULL; 3728 rtcf = NULL; 3729 3730 } else { 3731 nxt_queue_remove(&skcf->link); 3732 3733 if (--rtcf->count != 0) { 3734 rtcf = NULL; 3735 } 3736 } 3737 3738 nxt_thread_spin_unlock(lock); 3739 3740 #if (NXT_TLS) 3741 if (skcf != NULL && skcf->tls != NULL) { 3742 task->thread->runtime->tls->server_free(task, skcf->tls); 3743 } 3744 #endif 3745 3746 /* TODO remove engine->port */ 3747 3748 if (rtcf != NULL) { 3749 nxt_debug(task, "old router conf is destroyed"); 3750 3751 nxt_router_apps_hash_use(task, rtcf, -1); 3752 3753 nxt_router_access_log_release(task, lock, rtcf->access_log); 3754 3755 nxt_mp_thread_adopt(rtcf->mem_pool); 3756 3757 nxt_mp_destroy(rtcf->mem_pool); 3758 } 3759 } 3760 3761 3762 static void 3763 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3764 nxt_router_access_log_t *access_log) 3765 { 3766 size_t size; 3767 u_char *buf, *p; 3768 nxt_off_t bytes; 3769 3770 static nxt_time_string_t date_cache = { 3771 (nxt_atomic_uint_t) -1, 3772 nxt_router_access_log_date, 3773 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3774 nxt_length("31/Dec/1986:19:40:00 +0300"), 3775 NXT_THREAD_TIME_LOCAL, 3776 NXT_THREAD_TIME_SEC, 3777 }; 3778 3779 size = r->remote->address_length 3780 + 6 /* ' - - [' */ 3781 + date_cache.size 3782 + 3 /* '] "' */ 3783 + r->method->length 3784 + 1 /* space */ 3785 + r->target.length 3786 + 1 /* space */ 3787 + r->version.length 3788 + 2 /* '" ' */ 3789 + 3 /* status */ 3790 + 1 /* space */ 3791 + NXT_OFF_T_LEN 3792 + 2 /* ' "' */ 3793 + (r->referer != NULL ? r->referer->value_length : 1) 3794 + 3 /* '" "' */ 3795 + (r->user_agent != NULL ? 
r->user_agent->value_length : 1) 3796 + 2 /* '"\n' */ 3797 ; 3798 3799 buf = nxt_mp_nget(r->mem_pool, size); 3800 if (nxt_slow_path(buf == NULL)) { 3801 return; 3802 } 3803 3804 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3805 r->remote->address_length); 3806 3807 p = nxt_cpymem(p, " - - [", 6); 3808 3809 p = nxt_thread_time_string(task->thread, &date_cache, p); 3810 3811 p = nxt_cpymem(p, "] \"", 3); 3812 3813 if (r->method->length != 0) { 3814 p = nxt_cpymem(p, r->method->start, r->method->length); 3815 3816 if (r->target.length != 0) { 3817 *p++ = ' '; 3818 p = nxt_cpymem(p, r->target.start, r->target.length); 3819 3820 if (r->version.length != 0) { 3821 *p++ = ' '; 3822 p = nxt_cpymem(p, r->version.start, r->version.length); 3823 } 3824 } 3825 3826 } else { 3827 *p++ = '-'; 3828 } 3829 3830 p = nxt_cpymem(p, "\" ", 2); 3831 3832 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3833 3834 *p++ = ' '; 3835 3836 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3837 3838 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3839 3840 p = nxt_cpymem(p, " \"", 2); 3841 3842 if (r->referer != NULL) { 3843 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3844 3845 } else { 3846 *p++ = '-'; 3847 } 3848 3849 p = nxt_cpymem(p, "\" \"", 3); 3850 3851 if (r->user_agent != NULL) { 3852 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3853 3854 } else { 3855 *p++ = '-'; 3856 } 3857 3858 p = nxt_cpymem(p, "\"\n", 2); 3859 3860 nxt_fd_write(access_log->fd, buf, p - buf); 3861 } 3862 3863 3864 static u_char * 3865 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3866 size_t size, const char *format) 3867 { 3868 u_char sign; 3869 time_t gmtoff; 3870 3871 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3872 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3873 3874 gmtoff = nxt_timezone(tm) / 60; 3875 3876 if (gmtoff < 0) { 3877 gmtoff = -gmtoff; 3878 sign = '-'; 3879 3880 } 
else { 3881 sign = '+'; 3882 } 3883 3884 return nxt_sprintf(buf, buf + size, format, 3885 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3886 tm->tm_hour, tm->tm_min, tm->tm_sec, 3887 sign, gmtoff / 60, gmtoff % 60); 3888 } 3889 3890 3891 static void 3892 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3893 { 3894 uint32_t stream; 3895 nxt_int_t ret; 3896 nxt_buf_t *b; 3897 nxt_port_t *main_port, *router_port; 3898 nxt_runtime_t *rt; 3899 nxt_router_access_log_t *access_log; 3900 3901 access_log = tmcf->router_conf->access_log; 3902 3903 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3904 if (nxt_slow_path(b == NULL)) { 3905 goto fail; 3906 } 3907 3908 b->completion_handler = nxt_buf_dummy_completion; 3909 3910 nxt_buf_cpystr(b, &access_log->path); 3911 *b->mem.free++ = '\0'; 3912 3913 rt = task->thread->runtime; 3914 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3915 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3916 3917 stream = nxt_port_rpc_register_handler(task, router_port, 3918 nxt_router_access_log_ready, 3919 nxt_router_access_log_error, 3920 -1, tmcf); 3921 if (nxt_slow_path(stream == 0)) { 3922 goto fail; 3923 } 3924 3925 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3926 stream, router_port->id, b); 3927 3928 if (nxt_slow_path(ret != NXT_OK)) { 3929 nxt_port_rpc_cancel(task, router_port, stream); 3930 goto fail; 3931 } 3932 3933 return; 3934 3935 fail: 3936 3937 nxt_router_conf_error(task, tmcf); 3938 } 3939 3940 3941 static void 3942 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3943 void *data) 3944 { 3945 nxt_router_temp_conf_t *tmcf; 3946 nxt_router_access_log_t *access_log; 3947 3948 tmcf = data; 3949 3950 access_log = tmcf->router_conf->access_log; 3951 3952 access_log->fd = msg->fd[0]; 3953 3954 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3955 nxt_router_conf_apply, task, tmcf, NULL); 3956 } 3957 3958 3959 static 
void 3960 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3961 void *data) 3962 { 3963 nxt_router_temp_conf_t *tmcf; 3964 3965 tmcf = data; 3966 3967 nxt_router_conf_error(task, tmcf); 3968 } 3969 3970 3971 static void 3972 nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 3973 nxt_router_access_log_t *access_log) 3974 { 3975 if (access_log == NULL) { 3976 return; 3977 } 3978 3979 nxt_thread_spin_lock(lock); 3980 3981 access_log->count++; 3982 3983 nxt_thread_spin_unlock(lock); 3984 } 3985 3986 3987 static void 3988 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3989 nxt_router_access_log_t *access_log) 3990 { 3991 if (access_log == NULL) { 3992 return; 3993 } 3994 3995 nxt_thread_spin_lock(lock); 3996 3997 if (--access_log->count != 0) { 3998 access_log = NULL; 3999 } 4000 4001 nxt_thread_spin_unlock(lock); 4002 4003 if (access_log != NULL) { 4004 4005 if (access_log->fd != -1) { 4006 nxt_fd_close(access_log->fd); 4007 } 4008 4009 nxt_free(access_log); 4010 } 4011 } 4012 4013 4014 typedef struct { 4015 nxt_mp_t *mem_pool; 4016 nxt_router_access_log_t *access_log; 4017 } nxt_router_access_log_reopen_t; 4018 4019 4020 static void 4021 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 4022 { 4023 nxt_mp_t *mp; 4024 uint32_t stream; 4025 nxt_int_t ret; 4026 nxt_buf_t *b; 4027 nxt_port_t *main_port, *router_port; 4028 nxt_runtime_t *rt; 4029 nxt_router_access_log_t *access_log; 4030 nxt_router_access_log_reopen_t *reopen; 4031 4032 access_log = nxt_router->access_log; 4033 4034 if (access_log == NULL) { 4035 return; 4036 } 4037 4038 mp = nxt_mp_create(1024, 128, 256, 32); 4039 if (nxt_slow_path(mp == NULL)) { 4040 return; 4041 } 4042 4043 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 4044 if (nxt_slow_path(reopen == NULL)) { 4045 goto fail; 4046 } 4047 4048 reopen->mem_pool = mp; 4049 reopen->access_log = access_log; 4050 4051 b = nxt_buf_mem_alloc(mp, 
access_log->path.length + 1, 0); 4052 if (nxt_slow_path(b == NULL)) { 4053 goto fail; 4054 } 4055 4056 b->completion_handler = nxt_router_access_log_reopen_completion; 4057 4058 nxt_buf_cpystr(b, &access_log->path); 4059 *b->mem.free++ = '\0'; 4060 4061 rt = task->thread->runtime; 4062 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 4063 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 4064 4065 stream = nxt_port_rpc_register_handler(task, router_port, 4066 nxt_router_access_log_reopen_ready, 4067 nxt_router_access_log_reopen_error, 4068 -1, reopen); 4069 if (nxt_slow_path(stream == 0)) { 4070 goto fail; 4071 } 4072 4073 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 4074 stream, router_port->id, b); 4075 4076 if (nxt_slow_path(ret != NXT_OK)) { 4077 nxt_port_rpc_cancel(task, router_port, stream); 4078 goto fail; 4079 } 4080 4081 nxt_mp_retain(mp); 4082 4083 return; 4084 4085 fail: 4086 4087 nxt_mp_destroy(mp); 4088 } 4089 4090 4091 static void 4092 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 4093 { 4094 nxt_mp_t *mp; 4095 nxt_buf_t *b; 4096 4097 b = obj; 4098 mp = b->data; 4099 4100 nxt_mp_release(mp); 4101 } 4102 4103 4104 static void 4105 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4106 void *data) 4107 { 4108 nxt_router_access_log_t *access_log; 4109 nxt_router_access_log_reopen_t *reopen; 4110 4111 reopen = data; 4112 4113 access_log = reopen->access_log; 4114 4115 if (access_log == nxt_router->access_log) { 4116 4117 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 4118 nxt_alert(task, "dup2(%FD, %FD) failed %E", 4119 msg->fd[0], access_log->fd, nxt_errno); 4120 } 4121 } 4122 4123 nxt_fd_close(msg->fd[0]); 4124 nxt_mp_release(reopen->mem_pool); 4125 } 4126 4127 4128 static void 4129 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4130 void *data) 4131 { 4132 nxt_router_access_log_reopen_t *reopen; 4133 4134 reopen = 
data; 4135 4136 nxt_mp_release(reopen->mem_pool); 4137 } 4138 4139 4140 static void 4141 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 4142 { 4143 nxt_port_t *port; 4144 nxt_thread_link_t *link; 4145 nxt_event_engine_t *engine; 4146 nxt_thread_handle_t handle; 4147 4148 handle = (nxt_thread_handle_t) (uintptr_t) obj; 4149 link = data; 4150 4151 nxt_thread_wait(handle); 4152 4153 engine = link->engine; 4154 4155 nxt_queue_remove(&engine->link); 4156 4157 port = engine->port; 4158 4159 // TODO notify all apps 4160 4161 port->engine = task->thread->engine; 4162 nxt_mp_thread_adopt(port->mem_pool); 4163 nxt_port_use(task, port, -1); 4164 4165 nxt_mp_thread_adopt(engine->mem_pool); 4166 nxt_mp_destroy(engine->mem_pool); 4167 4168 nxt_event_engine_free(engine); 4169 4170 nxt_free(link); 4171 } 4172 4173 4174 static void 4175 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4176 void *data) 4177 { 4178 size_t b_size, count; 4179 nxt_int_t ret; 4180 nxt_app_t *app; 4181 nxt_buf_t *b, *next; 4182 nxt_port_t *app_port; 4183 nxt_unit_field_t *f; 4184 nxt_http_field_t *field; 4185 nxt_http_request_t *r; 4186 nxt_unit_response_t *resp; 4187 nxt_request_rpc_data_t *req_rpc_data; 4188 4189 req_rpc_data = data; 4190 4191 r = req_rpc_data->request; 4192 if (nxt_slow_path(r == NULL)) { 4193 return; 4194 } 4195 4196 if (r->error) { 4197 nxt_request_rpc_data_unlink(task, req_rpc_data); 4198 return; 4199 } 4200 4201 app = req_rpc_data->app; 4202 nxt_assert(app != NULL); 4203 4204 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { 4205 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); 4206 4207 return; 4208 } 4209 4210 b = (msg->size == 0) ? 
NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Final message: terminate the chain with the request's last buf. */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /* A response did arrive, so the port is released as "got response". */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* More messages are expected: (re)arm the application timeout. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        /* Headers are already out: everything received is body data. */
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        /*
         * First response message: the buffer must start with
         * an nxt_unit_response_t followed by its field array.
         */
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;
        /* Number of whole fields that fit after the fixed-size header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        /* Copy every non-skipped application field into r->resp.fields. */
        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /* Field processing marked it skipped: drop the element again. */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        /*
         * Narrow the buffer to the body bytes that came along with
         * the headers, or make it empty when there are none.
         */
        if (resp->piggyback_content_length != 0) {
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            b->mem.pos = b->mem.free;
        }

        /*
         * An empty first buffer is detached from the chain and its
         * completion handler is queued; the rest of the chain goes on.
         */
        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            /* Count the new websocket on the process's main port. */
            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            /*
             * The request itself is answered (UPGRADE); the later release
             * of this rpc data will use the CLOSE action.
             */
            nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}


/*
 * Handles the headers ACK an application process sends for a request;
 * called from nxt_router_response_ready_handler().
 *
 * Takes the request off the list it is linked into through r->app_link,
 * looks up the acknowledging port by (pid, reply_port) in the
 * application's port hash, updates the idle/spare port accounting, and
 * replaces req_rpc_data->app_port with the found port.  The remaining
 * request body (buffers after the first one and/or the body descriptor)
 * is then sent to that port with NXT_PORT_MSG_REQ_BODY, and the
 * application timeout is armed if configured.
 *
 * If the port is not found the request fails with a 500 error.
 */
static void
nxt_router_req_headers_ack_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
{
    int                 res;
    nxt_app_t           *app;
    nxt_buf_t           *b;
    nxt_bool_t          start_process, unlinked;
    nxt_port_t          *app_port, *main_app_port, *idle_port;
    nxt_queue_link_t    *idle_lnk;
    nxt_http_request_t  *r;

    nxt_debug(task, "stream #%uD: got ack from %PI:%d",
              req_rpc_data->stream,
              msg->port_msg.pid, msg->port_msg.reply_port);

    nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
                             msg->port_msg.pid);

    app = req_rpc_data->app;
    r = req_rpc_data->request;

    start_process = 0;
    unlinked = 0;

    nxt_thread_mutex_lock(&app->mutex);

    /* The request is acknowledged: take it off the list it is linked in. */
    if (r->app_link.next != NULL) {
        nxt_queue_remove(&r->app_link);
        r->app_link.next = NULL;

        unlinked = 1;
    }

    app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
                                  msg->port_msg.reply_port);
    if (nxt_slow_path(app_port == NULL)) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);

        /* Drop the pool reference that was held while the request was linked. */
        if (unlinked) {
            nxt_mp_release(r->mem_pool);
        }

        return;
    }

    main_app_port = app_port->main_app_port;

    /* The process is busy now: remove it from the idle/spare queue. */
    if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
                  &app->name, main_app_port->pid, main_app_port->id,
                  (main_app_port->idle_start ? "idle_ports" : "spare_ports"));

        /* Check port was in 'spare_ports' using idle_start field. */
        if (main_app_port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            /*
             * If there is a vacant space in spare ports,
             * move the last idle to spare_ports.
             */
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }

        /* Reserve a pending process under the lock; it is started below. */
        if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
            app->pending_processes++;
            start_process = 1;
        }
    }

    main_app_port->active_requests++;

    nxt_port_inc_use(app_port);

    nxt_thread_mutex_unlock(&app->mutex);

    if (unlinked) {
        nxt_mp_release(r->mem_pool);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }

    /* Swap the previously assigned port for the one that acknowledged. */
    nxt_port_use(task, req_rpc_data->app_port, -1);

    req_rpc_data->app_port = app_port;

    b = req_rpc_data->msg_info.buf;

    if (b != NULL) {
        /* First buffer is already sent.  Start from second. */
        b = b->next;

        req_rpc_data->msg_info.buf->next = NULL;
    }

    if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
                  req_rpc_data->msg_info.body_fd);

        /* Rewind the body file; the lseek() result is not checked. */
        if (req_rpc_data->msg_info.body_fd != -1) {
            lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
        }

        res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
                                    req_rpc_data->msg_info.body_fd,
                                    req_rpc_data->stream,
                                    task->thread->engine->port->id, b);

        if (nxt_slow_path(res != NXT_OK)) {
            nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
        }
    }

    if (app->timeout != 0) {
        r->timer.handler = nxt_router_app_timeout;
        r->timer_data = req_rpc_data;
        nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
    }
}


/*
 * Request state set by nxt_router_response_ready_handler() once the
 * response headers have been sent (non-websocket case); it defines only
 * an error handler.
 */
static const nxt_http_request_state_t  nxt_http_request_send_state
    nxt_aligned(64) =
{
    .error_handler = nxt_http_request_error_handler,
};


/*
 * Sends the buffers accumulated in r->out, if any, and clears r->out
 * first.  "obj" is the request; "data" is unused.
 */
static void
nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t           *out;
    nxt_http_request_t  *r;

    r = obj;

    out = r->out;

    if (out != NULL) {
        r->out = NULL;
        nxt_http_request_send(task, r, out);
    }
}


/*
 * RPC error handler for an application request: fails the request (when
 * one is still attached) with 503 and unlinks the rpc data.
 */
static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    req_rpc_data->rpc_cancel = 0;

    /* TODO cancel message and return if cancelled. */
    // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);

    if (req_rpc_data->request != NULL) {
        nxt_http_request_error(task, req_rpc_data->request,
                               NXT_HTTP_SERVICE_UNAVAILABLE);
    }

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}


/*
 * RPC success handler for a process start request; "data" is the
 * nxt_app_joint_rpc_t of the request and msg->u.new_port the port of the
 * new process.
 *
 * The joint reference taken for the request is dropped first.  If the
 * application is already gone, or its generation changed since the
 * request was made, the new process is told to QUIT.  Otherwise:
 *   - a prototype port is stored as app->proto_port, and the application
 *     process starts counted in app->proto_port_requests are issued;
 *   - an application port is added to the application's port hash,
 *     acknowledged with NXT_PORT_MSG_PORT_ACK and released with the
 *     NXT_APR_NEW_PORT action.
 */
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    uint32_t             n;
    nxt_app_t            *app;
    nxt_bool_t           start_process, restarted;
    nxt_port_t           *port;
    nxt_app_joint_t      *app_joint;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;
    port = msg->u.new_port;

    nxt_assert(app_joint != NULL);
    nxt_assert(port != NULL);
    nxt_assert(port->id == 0);

    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "new port ready for released app, send QUIT");

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        return;
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* The process was requested for an older generation of the app. */
    restarted = (app->generation != app_joint_rpc->generation);

    if (app_joint_rpc->proto) {
        nxt_assert(app->proto_port == NULL);
        nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);

        /* Take over the count of starts that waited for the prototype. */
        n = app->proto_port_requests;
        app->proto_port_requests = 0;

        if (nxt_slow_path(restarted)) {
            nxt_thread_mutex_unlock(&app->mutex);

            nxt_debug(task, "proto port ready for restarted app, send QUIT");

            nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
                                  NULL);

        } else {
            port->app = app;
            app->proto_port = port;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_port_use(task, port, 1);
        }

        /* From here on "port" is the router port, not the new port. */
        port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];

        while (n > 0) {
nxt_router_app_use(task, app, 1);

            nxt_router_start_app_process_handler(task, port, app);

            n--;
        }

        return;
    }

    nxt_assert(port->type == NXT_PROCESS_APP);
    nxt_assert(app->pending_processes != 0);

    /* The pending start has completed, one way or another. */
    app->pending_processes--;

    if (nxt_slow_path(restarted)) {
        nxt_debug(task, "new port ready for restarted app, send QUIT");

        /* Decide under the lock whether a replacement must be started. */
        start_process = !task->thread->engine->shutdown
                        && nxt_router_app_can_start(app)
                        && nxt_router_app_need_start(app);

        if (start_process) {
            app->pending_processes++;
        }

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        if (start_process) {
            nxt_router_start_app_process(task, app);
        }

        return;
    }

    /* A process's first port is its own main port. */
    port->app = app;
    port->main_app_port = port;

    app->processes++;
    nxt_port_hash_add(&app->port_hash, port);
    app->port_hash_count++;

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
              &app->name, port->pid, app->processes, app->pending_processes);

    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);

    nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
}


/*
 * RPC error handler for a process start request; "data" is the
 * nxt_app_joint_rpc_t of the request.
 *
 * Drops the joint reference taken for the request and, if the
 * application still exists, decrements its pending process count.
 * While the application has no processes, requests are taken off
 * app->ack_waiting_req one at a time and their err_work is posted to
 * the request's engine; after the first request this continues only
 * while no process start is pending either.
 */
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t            *app;
    nxt_app_joint_t      *app_joint;
    nxt_queue_link_t     *link;
    nxt_http_request_t   *r;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;

    nxt_assert(app_joint != NULL);

    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "start error for released app");

        return;
    }

nxt_debug(task, "app '%V' %p start error", &app->name, app);

    link = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    /* No process can serve the queue: pick the first waiting request. */
    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
        link = nxt_queue_first(&app->ack_waiting_req);

        nxt_queue_remove(link);
        link->next = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    while (link != NULL) {
        r = nxt_container_of(link, nxt_http_request_t, app_link);

        /* The error work is posted to the request's own engine. */
        nxt_event_engine_post(r->engine, &r->err_work);

        link = NULL;

        nxt_thread_mutex_lock(&app->mutex);

        /* Go on only while no process exists and none is being started. */
        if (app->processes == 0 && app->pending_processes == 0
            && !nxt_queue_is_empty(&app->ack_waiting_req))
        {
            link = nxt_queue_first(&app->ack_waiting_req);

            nxt_queue_remove(link);
            link->next = NULL;
        }

        nxt_thread_mutex_unlock(&app->mutex);
    }
}


/*
 * Detaches one port from the application's "ports" queue, under the
 * application mutex, and returns it; returns NULL when the queue is
 * empty.
 *
 * The port is also removed from the idle/spare queue (if linked there)
 * and from the port hash, its "app" pointer is cleared, and the
 * application's process count is decremented.
 */
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
    nxt_port_t  *port;

    port = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    /* Only the first element is processed: the loop body ends in "break". */
    nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {

        /* The caller is responsible for decreasing the port use count. */
        nxt_queue_chk_remove(&port->app_link);

        if (nxt_queue_chk_remove(&port->idle_link)) {
            app->idle_processes--;

            nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
                      &app->name, port->pid, port->id,
                      (port->idle_start ? "idle_ports" : "spare_ports"));
        }

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        port->app = NULL;
        app->processes--;

        break;

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&app->mutex);

    return port;
}


/*
 * Adds "i" to the application's use count.  When a negative "i" removes
 * the last references (the previous count equals -i), the application is
 * freed: directly if the caller runs on the application's engine,
 * otherwise by posting the joint's free_app_work to that engine.
 */
static void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
    int  c;

    /* "c" is the count before the addition. */
    c = nxt_atomic_fetch_add(&app->use_count, i);

    if (i < 0 && c == -i) {

        if (task->thread->engine != app->engine) {
            nxt_event_engine_post(app->engine, &app->joint->free_app_work);

        } else {
            nxt_router_free_app(task, app->joint, NULL);
        }
    }
}


/*
 * Removes the application from the queue it is linked into through
 * app->link and drops one use count reference.
 */
static void
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
    nxt_debug(task, "app '%V' %p unlink", &app->name, app);

    nxt_queue_remove(&app->link);

    nxt_router_app_use(task, app, -1);
}


/*
 * Accounts for the end of one use of an application port.
 *
 * "action" selects what is adjusted:
 *   NXT_APR_NEW_PORT        nothing is decremented;
 *   NXT_APR_REQUEST_FAILED  one active request, one port use;
 *   NXT_APR_GOT_RESPONSE    one active request, one port use;
 *   NXT_APR_UPGRADE         one active request, port use is kept;
 *   NXT_APR_CLOSE           one port use only.
 *
 * For the shared port (id NXT_SHARED_PORT_ID) only the application-wide
 * request counter is updated.  For other ports the counters of the
 * process's main port are updated as well, the main port is linked into
 * app->ports if it is not there yet, and a port left without active
 * requests and websockets is put into spare_ports or idle_ports.
 */
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
    nxt_apr_action_t action)
{
    int         inc_use;
    uint32_t    got_response, dec_requests;
    nxt_bool_t  adjust_idle_timer;
    nxt_port_t  *main_app_port;

    nxt_assert(port != NULL);

    inc_use = 0;
    got_response = 0;
    dec_requests = 0;

    switch (action) {
    case NXT_APR_NEW_PORT:
        break;
    case NXT_APR_REQUEST_FAILED:
        dec_requests = 1;
        inc_use = -1;
        break;
    case NXT_APR_GOT_RESPONSE:
        got_response = 1;
        inc_use = -1;
        break;
    case NXT_APR_UPGRADE:
        got_response = 1;
        break;
    case NXT_APR_CLOSE:
        inc_use = -1;
        break;
    }

    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
              port->pid, port->id,
              (int) inc_use, (int) got_response);

    /* The shared port has no per-process counters or idle state. */
    if (port->id == NXT_SHARED_PORT_ID) {
        nxt_thread_mutex_lock(&app->mutex);

        app->active_requests -= got_response +
dec_requests;

        nxt_thread_mutex_unlock(&app->mutex);

        goto adjust_use;
    }

    main_app_port = port->main_app_port;

    nxt_thread_mutex_lock(&app->mutex);

    main_app_port->active_requests -= got_response + dec_requests;
    app->active_requests -= got_response + dec_requests;

    /* A still-open main port that is not yet in app->ports gets linked. */
    if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
        nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);

        nxt_port_inc_use(main_app_port);
    }

    adjust_idle_timer = 0;

    /* The process has nothing left to do and is not queued as idle yet. */
    if (main_app_port->pair[1] != -1
        && main_app_port->active_requests == 0
        && main_app_port->active_websockets == 0
        && main_app_port->idle_link.next == NULL)
    {
        /*
         * Spare slots are full, so this port goes to idle_ports below:
         * request an idle timer adjustment unless one is already queued
         * (adjust_idle_work.data is non-NULL while it is).
         */
        if (app->idle_processes == app->spare_processes
            && app->adjust_idle_work.data == NULL)
        {
            adjust_idle_timer = 1;
            app->adjust_idle_work.data = app;
            app->adjust_idle_work.next = NULL;
        }

        if (app->idle_processes < app->spare_processes) {
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);

            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        } else {
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);

            /* A non-zero idle_start marks membership in idle_ports. */
            main_app_port->idle_start = task->thread->engine->timers.now;

            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        }

        app->idle_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (adjust_idle_timer) {
        /* The reference is dropped by nxt_router_adjust_idle_timer(). */
        nxt_router_app_use(task, app, 1);
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
    }

    /*
     * NOTE(review): the two branches below differ only in the debug
     * message; both end at adjust_use.  pair[1] == -1 is treated as
     * "port already closed", read here without the application mutex.
     */
    if (main_app_port->pair[1] == -1) {
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
                  &app->name, app, main_app_port, main_app_port->pid);

        goto adjust_use;
    }

    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
              &app->name, app);

adjust_use:

    nxt_port_use(task, port, inc_use);
}


/*
 * Detaches a closed port from its application (port->app must be set).
 *
 *   - The prototype port is unset from the application and loses one
 *     use count reference.
 *   - Any other port is removed from the port hash.  For a port with
 *     a non-zero id nothing else is done.
 *   - For a process's port with id 0, the port is additionally removed
 *     from the ports and idle/spare queues, the process count is
 *     decremented, and a new process is started when the engine is not
 *     shutting down and the application can and needs to start one.
 */
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
    nxt_app_t         *app;
    nxt_bool_t        unchain, start_process;
    nxt_port_t        *idle_port;
    nxt_queue_link_t  *idle_lnk;

    app = port->app;

    nxt_assert(app != NULL);

    nxt_thread_mutex_lock(&app->mutex);

    if (port == app->proto_port) {
        app->proto_port = NULL;
        port->app = NULL;

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
                  port->pid);

        nxt_port_use(task, port, -1);

        return;
    }

    nxt_port_hash_remove(&app->port_hash, port);
    app->port_hash_count--;

    if (port->id != 0) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
                  port->pid, port->id);

        return;
    }

    /* Remember whether the port was linked in app->ports. */
    unchain = nxt_queue_chk_remove(&port->app_link);

    if (nxt_queue_chk_remove(&port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
                  &app->name, port->pid, port->id,
                  (port->idle_start ? "idle_ports" : "spare_ports"));

        /*
         * The port was in spare_ports (idle_start == 0): refill the
         * vacated spare slot with the last port of idle_ports.
         */
        if (port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }
    }

    app->processes--;

    /* Decide under the lock; the process is started after unlocking. */
    start_process = !task->thread->engine->shutdown
                    && nxt_router_app_can_start(app)
                    && nxt_router_app_need_start(app);

    if (start_process) {
        app->pending_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

    if (unchain) {
        nxt_port_use(task, port, -1);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}


/*
 * Stops idle application processes whose idle timeout has passed and
 * re-arms or disables the application's idle timer.
 *
 * "obj" is the application.  When "data" equals the application, the
 * call came through app->adjust_idle_work: the work is marked as no
 * longer queued and one application reference is dropped at the end.
 * Must run on the application's engine (asserted).
 *
 * While there are more idle processes than spare_processes, the first
 * port of idle_ports is examined.  If its idle_start + idle_timeout is
 * not later than now + idle_timer.bias, the port is detached from the
 * application and sent NXT_PORT_MSG_QUIT; otherwise the loop stops and
 * the timer is set for the remaining time.
 */
static void
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t           *app;
    nxt_bool_t          queued;
    nxt_port_t          *port;
    nxt_msec_t          timeout, threshold;
    nxt_queue_link_t    *lnk;
    nxt_event_engine_t  *engine;

    app = obj;
    queued = (data == app);

    nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
              &app->name, queued);

    engine = task->thread->engine;

    nxt_assert(app->engine == engine);

    threshold = engine->timers.now + app->joint->idle_timer.bias;
    timeout = 0;

    nxt_thread_mutex_lock(&app->mutex);

    /* Allow the work to be queued again. */
    if (queued) {
        app->adjust_idle_work.data = NULL;
    }

    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
              &app->name,
              (int) app->idle_processes, (int) app->spare_processes);

while (app->idle_processes > app->spare_processes) {

        nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

        lnk = nxt_queue_first(&app->idle_ports);
        port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);

        /* Absolute time at which this port's idle period expires. */
        timeout = port->idle_start + app->idle_timeout;

        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
                  &app->name, port->pid,
                  port->idle_start, timeout, threshold);

        /* Not expired yet: stop here; the timer is re-armed below. */
        if (timeout > threshold) {
            break;
        }

        nxt_queue_remove(lnk);
        lnk->next = NULL;

        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
                  &app->name, port->pid, port->id);

        nxt_queue_chk_remove(&port->app_link);

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        app->idle_processes--;
        app->processes--;
        port->app = NULL;

        /* The mutex is not held while the QUIT message is sent. */
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' send QUIT to idle port %PI",
                  &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);

        nxt_thread_mutex_lock(&app->mutex);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    /*
     * "timeout" is later than "threshold" only if the loop stopped at
     * a port that has not expired yet.
     */
    if (timeout > threshold) {
        nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);

    } else {
        nxt_timer_disable(engine, &app->joint->idle_timer);
    }

    /* Drop the reference taken when the work was posted. */
    if (queued) {
        nxt_router_app_use(task, app, -1);
    }
}


/*
 * Idle timer handler: "obj" is the idle_timer embedded in an
 * nxt_app_joint_t.  Runs nxt_router_adjust_idle_timer() for the joint's
 * application, unless the application pointer is already NULL.
 */
static void
nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t      *timer;
    nxt_app_joint_t  *app_joint;

    timer = obj;
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);

    if (nxt_fast_path(app_joint->app != NULL)) {
        /* data == NULL: not a queued work, so no reference is dropped. */
        nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
    }
}


/*
 * Timer handler installed on the joint's idle_timer by
 * nxt_router_free_app(): drops one reference on the nxt_app_joint_t
 * that embeds the timer passed as "obj".
 */
static void
nxt_router_app_joint_release_handler(nxt_task_t
*task, void *obj, void *data)
{
    nxt_timer_t      *timer;
    nxt_app_joint_t  *app_joint;

    timer = obj;
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);

    nxt_router_app_joint_use(task, app_joint, -1);
}


/*
 * Destroys an application; "obj" is its nxt_app_joint_t.
 *
 * In order: every port in app->ports is detached and loses one use
 * reference; every port left in the port hash is closed and released;
 * the prototype port, if any, is sent NXT_PORT_MSG_QUIT, closed and
 * released; outgoing mmaps and mutexes are destroyed; the shared port is
 * closed and released; the application's memory pool is destroyed.
 *
 * Finally the joint's "app" pointer is cleared and the joint reference
 * is dropped, either directly or through the idle timer when that timer
 * was still pending (see the end of the function).
 */
static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t        *app;
    nxt_port_t       *port, *proto_port;
    nxt_app_joint_t  *app_joint;

    app_joint = obj;
    app = app_joint->app;

    /* The helper locks the application mutex itself for each port. */
    for ( ;; ) {
        port = nxt_router_app_get_port_for_quit(task, app);
        if (port == NULL) {
            break;
        }

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* Drain whatever ports are still registered in the hash. */
    for ( ;; ) {
        port = nxt_port_hash_retrieve(&app->port_hash);
        if (port == NULL) {
            break;
        }

        app->port_hash_count--;

        port->app = NULL;

        nxt_port_close(task, port);

        nxt_port_use(task, port, -1);
    }

    proto_port = app->proto_port;

    /* Unset the prototype under the lock; QUIT is sent after unlocking. */
    if (proto_port != NULL) {
        nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
                  proto_port->pid);

        app->proto_port = NULL;
        proto_port->app = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (proto_port != NULL) {
        nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
                              -1, 0, 0, NULL);

        nxt_port_close(task, proto_port);

        nxt_port_use(task, proto_port, -1);
    }

    /* At this point the application must hold no ports and no requests. */
    nxt_assert(app->proto_port == NULL);
    nxt_assert(app->processes == 0);
    nxt_assert(app->active_requests == 0);
    nxt_assert(app->port_hash_count == 0);
    nxt_assert(app->idle_processes == 0);
    nxt_assert(nxt_queue_is_empty(&app->ports));
    nxt_assert(nxt_queue_is_empty(&app->spare_ports));
    nxt_assert(nxt_queue_is_empty(&app->idle_ports));

    nxt_port_mmaps_destroy(&app->outgoing, 1);

    nxt_thread_mutex_destroy(&app->outgoing.mutex);

    if
(app->shared_port != NULL) { 5204 app->shared_port->app = NULL; 5205 nxt_port_close(task, app->shared_port); 5206 nxt_port_use(task, app->shared_port, -1); 5207 5208 app->shared_port = NULL; 5209 } 5210 5211 nxt_thread_mutex_destroy(&app->mutex); 5212 nxt_mp_destroy(app->mem_pool); 5213 5214 app_joint->app = NULL; 5215 5216 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 5217 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 5218 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 5219 5220 } else { 5221 nxt_router_app_joint_use(task, app_joint, -1); 5222 } 5223 } 5224 5225 5226 static void 5227 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 5228 nxt_request_rpc_data_t *req_rpc_data) 5229 { 5230 nxt_bool_t start_process; 5231 nxt_port_t *port; 5232 nxt_http_request_t *r; 5233 5234 start_process = 0; 5235 5236 nxt_thread_mutex_lock(&app->mutex); 5237 5238 port = app->shared_port; 5239 nxt_port_inc_use(port); 5240 5241 app->active_requests++; 5242 5243 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 5244 app->pending_processes++; 5245 start_process = 1; 5246 } 5247 5248 r = req_rpc_data->request; 5249 5250 /* 5251 * Put request into application-wide list to be able to cancel request 5252 * if something goes wrong with application processes. 5253 */ 5254 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 5255 5256 nxt_thread_mutex_unlock(&app->mutex); 5257 5258 /* 5259 * Retain request memory pool while request is linked in ack_waiting_req 5260 * to guarantee request structure memory is accessble. 
5261 */ 5262 nxt_mp_retain(r->mem_pool); 5263 5264 req_rpc_data->app_port = port; 5265 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 5266 5267 if (start_process) { 5268 nxt_router_start_app_process(task, app); 5269 } 5270 } 5271 5272 5273 void 5274 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 5275 nxt_http_action_t *action) 5276 { 5277 nxt_event_engine_t *engine; 5278 nxt_http_app_conf_t *conf; 5279 nxt_request_rpc_data_t *req_rpc_data; 5280 5281 conf = action->u.conf; 5282 engine = task->thread->engine; 5283 5284 r->app_target = conf->target; 5285 5286 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 5287 nxt_router_response_ready_handler, 5288 nxt_router_response_error_handler, 5289 sizeof(nxt_request_rpc_data_t)); 5290 if (nxt_slow_path(req_rpc_data == NULL)) { 5291 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 5292 return; 5293 } 5294 5295 /* 5296 * At this point we have request req_rpc_data allocated and registered 5297 * in port handlers. Need to fixup request memory pool. 
Counterpart 5298 * release will be called via following call chain: 5299 * nxt_request_rpc_data_unlink() -> 5300 * nxt_router_http_request_release_post() -> 5301 * nxt_router_http_request_release() 5302 */ 5303 nxt_mp_retain(r->mem_pool); 5304 5305 r->timer.task = &engine->task; 5306 r->timer.work_queue = &engine->fast_work_queue; 5307 r->timer.log = engine->task.log; 5308 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 5309 5310 r->engine = engine; 5311 r->err_work.handler = nxt_router_http_request_error; 5312 r->err_work.task = task; 5313 r->err_work.obj = r; 5314 5315 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 5316 req_rpc_data->app = conf->app; 5317 req_rpc_data->msg_info.body_fd = -1; 5318 req_rpc_data->rpc_cancel = 1; 5319 5320 nxt_router_app_use(task, conf->app, 1); 5321 5322 req_rpc_data->request = r; 5323 r->req_rpc_data = req_rpc_data; 5324 5325 if (r->last != NULL) { 5326 r->last->completion_handler = nxt_router_http_request_done; 5327 } 5328 5329 nxt_router_app_port_get(task, conf->app, req_rpc_data); 5330 nxt_router_app_prepare_request(task, req_rpc_data); 5331 } 5332 5333 5334 static void 5335 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 5336 { 5337 nxt_http_request_t *r; 5338 5339 r = obj; 5340 5341 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 5342 5343 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5344 5345 if (r->req_rpc_data != NULL) { 5346 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5347 } 5348 5349 nxt_mp_release(r->mem_pool); 5350 } 5351 5352 5353 static void 5354 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 5355 { 5356 nxt_http_request_t *r; 5357 5358 r = data; 5359 5360 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 5361 5362 if (r->req_rpc_data != NULL) { 5363 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5364 } 5365 5366 nxt_http_request_close_handler(task, r, r->proto.any); 5367 } 5368 
5369 5370 static void 5371 nxt_router_app_prepare_request(nxt_task_t *task, 5372 nxt_request_rpc_data_t *req_rpc_data) 5373 { 5374 nxt_app_t *app; 5375 nxt_buf_t *buf, *body; 5376 nxt_int_t res; 5377 nxt_port_t *port, *reply_port; 5378 5379 int notify; 5380 struct { 5381 nxt_port_msg_t pm; 5382 nxt_port_mmap_msg_t mm; 5383 } msg; 5384 5385 5386 app = req_rpc_data->app; 5387 5388 nxt_assert(app != NULL); 5389 5390 port = req_rpc_data->app_port; 5391 5392 nxt_assert(port != NULL); 5393 nxt_assert(port->queue != NULL); 5394 5395 reply_port = task->thread->engine->port; 5396 5397 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5398 nxt_app_msg_prefix[app->type]); 5399 if (nxt_slow_path(buf == NULL)) { 5400 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5401 req_rpc_data->stream, &app->name); 5402 5403 nxt_http_request_error(task, req_rpc_data->request, 5404 NXT_HTTP_INTERNAL_SERVER_ERROR); 5405 5406 return; 5407 } 5408 5409 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5410 nxt_buf_used_size(buf), 5411 port->socket.fd); 5412 5413 req_rpc_data->msg_info.buf = buf; 5414 5415 body = req_rpc_data->request->body; 5416 5417 if (body != NULL && nxt_buf_is_file(body)) { 5418 req_rpc_data->msg_info.body_fd = body->file->fd; 5419 5420 body->file->fd = -1; 5421 5422 } else { 5423 req_rpc_data->msg_info.body_fd = -1; 5424 } 5425 5426 msg.pm.stream = req_rpc_data->stream; 5427 msg.pm.pid = reply_port->pid; 5428 msg.pm.reply_port = reply_port->id; 5429 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5430 msg.pm.last = 0; 5431 msg.pm.mmap = 1; 5432 msg.pm.nf = 0; 5433 msg.pm.mf = 0; 5434 5435 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5436 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5437 5438 msg.mm.mmap_id = hdr->id; 5439 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5440 msg.mm.size = nxt_buf_used_size(buf); 5441 5442 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5443 
req_rpc_data->stream, ¬ify, 5444 &req_rpc_data->msg_info.tracking_cookie); 5445 if (nxt_fast_path(res == NXT_OK)) { 5446 if (notify != 0) { 5447 (void) nxt_port_socket_write(task, port, 5448 NXT_PORT_MSG_READ_QUEUE, 5449 -1, req_rpc_data->stream, 5450 reply_port->id, NULL); 5451 5452 } else { 5453 nxt_debug(task, "queue is not empty"); 5454 } 5455 5456 buf->is_port_mmap_sent = 1; 5457 buf->mem.pos = buf->mem.free; 5458 5459 } else { 5460 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5461 req_rpc_data->stream, &app->name); 5462 5463 nxt_http_request_error(task, req_rpc_data->request, 5464 NXT_HTTP_INTERNAL_SERVER_ERROR); 5465 } 5466 } 5467 5468 5469 struct nxt_fields_iter_s { 5470 nxt_list_part_t *part; 5471 nxt_http_field_t *field; 5472 }; 5473 5474 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5475 5476 5477 static nxt_http_field_t * 5478 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5479 { 5480 if (part == NULL) { 5481 return NULL; 5482 } 5483 5484 while (part->nelts == 0) { 5485 part = part->next; 5486 if (part == NULL) { 5487 return NULL; 5488 } 5489 } 5490 5491 i->part = part; 5492 i->field = nxt_list_data(i->part); 5493 5494 return i->field; 5495 } 5496 5497 5498 static nxt_http_field_t * 5499 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5500 { 5501 return nxt_fields_part_first(nxt_list_part(fields), i); 5502 } 5503 5504 5505 static nxt_http_field_t * 5506 nxt_fields_next(nxt_fields_iter_t *i) 5507 { 5508 nxt_http_field_t *end = nxt_list_data(i->part); 5509 5510 end += i->part->nelts; 5511 i->field++; 5512 5513 if (i->field < end) { 5514 return i->field; 5515 } 5516 5517 return nxt_fields_part_first(i->part->next, i); 5518 } 5519 5520 5521 static nxt_buf_t * 5522 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5523 nxt_app_t *app, const nxt_str_t *prefix) 5524 { 5525 void *target_pos, *query_pos; 5526 u_char *pos, *end, *p, c; 5527 size_t fields_count, req_size, size, free_size; 
5528 size_t copy_size; 5529 nxt_off_t content_length; 5530 nxt_buf_t *b, *buf, *out, **tail; 5531 nxt_http_field_t *field, *dup; 5532 nxt_unit_field_t *dst_field; 5533 nxt_fields_iter_t iter, dup_iter; 5534 nxt_unit_request_t *req; 5535 5536 req_size = sizeof(nxt_unit_request_t) 5537 + r->method->length + 1 5538 + r->version.length + 1 5539 + r->remote->length + 1 5540 + r->local->length + 1 5541 + r->server_name.length + 1 5542 + r->target.length + 1 5543 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5544 5545 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5546 fields_count = 0; 5547 5548 nxt_list_each(field, r->fields) { 5549 fields_count++; 5550 5551 req_size += field->name_length + prefix->length + 1 5552 + field->value_length + 1; 5553 } nxt_list_loop; 5554 5555 req_size += fields_count * sizeof(nxt_unit_field_t); 5556 5557 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5558 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5559 (int) req_size); 5560 5561 return NULL; 5562 } 5563 5564 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5565 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5566 if (nxt_slow_path(out == NULL)) { 5567 return NULL; 5568 } 5569 5570 req = (nxt_unit_request_t *) out->mem.free; 5571 out->mem.free += req_size; 5572 5573 req->app_target = r->app_target; 5574 5575 req->content_length = content_length; 5576 5577 p = (u_char *) (req->fields + fields_count); 5578 5579 nxt_debug(task, "fields_count=%d", (int) fields_count); 5580 5581 req->method_length = r->method->length; 5582 nxt_unit_sptr_set(&req->method, p); 5583 p = nxt_cpymem(p, r->method->start, r->method->length); 5584 *p++ = '\0'; 5585 5586 req->version_length = r->version.length; 5587 nxt_unit_sptr_set(&req->version, p); 5588 p = nxt_cpymem(p, r->version.start, r->version.length); 5589 *p++ = '\0'; 5590 5591 req->remote_length = r->remote->address_length; 5592 nxt_unit_sptr_set(&req->remote, p); 5593 p = 
nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5594 r->remote->address_length); 5595 *p++ = '\0'; 5596 5597 req->local_length = r->local->address_length; 5598 nxt_unit_sptr_set(&req->local, p); 5599 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5600 *p++ = '\0'; 5601 5602 req->tls = r->tls; 5603 req->websocket_handshake = r->websocket_handshake; 5604 5605 req->server_name_length = r->server_name.length; 5606 nxt_unit_sptr_set(&req->server_name, p); 5607 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5608 *p++ = '\0'; 5609 5610 target_pos = p; 5611 req->target_length = (uint32_t) r->target.length; 5612 nxt_unit_sptr_set(&req->target, p); 5613 p = nxt_cpymem(p, r->target.start, r->target.length); 5614 *p++ = '\0'; 5615 5616 req->path_length = (uint32_t) r->path->length; 5617 if (r->path->start == r->target.start) { 5618 nxt_unit_sptr_set(&req->path, target_pos); 5619 5620 } else { 5621 nxt_unit_sptr_set(&req->path, p); 5622 p = nxt_cpymem(p, r->path->start, r->path->length); 5623 *p++ = '\0'; 5624 } 5625 5626 req->query_length = (uint32_t) r->args->length; 5627 if (r->args->start != NULL) { 5628 query_pos = nxt_pointer_to(target_pos, 5629 r->args->start - r->target.start); 5630 5631 nxt_unit_sptr_set(&req->query, query_pos); 5632 5633 } else { 5634 req->query.offset = 0; 5635 } 5636 5637 req->content_length_field = NXT_UNIT_NONE_FIELD; 5638 req->content_type_field = NXT_UNIT_NONE_FIELD; 5639 req->cookie_field = NXT_UNIT_NONE_FIELD; 5640 req->authorization_field = NXT_UNIT_NONE_FIELD; 5641 5642 dst_field = req->fields; 5643 5644 for (field = nxt_fields_first(r->fields, &iter); 5645 field != NULL; 5646 field = nxt_fields_next(&iter)) 5647 { 5648 if (field->skip) { 5649 continue; 5650 } 5651 5652 dst_field->hash = field->hash; 5653 dst_field->skip = 0; 5654 dst_field->name_length = field->name_length + prefix->length; 5655 dst_field->value_length = field->value_length; 5656 5657 if (field == r->content_length) { 5658 
req->content_length_field = dst_field - req->fields; 5659 5660 } else if (field == r->content_type) { 5661 req->content_type_field = dst_field - req->fields; 5662 5663 } else if (field == r->cookie) { 5664 req->cookie_field = dst_field - req->fields; 5665 5666 } else if (field == r->authorization) { 5667 req->authorization_field = dst_field - req->fields; 5668 } 5669 5670 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5671 (int) field->hash, (int) field->skip, 5672 (int) field->name_length, field->name, 5673 (int) field->value_length, field->value); 5674 5675 if (prefix->length != 0) { 5676 nxt_unit_sptr_set(&dst_field->name, p); 5677 p = nxt_cpymem(p, prefix->start, prefix->length); 5678 5679 end = field->name + field->name_length; 5680 for (pos = field->name; pos < end; pos++) { 5681 c = *pos; 5682 5683 if (c >= 'a' && c <= 'z') { 5684 *p++ = (c & ~0x20); 5685 continue; 5686 } 5687 5688 if (c == '-') { 5689 *p++ = '_'; 5690 continue; 5691 } 5692 5693 *p++ = c; 5694 } 5695 5696 } else { 5697 nxt_unit_sptr_set(&dst_field->name, p); 5698 p = nxt_cpymem(p, field->name, field->name_length); 5699 } 5700 5701 *p++ = '\0'; 5702 5703 nxt_unit_sptr_set(&dst_field->value, p); 5704 p = nxt_cpymem(p, field->value, field->value_length); 5705 5706 if (prefix->length != 0) { 5707 dup_iter = iter; 5708 5709 for (dup = nxt_fields_next(&dup_iter); 5710 dup != NULL; 5711 dup = nxt_fields_next(&dup_iter)) 5712 { 5713 if (dup->name_length != field->name_length 5714 || dup->skip 5715 || dup->hash != field->hash 5716 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5717 { 5718 continue; 5719 } 5720 5721 p = nxt_cpymem(p, ", ", 2); 5722 p = nxt_cpymem(p, dup->value, dup->value_length); 5723 5724 dst_field->value_length += 2 + dup->value_length; 5725 5726 dup->skip = 1; 5727 } 5728 } 5729 5730 *p++ = '\0'; 5731 5732 dst_field++; 5733 } 5734 5735 req->fields_count = (uint32_t) (dst_field - req->fields); 5736 5737 nxt_unit_sptr_set(&req->preread_content, 
out->mem.free); 5738 5739 buf = out; 5740 tail = &buf->next; 5741 5742 for (b = r->body; b != NULL; b = b->next) { 5743 size = nxt_buf_mem_used_size(&b->mem); 5744 pos = b->mem.pos; 5745 5746 while (size > 0) { 5747 if (buf == NULL) { 5748 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5749 5750 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5751 if (nxt_slow_path(buf == NULL)) { 5752 while (out != NULL) { 5753 buf = out->next; 5754 out->next = NULL; 5755 out->completion_handler(task, out, out->parent); 5756 out = buf; 5757 } 5758 return NULL; 5759 } 5760 5761 *tail = buf; 5762 tail = &buf->next; 5763 5764 } else { 5765 free_size = nxt_buf_mem_free_size(&buf->mem); 5766 if (free_size < size 5767 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5768 == NXT_OK) 5769 { 5770 free_size = nxt_buf_mem_free_size(&buf->mem); 5771 } 5772 } 5773 5774 if (free_size > 0) { 5775 copy_size = nxt_min(free_size, size); 5776 5777 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5778 5779 size -= copy_size; 5780 pos += copy_size; 5781 5782 if (size == 0) { 5783 break; 5784 } 5785 } 5786 5787 buf = NULL; 5788 } 5789 } 5790 5791 return out; 5792 } 5793 5794 5795 static void 5796 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5797 { 5798 nxt_timer_t *timer; 5799 nxt_http_request_t *r; 5800 nxt_request_rpc_data_t *req_rpc_data; 5801 5802 timer = obj; 5803 5804 nxt_debug(task, "router app timeout"); 5805 5806 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5807 req_rpc_data = r->timer_data; 5808 5809 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5810 5811 nxt_request_rpc_data_unlink(task, req_rpc_data); 5812 } 5813 5814 5815 static void 5816 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5817 { 5818 r->timer.handler = nxt_router_http_request_release; 5819 nxt_timer_add(task->thread->engine, &r->timer, 0); 5820 } 5821 5822 5823 static void 5824 nxt_router_http_request_release(nxt_task_t *task, 
void *obj, void *data) 5825 { 5826 nxt_http_request_t *r; 5827 5828 nxt_debug(task, "http request pool release"); 5829 5830 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5831 5832 nxt_mp_release(r->mem_pool); 5833 } 5834 5835 5836 static void 5837 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5838 { 5839 size_t mi; 5840 uint32_t i; 5841 nxt_bool_t ack; 5842 nxt_process_t *process; 5843 nxt_free_map_t *m; 5844 nxt_port_mmap_handler_t *mmap_handler; 5845 5846 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5847 5848 process = nxt_runtime_process_find(task->thread->runtime, 5849 msg->port_msg.pid); 5850 if (nxt_slow_path(process == NULL)) { 5851 return; 5852 } 5853 5854 ack = 0; 5855 5856 /* 5857 * To mitigate possible racing condition (when OOSM message received 5858 * after some of the memory was already freed), need to try to find 5859 * first free segment in shared memory and send ACK if found. 5860 */ 5861 5862 nxt_thread_mutex_lock(&process->incoming.mutex); 5863 5864 for (i = 0; i < process->incoming.size; i++) { 5865 mmap_handler = process->incoming.elts[i].mmap_handler; 5866 5867 if (nxt_slow_path(mmap_handler == NULL)) { 5868 continue; 5869 } 5870 5871 m = mmap_handler->hdr->free_map; 5872 5873 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5874 if (m[mi] != 0) { 5875 ack = 1; 5876 5877 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5878 i, mi, m[mi]); 5879 5880 break; 5881 } 5882 } 5883 } 5884 5885 nxt_thread_mutex_unlock(&process->incoming.mutex); 5886 5887 if (ack) { 5888 nxt_process_broadcast_shm_ack(task, process); 5889 } 5890 } 5891 5892 5893 static void 5894 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5895 { 5896 nxt_fd_t fd; 5897 nxt_port_t *port; 5898 nxt_runtime_t *rt; 5899 nxt_port_mmaps_t *mmaps; 5900 nxt_port_msg_get_mmap_t *get_mmap_msg; 5901 nxt_port_mmap_handler_t *mmap_handler; 5902 5903 rt = task->thread->runtime; 5904 5905 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5906 
msg->port_msg.reply_port); 5907 if (nxt_slow_path(port == NULL)) { 5908 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5909 msg->port_msg.pid, msg->port_msg.reply_port); 5910 5911 return; 5912 } 5913 5914 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5915 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5916 { 5917 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5918 (int) nxt_buf_used_size(msg->buf)); 5919 5920 return; 5921 } 5922 5923 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5924 5925 nxt_assert(port->type == NXT_PROCESS_APP); 5926 5927 if (nxt_slow_path(port->app == NULL)) { 5928 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5929 port->pid, port->id); 5930 5931 // FIXME 5932 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5933 -1, msg->port_msg.stream, 0, NULL); 5934 5935 return; 5936 } 5937 5938 mmaps = &port->app->outgoing; 5939 nxt_thread_mutex_lock(&mmaps->mutex); 5940 5941 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5942 nxt_thread_mutex_unlock(&mmaps->mutex); 5943 5944 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5945 (int) get_mmap_msg->id); 5946 5947 // FIXME 5948 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5949 -1, msg->port_msg.stream, 0, NULL); 5950 return; 5951 } 5952 5953 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5954 5955 fd = mmap_handler->fd; 5956 5957 nxt_thread_mutex_unlock(&mmaps->mutex); 5958 5959 nxt_debug(task, "get mmap %PI:%d found", 5960 msg->port_msg.pid, (int) get_mmap_msg->id); 5961 5962 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5963 } 5964 5965 5966 static void 5967 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5968 { 5969 nxt_port_t *port, *reply_port; 5970 nxt_runtime_t *rt; 5971 nxt_port_msg_get_port_t *get_port_msg; 5972 5973 rt = task->thread->runtime; 5974 5975 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5976 
msg->port_msg.reply_port); 5977 if (nxt_slow_path(reply_port == NULL)) { 5978 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5979 msg->port_msg.pid, msg->port_msg.reply_port); 5980 5981 return; 5982 } 5983 5984 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5985 < (int) sizeof(nxt_port_msg_get_port_t))) 5986 { 5987 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5988 (int) nxt_buf_used_size(msg->buf)); 5989 5990 return; 5991 } 5992 5993 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5994 5995 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5996 if (nxt_slow_path(port == NULL)) { 5997 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5998 get_port_msg->pid, get_port_msg->id); 5999 6000 return; 6001 } 6002 6003 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 6004 get_port_msg->id); 6005 6006 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 6007 } 6008