1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 typedef struct { 22 nxt_str_t type; 23 uint32_t processes; 24 uint32_t max_processes; 25 uint32_t spare_processes; 26 nxt_msec_t timeout; 27 nxt_msec_t idle_timeout; 28 uint32_t requests; 29 nxt_conf_value_t *limits_value; 30 nxt_conf_value_t *processes_value; 31 nxt_conf_value_t *targets_value; 32 } nxt_router_app_conf_t; 33 34 35 typedef struct { 36 nxt_str_t pass; 37 nxt_str_t application; 38 } nxt_router_listener_conf_t; 39 40 41 #if (NXT_TLS) 42 43 typedef struct { 44 nxt_str_t name; 45 nxt_socket_conf_t *socket_conf; 46 nxt_router_temp_conf_t *temp_conf; 47 nxt_conf_value_t *conf_cmds; 48 nxt_bool_t last; 49 50 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 51 } nxt_router_tlssock_t; 52 53 #endif 54 55 56 typedef struct { 57 nxt_str_t *name; 58 nxt_socket_conf_t *socket_conf; 59 nxt_router_temp_conf_t *temp_conf; 60 nxt_bool_t last; 61 } nxt_socket_rpc_t; 62 63 64 typedef struct { 65 nxt_app_t *app; 66 nxt_router_temp_conf_t *temp_conf; 67 } nxt_app_rpc_t; 68 69 70 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 71 nxt_mp_t *mp); 72 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 73 static void nxt_router_greet_controller(nxt_task_t *task, 74 nxt_port_t *controller_port); 75 76 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 77 78 static void nxt_router_new_port_handler(nxt_task_t *task, 79 nxt_port_recv_msg_t *msg); 80 static void nxt_router_conf_data_handler(nxt_task_t *task, 81 nxt_port_recv_msg_t *msg); 82 static void 
nxt_router_remove_pid_handler(nxt_task_t *task, 83 nxt_port_recv_msg_t *msg); 84 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 85 nxt_port_recv_msg_t *msg); 86 87 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 88 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 89 static void nxt_router_conf_ready(nxt_task_t *task, 90 nxt_router_temp_conf_t *tmcf); 91 static void nxt_router_conf_error(nxt_task_t *task, 92 nxt_router_temp_conf_t *tmcf); 93 static void nxt_router_conf_send(nxt_task_t *task, 94 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 95 96 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 97 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 98 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 99 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 100 101 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 102 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 103 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 104 nxt_app_t *app); 105 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 106 nxt_str_t *name); 107 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 108 int i); 109 110 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 111 nxt_port_t *port); 112 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 113 nxt_port_t *port); 114 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 115 nxt_port_t *port, nxt_fd_t fd); 116 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 117 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 118 static void nxt_router_listen_socket_ready(nxt_task_t *task, 119 nxt_port_recv_msg_t *msg, void *data); 120 static void nxt_router_listen_socket_error(nxt_task_t *task, 121 nxt_port_recv_msg_t *msg, void *data); 122 #if (NXT_TLS) 123 static void 
nxt_router_tls_rpc_handler(nxt_task_t *task, 124 nxt_port_recv_msg_t *msg, void *data); 125 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 126 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 127 nxt_conf_value_t * conf_cmds, nxt_bool_t last); 128 #endif 129 static void nxt_router_app_rpc_create(nxt_task_t *task, 130 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 131 static void nxt_router_app_prefork_ready(nxt_task_t *task, 132 nxt_port_recv_msg_t *msg, void *data); 133 static void nxt_router_app_prefork_error(nxt_task_t *task, 134 nxt_port_recv_msg_t *msg, void *data); 135 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 136 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 137 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 138 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 139 140 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 141 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 142 const nxt_event_interface_t *interface); 143 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 144 nxt_router_engine_conf_t *recf); 145 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 146 nxt_router_engine_conf_t *recf); 147 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 148 nxt_router_engine_conf_t *recf); 149 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 150 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 151 nxt_work_handler_t handler); 152 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 153 nxt_router_engine_conf_t *recf); 154 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 155 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 156 157 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 158 nxt_router_temp_conf_t *tmcf); 159 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, 
nxt_runtime_t *rt, 160 nxt_event_engine_t *engine); 161 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 162 nxt_router_temp_conf_t *tmcf); 163 164 static void nxt_router_engines_post(nxt_router_t *router, 165 nxt_router_temp_conf_t *tmcf); 166 static void nxt_router_engine_post(nxt_event_engine_t *engine, 167 nxt_work_t *jobs); 168 169 static void nxt_router_thread_start(void *data); 170 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 171 void *data); 172 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 173 void *data); 174 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 175 void *data); 176 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 177 void *data); 178 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 179 void *data); 180 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 181 void *data); 182 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 183 void *data); 184 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 185 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 186 static void nxt_router_listen_socket_release(nxt_task_t *task, 187 nxt_socket_conf_t *skcf); 188 189 static void nxt_router_access_log_writer(nxt_task_t *task, 190 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 191 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 192 struct tm *tm, size_t size, const char *format); 193 static void nxt_router_access_log_open(nxt_task_t *task, 194 nxt_router_temp_conf_t *tmcf); 195 static void nxt_router_access_log_ready(nxt_task_t *task, 196 nxt_port_recv_msg_t *msg, void *data); 197 static void nxt_router_access_log_error(nxt_task_t *task, 198 nxt_port_recv_msg_t *msg, void *data); 199 static void nxt_router_access_log_release(nxt_task_t *task, 200 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 
201 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 202 void *data); 203 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 204 nxt_port_recv_msg_t *msg, void *data); 205 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 206 nxt_port_recv_msg_t *msg, void *data); 207 208 static void nxt_router_app_port_ready(nxt_task_t *task, 209 nxt_port_recv_msg_t *msg, void *data); 210 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 211 nxt_port_t *app_port); 212 static void nxt_router_app_port_error(nxt_task_t *task, 213 nxt_port_recv_msg_t *msg, void *data); 214 215 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 216 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 217 218 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 219 nxt_apr_action_t action); 220 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 221 nxt_request_rpc_data_t *req_rpc_data); 222 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 223 void *data); 224 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 225 void *data); 226 227 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, 228 void *data); 229 static void nxt_router_app_prepare_request(nxt_task_t *task, 230 nxt_request_rpc_data_t *req_rpc_data); 231 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 232 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 233 234 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 235 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 236 void *data); 237 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 238 void *data); 239 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 240 void *data); 241 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 242 243 static 
const nxt_http_request_state_t nxt_http_request_send_state; 244 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 245 246 static void nxt_router_app_joint_use(nxt_task_t *task, 247 nxt_app_joint_t *app_joint, int i); 248 249 static void nxt_router_http_request_release_post(nxt_task_t *task, 250 nxt_http_request_t *r); 251 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 252 void *data); 253 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 254 static void nxt_router_get_port_handler(nxt_task_t *task, 255 nxt_port_recv_msg_t *msg); 256 static void nxt_router_get_mmap_handler(nxt_task_t *task, 257 nxt_port_recv_msg_t *msg); 258 259 extern const nxt_http_request_state_t nxt_http_websocket; 260 261 static nxt_router_t *nxt_router; 262 263 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 264 static const nxt_str_t empty_prefix = nxt_string(""); 265 266 static const nxt_str_t *nxt_app_msg_prefix[] = { 267 &empty_prefix, 268 &empty_prefix, 269 &http_prefix, 270 &http_prefix, 271 &http_prefix, 272 &empty_prefix, 273 }; 274 275 276 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 277 .quit = nxt_signal_quit_handler, 278 .new_port = nxt_router_new_port_handler, 279 .get_port = nxt_router_get_port_handler, 280 .change_file = nxt_port_change_log_file_handler, 281 .mmap = nxt_port_mmap_handler, 282 .get_mmap = nxt_router_get_mmap_handler, 283 .data = nxt_router_conf_data_handler, 284 .remove_pid = nxt_router_remove_pid_handler, 285 .access_log = nxt_router_access_log_reopen_handler, 286 .rpc_ready = nxt_port_rpc_handler, 287 .rpc_error = nxt_port_rpc_handler, 288 .oosm = nxt_router_oosm_handler, 289 }; 290 291 292 const nxt_process_init_t nxt_router_process = { 293 .name = "router", 294 .type = NXT_PROCESS_ROUTER, 295 .prefork = nxt_router_prefork, 296 .restart = 1, 297 .setup = nxt_process_core_setup, 298 .start = nxt_router_start, 299 .port_handlers = 
&nxt_router_process_port_handlers, 300 .signals = nxt_process_signals, 301 }; 302 303 304 /* Queues of nxt_socket_conf_t */ 305 nxt_queue_t creating_sockets; 306 nxt_queue_t pending_sockets; 307 nxt_queue_t updating_sockets; 308 nxt_queue_t keeping_sockets; 309 nxt_queue_t deleting_sockets; 310 311 312 static nxt_int_t 313 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 314 { 315 nxt_runtime_stop_app_processes(task, task->thread->runtime); 316 317 return NXT_OK; 318 } 319 320 321 static nxt_int_t 322 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 323 { 324 nxt_int_t ret; 325 nxt_port_t *controller_port; 326 nxt_router_t *router; 327 nxt_runtime_t *rt; 328 329 rt = task->thread->runtime; 330 331 nxt_log(task, NXT_LOG_INFO, "router started"); 332 333 #if (NXT_TLS) 334 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 335 if (nxt_slow_path(rt->tls == NULL)) { 336 return NXT_ERROR; 337 } 338 339 ret = rt->tls->library_init(task); 340 if (nxt_slow_path(ret != NXT_OK)) { 341 return ret; 342 } 343 #endif 344 345 ret = nxt_http_init(task); 346 if (nxt_slow_path(ret != NXT_OK)) { 347 return ret; 348 } 349 350 router = nxt_zalloc(sizeof(nxt_router_t)); 351 if (nxt_slow_path(router == NULL)) { 352 return NXT_ERROR; 353 } 354 355 nxt_queue_init(&router->engines); 356 nxt_queue_init(&router->sockets); 357 nxt_queue_init(&router->apps); 358 359 nxt_router = router; 360 361 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 362 if (controller_port != NULL) { 363 nxt_router_greet_controller(task, controller_port); 364 } 365 366 return NXT_OK; 367 } 368 369 370 static void 371 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 372 { 373 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 374 -1, 0, 0, NULL); 375 } 376 377 378 static void 379 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 380 void *data) 381 { 382 size_t size; 383 uint32_t stream; 384 
nxt_mp_t *mp; 385 nxt_int_t ret; 386 nxt_app_t *app; 387 nxt_buf_t *b; 388 nxt_port_t *main_port; 389 nxt_runtime_t *rt; 390 391 app = data; 392 393 rt = task->thread->runtime; 394 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 395 396 nxt_debug(task, "app '%V' %p start process", &app->name, app); 397 398 size = app->name.length + 1 + app->conf.length; 399 400 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 401 402 if (nxt_slow_path(b == NULL)) { 403 goto failed; 404 } 405 406 nxt_buf_cpystr(b, &app->name); 407 *b->mem.free++ = '\0'; 408 nxt_buf_cpystr(b, &app->conf); 409 410 nxt_router_app_joint_use(task, app->joint, 1); 411 412 stream = nxt_port_rpc_register_handler(task, port, 413 nxt_router_app_port_ready, 414 nxt_router_app_port_error, 415 -1, app->joint); 416 417 if (nxt_slow_path(stream == 0)) { 418 nxt_router_app_joint_use(task, app->joint, -1); 419 420 goto failed; 421 } 422 423 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 424 -1, stream, port->id, b); 425 426 if (nxt_slow_path(ret != NXT_OK)) { 427 nxt_port_rpc_cancel(task, port, stream); 428 429 nxt_router_app_joint_use(task, app->joint, -1); 430 431 goto failed; 432 } 433 434 nxt_router_app_use(task, app, -1); 435 436 return; 437 438 failed: 439 440 if (b != NULL) { 441 mp = b->data; 442 nxt_mp_free(mp, b); 443 nxt_mp_release(mp); 444 } 445 446 nxt_thread_mutex_lock(&app->mutex); 447 448 app->pending_processes--; 449 450 nxt_thread_mutex_unlock(&app->mutex); 451 452 nxt_router_app_use(task, app, -1); 453 } 454 455 456 static void 457 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 458 { 459 app_joint->use_count += i; 460 461 if (app_joint->use_count == 0) { 462 nxt_assert(app_joint->app == NULL); 463 464 nxt_free(app_joint); 465 } 466 } 467 468 469 static nxt_int_t 470 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 471 { 472 nxt_int_t res; 473 nxt_port_t *router_port; 474 nxt_runtime_t *rt; 475 476 
nxt_debug(task, "app '%V' start process", &app->name); 477 478 rt = task->thread->runtime; 479 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 480 481 nxt_router_app_use(task, app, 1); 482 483 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 484 app); 485 486 if (res == NXT_OK) { 487 return res; 488 } 489 490 nxt_thread_mutex_lock(&app->mutex); 491 492 app->pending_processes--; 493 494 nxt_thread_mutex_unlock(&app->mutex); 495 496 nxt_router_app_use(task, app, -1); 497 498 return NXT_ERROR; 499 } 500 501 502 nxt_inline nxt_bool_t 503 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 504 { 505 nxt_buf_t *b, *next; 506 nxt_bool_t cancelled; 507 nxt_msg_info_t *msg_info; 508 509 msg_info = &req_rpc_data->msg_info; 510 511 if (msg_info->buf == NULL) { 512 return 0; 513 } 514 515 cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, 516 msg_info->tracking_cookie, 517 req_rpc_data->stream); 518 519 if (cancelled) { 520 nxt_debug(task, "stream #%uD: cancelled by router", 521 req_rpc_data->stream); 522 } 523 524 for (b = msg_info->buf; b != NULL; b = next) { 525 next = b->next; 526 b->next = NULL; 527 528 if (b->is_port_mmap_sent) { 529 b->is_port_mmap_sent = cancelled == 0; 530 } 531 532 b->completion_handler(task, b, b->parent); 533 } 534 535 msg_info->buf = NULL; 536 537 return cancelled; 538 } 539 540 541 nxt_inline nxt_bool_t 542 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 543 { 544 if (lnk->next != NULL) { 545 nxt_queue_remove(lnk); 546 547 lnk->next = NULL; 548 549 return 1; 550 } 551 552 return 0; 553 } 554 555 556 nxt_inline void 557 nxt_request_rpc_data_unlink(nxt_task_t *task, 558 nxt_request_rpc_data_t *req_rpc_data) 559 { 560 nxt_app_t *app; 561 nxt_bool_t unlinked; 562 nxt_http_request_t *r; 563 564 nxt_router_msg_cancel(task, req_rpc_data); 565 566 if (req_rpc_data->app_port != NULL) { 567 nxt_router_app_port_release(task, req_rpc_data->app_port, 568 req_rpc_data->apr_action); 569 
570 req_rpc_data->app_port = NULL; 571 } 572 573 app = req_rpc_data->app; 574 r = req_rpc_data->request; 575 576 if (r != NULL) { 577 r->timer_data = NULL; 578 579 nxt_router_http_request_release_post(task, r); 580 581 r->req_rpc_data = NULL; 582 req_rpc_data->request = NULL; 583 584 if (app != NULL) { 585 unlinked = 0; 586 587 nxt_thread_mutex_lock(&app->mutex); 588 589 if (r->app_link.next != NULL) { 590 nxt_queue_remove(&r->app_link); 591 r->app_link.next = NULL; 592 593 unlinked = 1; 594 } 595 596 nxt_thread_mutex_unlock(&app->mutex); 597 598 if (unlinked) { 599 nxt_mp_release(r->mem_pool); 600 } 601 } 602 } 603 604 if (app != NULL) { 605 nxt_router_app_use(task, app, -1); 606 607 req_rpc_data->app = NULL; 608 } 609 610 if (req_rpc_data->msg_info.body_fd != -1) { 611 nxt_fd_close(req_rpc_data->msg_info.body_fd); 612 613 req_rpc_data->msg_info.body_fd = -1; 614 } 615 616 if (req_rpc_data->rpc_cancel) { 617 req_rpc_data->rpc_cancel = 0; 618 619 nxt_port_rpc_cancel(task, task->thread->engine->port, 620 req_rpc_data->stream); 621 } 622 } 623 624 625 static void 626 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 627 { 628 nxt_int_t res; 629 nxt_app_t *app; 630 nxt_port_t *port, *main_app_port; 631 nxt_runtime_t *rt; 632 633 nxt_port_new_port_handler(task, msg); 634 635 port = msg->u.new_port; 636 637 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 638 nxt_router_greet_controller(task, msg->u.new_port); 639 } 640 641 if (port == NULL || port->type != NXT_PROCESS_APP) { 642 643 if (msg->port_msg.stream == 0) { 644 return; 645 } 646 647 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 648 649 } else { 650 if (msg->fd[1] != -1) { 651 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 652 if (nxt_slow_path(res != NXT_OK)) { 653 return; 654 } 655 656 nxt_fd_close(msg->fd[1]); 657 msg->fd[1] = -1; 658 } 659 } 660 661 if (msg->port_msg.stream != 0) { 662 nxt_port_rpc_handler(task, msg); 663 return; 664 } 665 666 /* 667 * Port with "id 
== 0" is application 'main' port and it always 668 * should come with non-zero stream. 669 */ 670 nxt_assert(port->id != 0); 671 672 /* Find 'main' app port and get app reference. */ 673 rt = task->thread->runtime; 674 675 /* 676 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 677 * sent to main port (with id == 0) and processed in main thread. 678 */ 679 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 680 nxt_assert(main_app_port != NULL); 681 682 app = main_app_port->app; 683 684 if (nxt_fast_path(app != NULL)) { 685 nxt_thread_mutex_lock(&app->mutex); 686 687 /* TODO here should be find-and-add code because there can be 688 port waiters in port_hash */ 689 nxt_port_hash_add(&app->port_hash, port); 690 app->port_hash_count++; 691 692 nxt_thread_mutex_unlock(&app->mutex); 693 694 port->app = app; 695 } 696 697 port->main_app_port = main_app_port; 698 699 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 700 } 701 702 703 static void 704 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 705 { 706 void *p; 707 size_t size; 708 nxt_int_t ret; 709 nxt_port_t *port; 710 nxt_router_temp_conf_t *tmcf; 711 712 port = nxt_runtime_port_find(task->thread->runtime, 713 msg->port_msg.pid, 714 msg->port_msg.reply_port); 715 if (nxt_slow_path(port == NULL)) { 716 nxt_alert(task, "conf_data_handler: reply port not found"); 717 return; 718 } 719 720 p = MAP_FAILED; 721 722 /* 723 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 724 * initialized in 'cleanup' section. 
725 */ 726 size = 0; 727 728 tmcf = nxt_router_temp_conf(task); 729 if (nxt_slow_path(tmcf == NULL)) { 730 goto fail; 731 } 732 733 if (nxt_slow_path(msg->fd[0] == -1)) { 734 nxt_alert(task, "conf_data_handler: invalid shm fd"); 735 goto fail; 736 } 737 738 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 739 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 740 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 741 goto fail; 742 } 743 744 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 745 746 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 747 748 nxt_fd_close(msg->fd[0]); 749 msg->fd[0] = -1; 750 751 if (nxt_slow_path(p == MAP_FAILED)) { 752 goto fail; 753 } 754 755 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 756 757 tmcf->router_conf->router = nxt_router; 758 tmcf->stream = msg->port_msg.stream; 759 tmcf->port = port; 760 761 nxt_port_use(task, tmcf->port, 1); 762 763 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 764 765 if (nxt_fast_path(ret == NXT_OK)) { 766 nxt_router_conf_apply(task, tmcf, NULL); 767 768 } else { 769 nxt_router_conf_error(task, tmcf); 770 } 771 772 goto cleanup; 773 774 fail: 775 776 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 777 msg->port_msg.stream, 0, NULL); 778 779 if (tmcf != NULL) { 780 nxt_mp_release(tmcf->mem_pool); 781 } 782 783 cleanup: 784 785 if (p != MAP_FAILED) { 786 nxt_mem_munmap(p, size); 787 } 788 789 if (msg->fd[0] != -1) { 790 nxt_fd_close(msg->fd[0]); 791 msg->fd[0] = -1; 792 } 793 } 794 795 796 static void 797 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 798 void *data) 799 { 800 union { 801 nxt_pid_t removed_pid; 802 void *data; 803 } u; 804 805 u.data = data; 806 807 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 808 } 809 810 811 static void 812 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 813 { 814 nxt_event_engine_t *engine; 815 816 
nxt_port_remove_pid_handler(task, msg); 817 818 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 819 { 820 if (nxt_fast_path(engine->port != NULL)) { 821 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 822 msg->u.data); 823 } 824 } 825 nxt_queue_loop; 826 827 if (msg->port_msg.stream == 0) { 828 return; 829 } 830 831 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 832 833 nxt_port_rpc_handler(task, msg); 834 } 835 836 837 static nxt_router_temp_conf_t * 838 nxt_router_temp_conf(nxt_task_t *task) 839 { 840 nxt_mp_t *mp, *tmp; 841 nxt_router_conf_t *rtcf; 842 nxt_router_temp_conf_t *tmcf; 843 844 mp = nxt_mp_create(1024, 128, 256, 32); 845 if (nxt_slow_path(mp == NULL)) { 846 return NULL; 847 } 848 849 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 850 if (nxt_slow_path(rtcf == NULL)) { 851 goto fail; 852 } 853 854 rtcf->mem_pool = mp; 855 856 tmp = nxt_mp_create(1024, 128, 256, 32); 857 if (nxt_slow_path(tmp == NULL)) { 858 goto fail; 859 } 860 861 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 862 if (nxt_slow_path(tmcf == NULL)) { 863 goto temp_fail; 864 } 865 866 tmcf->mem_pool = tmp; 867 tmcf->router_conf = rtcf; 868 tmcf->count = 1; 869 tmcf->engine = task->thread->engine; 870 871 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 872 sizeof(nxt_router_engine_conf_t)); 873 if (nxt_slow_path(tmcf->engines == NULL)) { 874 goto temp_fail; 875 } 876 877 nxt_queue_init(&creating_sockets); 878 nxt_queue_init(&pending_sockets); 879 nxt_queue_init(&updating_sockets); 880 nxt_queue_init(&keeping_sockets); 881 nxt_queue_init(&deleting_sockets); 882 883 #if (NXT_TLS) 884 nxt_queue_init(&tmcf->tls); 885 #endif 886 887 nxt_queue_init(&tmcf->apps); 888 nxt_queue_init(&tmcf->previous); 889 890 return tmcf; 891 892 temp_fail: 893 894 nxt_mp_destroy(tmp); 895 896 fail: 897 898 nxt_mp_destroy(mp); 899 900 return NULL; 901 } 902 903 904 nxt_inline nxt_bool_t 905 nxt_router_app_can_start(nxt_app_t *app) 906 { 907 return 
app->processes + app->pending_processes < app->max_processes 908 && app->pending_processes < app->max_pending_processes; 909 } 910 911 912 nxt_inline nxt_bool_t 913 nxt_router_app_need_start(nxt_app_t *app) 914 { 915 return (app->active_requests 916 > app->port_hash_count + app->pending_processes) 917 || (app->spare_processes 918 > app->idle_processes + app->pending_processes); 919 } 920 921 922 static void 923 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 924 { 925 nxt_int_t ret; 926 nxt_app_t *app; 927 nxt_router_t *router; 928 nxt_runtime_t *rt; 929 nxt_queue_link_t *qlk; 930 nxt_socket_conf_t *skcf; 931 nxt_router_conf_t *rtcf; 932 nxt_router_temp_conf_t *tmcf; 933 const nxt_event_interface_t *interface; 934 #if (NXT_TLS) 935 nxt_router_tlssock_t *tls; 936 #endif 937 938 tmcf = obj; 939 940 qlk = nxt_queue_first(&pending_sockets); 941 942 if (qlk != nxt_queue_tail(&pending_sockets)) { 943 nxt_queue_remove(qlk); 944 nxt_queue_insert_tail(&creating_sockets, qlk); 945 946 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 947 948 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 949 950 return; 951 } 952 953 #if (NXT_TLS) 954 qlk = nxt_queue_last(&tmcf->tls); 955 956 if (qlk != nxt_queue_head(&tmcf->tls)) { 957 nxt_queue_remove(qlk); 958 959 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 960 961 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 962 nxt_router_tls_rpc_handler, tls); 963 return; 964 } 965 #endif 966 967 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 968 969 if (nxt_router_app_need_start(app)) { 970 nxt_router_app_rpc_create(task, tmcf, app); 971 return; 972 } 973 974 } nxt_queue_loop; 975 976 rtcf = tmcf->router_conf; 977 978 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 979 nxt_router_access_log_open(task, tmcf); 980 return; 981 } 982 983 rt = task->thread->runtime; 984 985 interface = nxt_service_get(rt->services, "engine", NULL); 986 987 router = rtcf->router; 988 989 ret = 
nxt_router_engines_create(task, router, tmcf, interface); 990 if (nxt_slow_path(ret != NXT_OK)) { 991 goto fail; 992 } 993 994 ret = nxt_router_threads_create(task, rt, tmcf); 995 if (nxt_slow_path(ret != NXT_OK)) { 996 goto fail; 997 } 998 999 nxt_router_apps_sort(task, router, tmcf); 1000 1001 nxt_router_apps_hash_use(task, rtcf, 1); 1002 1003 nxt_router_engines_post(router, tmcf); 1004 1005 nxt_queue_add(&router->sockets, &updating_sockets); 1006 nxt_queue_add(&router->sockets, &creating_sockets); 1007 1008 router->access_log = rtcf->access_log; 1009 1010 nxt_router_conf_ready(task, tmcf); 1011 1012 return; 1013 1014 fail: 1015 1016 nxt_router_conf_error(task, tmcf); 1017 1018 return; 1019 } 1020 1021 1022 static void 1023 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1024 { 1025 nxt_joint_job_t *job; 1026 1027 job = obj; 1028 1029 nxt_router_conf_ready(task, job->tmcf); 1030 } 1031 1032 1033 static void 1034 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1035 { 1036 uint32_t count; 1037 nxt_router_conf_t *rtcf; 1038 nxt_thread_spinlock_t *lock; 1039 1040 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1041 1042 if (--tmcf->count > 0) { 1043 return; 1044 } 1045 1046 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1047 1048 rtcf = tmcf->router_conf; 1049 1050 lock = &rtcf->router->lock; 1051 1052 nxt_thread_spin_lock(lock); 1053 1054 count = rtcf->count; 1055 1056 nxt_thread_spin_unlock(lock); 1057 1058 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1059 1060 if (count == 0) { 1061 nxt_router_apps_hash_use(task, rtcf, -1); 1062 1063 nxt_router_access_log_release(task, lock, rtcf->access_log); 1064 1065 nxt_mp_destroy(rtcf->mem_pool); 1066 } 1067 1068 nxt_mp_release(tmcf->mem_pool); 1069 } 1070 1071 1072 static void 1073 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1074 { 1075 nxt_app_t *app; 1076 nxt_queue_t new_socket_confs; 1077 nxt_socket_t s; 1078 nxt_router_t *router; 
1079 nxt_queue_link_t *qlk; 1080 nxt_socket_conf_t *skcf; 1081 nxt_router_conf_t *rtcf; 1082 1083 nxt_alert(task, "failed to apply new conf"); 1084 1085 for (qlk = nxt_queue_first(&creating_sockets); 1086 qlk != nxt_queue_tail(&creating_sockets); 1087 qlk = nxt_queue_next(qlk)) 1088 { 1089 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1090 s = skcf->listen->socket; 1091 1092 if (s != -1) { 1093 nxt_socket_close(task, s); 1094 } 1095 1096 nxt_free(skcf->listen); 1097 } 1098 1099 nxt_queue_init(&new_socket_confs); 1100 nxt_queue_add(&new_socket_confs, &updating_sockets); 1101 nxt_queue_add(&new_socket_confs, &pending_sockets); 1102 nxt_queue_add(&new_socket_confs, &creating_sockets); 1103 1104 rtcf = tmcf->router_conf; 1105 1106 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1107 1108 nxt_router_app_unlink(task, app); 1109 1110 } nxt_queue_loop; 1111 1112 router = rtcf->router; 1113 1114 nxt_queue_add(&router->sockets, &keeping_sockets); 1115 nxt_queue_add(&router->sockets, &deleting_sockets); 1116 1117 nxt_queue_add(&router->apps, &tmcf->previous); 1118 1119 // TODO: new engines and threads 1120 1121 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1122 1123 nxt_mp_destroy(rtcf->mem_pool); 1124 1125 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1126 1127 nxt_mp_release(tmcf->mem_pool); 1128 } 1129 1130 1131 static void 1132 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1133 nxt_port_msg_type_t type) 1134 { 1135 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1136 1137 nxt_port_use(task, tmcf->port, -1); 1138 1139 tmcf->port = NULL; 1140 } 1141 1142 1143 static nxt_conf_map_t nxt_router_conf[] = { 1144 { 1145 nxt_string("listeners_threads"), 1146 NXT_CONF_MAP_INT32, 1147 offsetof(nxt_router_conf_t, threads), 1148 }, 1149 }; 1150 1151 1152 static nxt_conf_map_t nxt_router_app_conf[] = { 1153 { 1154 nxt_string("type"), 1155 NXT_CONF_MAP_STR, 1156 
offsetof(nxt_router_app_conf_t, type), 1157 }, 1158 1159 { 1160 nxt_string("limits"), 1161 NXT_CONF_MAP_PTR, 1162 offsetof(nxt_router_app_conf_t, limits_value), 1163 }, 1164 1165 { 1166 nxt_string("processes"), 1167 NXT_CONF_MAP_INT32, 1168 offsetof(nxt_router_app_conf_t, processes), 1169 }, 1170 1171 { 1172 nxt_string("processes"), 1173 NXT_CONF_MAP_PTR, 1174 offsetof(nxt_router_app_conf_t, processes_value), 1175 }, 1176 1177 { 1178 nxt_string("targets"), 1179 NXT_CONF_MAP_PTR, 1180 offsetof(nxt_router_app_conf_t, targets_value), 1181 }, 1182 }; 1183 1184 1185 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1186 { 1187 nxt_string("timeout"), 1188 NXT_CONF_MAP_MSEC, 1189 offsetof(nxt_router_app_conf_t, timeout), 1190 }, 1191 1192 { 1193 nxt_string("requests"), 1194 NXT_CONF_MAP_INT32, 1195 offsetof(nxt_router_app_conf_t, requests), 1196 }, 1197 }; 1198 1199 1200 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1201 { 1202 nxt_string("spare"), 1203 NXT_CONF_MAP_INT32, 1204 offsetof(nxt_router_app_conf_t, spare_processes), 1205 }, 1206 1207 { 1208 nxt_string("max"), 1209 NXT_CONF_MAP_INT32, 1210 offsetof(nxt_router_app_conf_t, max_processes), 1211 }, 1212 1213 { 1214 nxt_string("idle_timeout"), 1215 NXT_CONF_MAP_MSEC, 1216 offsetof(nxt_router_app_conf_t, idle_timeout), 1217 }, 1218 }; 1219 1220 1221 static nxt_conf_map_t nxt_router_listener_conf[] = { 1222 { 1223 nxt_string("pass"), 1224 NXT_CONF_MAP_STR_COPY, 1225 offsetof(nxt_router_listener_conf_t, pass), 1226 }, 1227 1228 { 1229 nxt_string("application"), 1230 NXT_CONF_MAP_STR_COPY, 1231 offsetof(nxt_router_listener_conf_t, application), 1232 }, 1233 }; 1234 1235 1236 static nxt_conf_map_t nxt_router_http_conf[] = { 1237 { 1238 nxt_string("header_buffer_size"), 1239 NXT_CONF_MAP_SIZE, 1240 offsetof(nxt_socket_conf_t, header_buffer_size), 1241 }, 1242 1243 { 1244 nxt_string("large_header_buffer_size"), 1245 NXT_CONF_MAP_SIZE, 1246 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1247 }, 1248 
1249 { 1250 nxt_string("large_header_buffers"), 1251 NXT_CONF_MAP_SIZE, 1252 offsetof(nxt_socket_conf_t, large_header_buffers), 1253 }, 1254 1255 { 1256 nxt_string("body_buffer_size"), 1257 NXT_CONF_MAP_SIZE, 1258 offsetof(nxt_socket_conf_t, body_buffer_size), 1259 }, 1260 1261 { 1262 nxt_string("max_body_size"), 1263 NXT_CONF_MAP_SIZE, 1264 offsetof(nxt_socket_conf_t, max_body_size), 1265 }, 1266 1267 { 1268 nxt_string("idle_timeout"), 1269 NXT_CONF_MAP_MSEC, 1270 offsetof(nxt_socket_conf_t, idle_timeout), 1271 }, 1272 1273 { 1274 nxt_string("header_read_timeout"), 1275 NXT_CONF_MAP_MSEC, 1276 offsetof(nxt_socket_conf_t, header_read_timeout), 1277 }, 1278 1279 { 1280 nxt_string("body_read_timeout"), 1281 NXT_CONF_MAP_MSEC, 1282 offsetof(nxt_socket_conf_t, body_read_timeout), 1283 }, 1284 1285 { 1286 nxt_string("send_timeout"), 1287 NXT_CONF_MAP_MSEC, 1288 offsetof(nxt_socket_conf_t, send_timeout), 1289 }, 1290 1291 { 1292 nxt_string("body_temp_path"), 1293 NXT_CONF_MAP_STR, 1294 offsetof(nxt_socket_conf_t, body_temp_path), 1295 }, 1296 1297 { 1298 nxt_string("discard_unsafe_fields"), 1299 NXT_CONF_MAP_INT8, 1300 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1301 }, 1302 }; 1303 1304 1305 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1306 { 1307 nxt_string("max_frame_size"), 1308 NXT_CONF_MAP_SIZE, 1309 offsetof(nxt_websocket_conf_t, max_frame_size), 1310 }, 1311 1312 { 1313 nxt_string("read_timeout"), 1314 NXT_CONF_MAP_MSEC, 1315 offsetof(nxt_websocket_conf_t, read_timeout), 1316 }, 1317 1318 { 1319 nxt_string("keepalive_interval"), 1320 NXT_CONF_MAP_MSEC, 1321 offsetof(nxt_websocket_conf_t, keepalive_interval), 1322 }, 1323 1324 }; 1325 1326 1327 static nxt_int_t 1328 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1329 u_char *start, u_char *end) 1330 { 1331 u_char *p; 1332 size_t size; 1333 nxt_mp_t *mp, *app_mp; 1334 uint32_t next, next_target; 1335 nxt_int_t ret; 1336 nxt_str_t name, path, target; 1337 nxt_app_t *app, 
*prev; 1338 nxt_str_t *t, *s, *targets; 1339 nxt_uint_t n, i; 1340 nxt_port_t *port; 1341 nxt_router_t *router; 1342 nxt_app_joint_t *app_joint; 1343 #if (NXT_TLS) 1344 nxt_conf_value_t *certificate, *conf_cmds; 1345 #endif 1346 nxt_conf_value_t *conf, *http, *value, *websocket; 1347 nxt_conf_value_t *applications, *application; 1348 nxt_conf_value_t *listeners, *listener; 1349 nxt_conf_value_t *routes_conf, *static_conf; 1350 nxt_socket_conf_t *skcf; 1351 nxt_http_routes_t *routes; 1352 nxt_event_engine_t *engine; 1353 nxt_app_lang_module_t *lang; 1354 nxt_router_app_conf_t apcf; 1355 nxt_router_access_log_t *access_log; 1356 nxt_router_listener_conf_t lscf; 1357 1358 static nxt_str_t http_path = nxt_string("/settings/http"); 1359 static nxt_str_t applications_path = nxt_string("/applications"); 1360 static nxt_str_t listeners_path = nxt_string("/listeners"); 1361 static nxt_str_t routes_path = nxt_string("/routes"); 1362 static nxt_str_t access_log_path = nxt_string("/access_log"); 1363 #if (NXT_TLS) 1364 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1365 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1366 #endif 1367 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1368 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1369 1370 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1371 if (conf == NULL) { 1372 nxt_alert(task, "configuration parsing error"); 1373 return NXT_ERROR; 1374 } 1375 1376 mp = tmcf->router_conf->mem_pool; 1377 1378 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1379 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1380 if (ret != NXT_OK) { 1381 nxt_alert(task, "root map error"); 1382 return NXT_ERROR; 1383 } 1384 1385 if (tmcf->router_conf->threads == 0) { 1386 tmcf->router_conf->threads = nxt_ncpu; 1387 } 1388 1389 static_conf = nxt_conf_get_path(conf, &static_path); 1390 1391 ret = nxt_router_conf_process_static(task, 
tmcf->router_conf, static_conf); 1392 if (nxt_slow_path(ret != NXT_OK)) { 1393 return NXT_ERROR; 1394 } 1395 1396 router = tmcf->router_conf->router; 1397 1398 applications = nxt_conf_get_path(conf, &applications_path); 1399 1400 if (applications != NULL) { 1401 next = 0; 1402 1403 for ( ;; ) { 1404 application = nxt_conf_next_object_member(applications, 1405 &name, &next); 1406 if (application == NULL) { 1407 break; 1408 } 1409 1410 nxt_debug(task, "application \"%V\"", &name); 1411 1412 size = nxt_conf_json_length(application, NULL); 1413 1414 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1415 if (nxt_slow_path(app_mp == NULL)) { 1416 goto fail; 1417 } 1418 1419 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1420 if (app == NULL) { 1421 goto app_fail; 1422 } 1423 1424 nxt_memzero(app, sizeof(nxt_app_t)); 1425 1426 app->mem_pool = app_mp; 1427 1428 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1429 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1430 + name.length); 1431 1432 p = nxt_conf_json_print(app->conf.start, application, NULL); 1433 app->conf.length = p - app->conf.start; 1434 1435 nxt_assert(app->conf.length <= size); 1436 1437 nxt_debug(task, "application conf \"%V\"", &app->conf); 1438 1439 prev = nxt_router_app_find(&router->apps, &name); 1440 1441 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1442 nxt_mp_destroy(app_mp); 1443 1444 nxt_queue_remove(&prev->link); 1445 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1446 1447 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1448 if (nxt_slow_path(ret != NXT_OK)) { 1449 goto fail; 1450 } 1451 1452 continue; 1453 } 1454 1455 apcf.processes = 1; 1456 apcf.max_processes = 1; 1457 apcf.spare_processes = 0; 1458 apcf.timeout = 0; 1459 apcf.idle_timeout = 15000; 1460 apcf.requests = 0; 1461 apcf.limits_value = NULL; 1462 apcf.processes_value = NULL; 1463 apcf.targets_value = NULL; 1464 1465 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1466 
if (nxt_slow_path(app_joint == NULL)) { 1467 goto app_fail; 1468 } 1469 1470 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1471 1472 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1473 nxt_nitems(nxt_router_app_conf), &apcf); 1474 if (ret != NXT_OK) { 1475 nxt_alert(task, "application map error"); 1476 goto app_fail; 1477 } 1478 1479 if (apcf.limits_value != NULL) { 1480 1481 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1482 nxt_alert(task, "application limits is not object"); 1483 goto app_fail; 1484 } 1485 1486 ret = nxt_conf_map_object(mp, apcf.limits_value, 1487 nxt_router_app_limits_conf, 1488 nxt_nitems(nxt_router_app_limits_conf), 1489 &apcf); 1490 if (ret != NXT_OK) { 1491 nxt_alert(task, "application limits map error"); 1492 goto app_fail; 1493 } 1494 } 1495 1496 if (apcf.processes_value != NULL 1497 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1498 { 1499 ret = nxt_conf_map_object(mp, apcf.processes_value, 1500 nxt_router_app_processes_conf, 1501 nxt_nitems(nxt_router_app_processes_conf), 1502 &apcf); 1503 if (ret != NXT_OK) { 1504 nxt_alert(task, "application processes map error"); 1505 goto app_fail; 1506 } 1507 1508 } else { 1509 apcf.max_processes = apcf.processes; 1510 apcf.spare_processes = apcf.processes; 1511 } 1512 1513 if (apcf.targets_value != NULL) { 1514 n = nxt_conf_object_members_count(apcf.targets_value); 1515 1516 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1517 if (nxt_slow_path(targets == NULL)) { 1518 goto app_fail; 1519 } 1520 1521 next_target = 0; 1522 1523 for (i = 0; i < n; i++) { 1524 (void) nxt_conf_next_object_member(apcf.targets_value, 1525 &target, &next_target); 1526 1527 s = nxt_str_dup(app_mp, &targets[i], &target); 1528 if (nxt_slow_path(s == NULL)) { 1529 goto app_fail; 1530 } 1531 } 1532 1533 } else { 1534 targets = NULL; 1535 } 1536 1537 nxt_debug(task, "application type: %V", &apcf.type); 1538 nxt_debug(task, "application processes: %D", apcf.processes); 1539 
nxt_debug(task, "application request timeout: %M", apcf.timeout); 1540 nxt_debug(task, "application requests: %D", apcf.requests); 1541 1542 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1543 1544 if (lang == NULL) { 1545 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1546 goto app_fail; 1547 } 1548 1549 nxt_debug(task, "application language module: \"%s\"", lang->file); 1550 1551 ret = nxt_thread_mutex_create(&app->mutex); 1552 if (ret != NXT_OK) { 1553 goto app_fail; 1554 } 1555 1556 nxt_queue_init(&app->ports); 1557 nxt_queue_init(&app->spare_ports); 1558 nxt_queue_init(&app->idle_ports); 1559 nxt_queue_init(&app->ack_waiting_req); 1560 1561 app->name.length = name.length; 1562 nxt_memcpy(app->name.start, name.start, name.length); 1563 1564 app->type = lang->type; 1565 app->max_processes = apcf.max_processes; 1566 app->spare_processes = apcf.spare_processes; 1567 app->max_pending_processes = apcf.spare_processes 1568 ? apcf.spare_processes : 1; 1569 app->timeout = apcf.timeout; 1570 app->idle_timeout = apcf.idle_timeout; 1571 app->max_requests = apcf.requests; 1572 1573 app->targets = targets; 1574 1575 engine = task->thread->engine; 1576 1577 app->engine = engine; 1578 1579 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1580 app->adjust_idle_work.task = &engine->task; 1581 app->adjust_idle_work.obj = app; 1582 1583 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1584 1585 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1586 if (nxt_slow_path(ret != NXT_OK)) { 1587 goto app_fail; 1588 } 1589 1590 nxt_router_app_use(task, app, 1); 1591 1592 app->joint = app_joint; 1593 1594 app_joint->use_count = 1; 1595 app_joint->app = app; 1596 1597 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1598 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1599 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1600 app_joint->idle_timer.task = &engine->task; 1601 app_joint->idle_timer.log = 
app_joint->idle_timer.task->log; 1602 1603 app_joint->free_app_work.handler = nxt_router_free_app; 1604 app_joint->free_app_work.task = &engine->task; 1605 app_joint->free_app_work.obj = app_joint; 1606 1607 port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, 1608 NXT_PROCESS_APP); 1609 if (nxt_slow_path(port == NULL)) { 1610 return NXT_ERROR; 1611 } 1612 1613 ret = nxt_port_socket_init(task, port, 0); 1614 if (nxt_slow_path(ret != NXT_OK)) { 1615 nxt_port_use(task, port, -1); 1616 return NXT_ERROR; 1617 } 1618 1619 ret = nxt_router_app_queue_init(task, port); 1620 if (nxt_slow_path(ret != NXT_OK)) { 1621 nxt_port_write_close(port); 1622 nxt_port_read_close(port); 1623 nxt_port_use(task, port, -1); 1624 return NXT_ERROR; 1625 } 1626 1627 nxt_port_write_enable(task, port); 1628 port->app = app; 1629 1630 app->shared_port = port; 1631 1632 nxt_thread_mutex_create(&app->outgoing.mutex); 1633 } 1634 } 1635 1636 routes_conf = nxt_conf_get_path(conf, &routes_path); 1637 if (nxt_fast_path(routes_conf != NULL)) { 1638 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1639 if (nxt_slow_path(routes == NULL)) { 1640 return NXT_ERROR; 1641 } 1642 tmcf->router_conf->routes = routes; 1643 } 1644 1645 ret = nxt_upstreams_create(task, tmcf, conf); 1646 if (nxt_slow_path(ret != NXT_OK)) { 1647 return ret; 1648 } 1649 1650 http = nxt_conf_get_path(conf, &http_path); 1651 #if 0 1652 if (http == NULL) { 1653 nxt_alert(task, "no \"http\" block"); 1654 return NXT_ERROR; 1655 } 1656 #endif 1657 1658 websocket = nxt_conf_get_path(conf, &websocket_path); 1659 1660 listeners = nxt_conf_get_path(conf, &listeners_path); 1661 1662 if (listeners != NULL) { 1663 next = 0; 1664 1665 for ( ;; ) { 1666 listener = nxt_conf_next_object_member(listeners, &name, &next); 1667 if (listener == NULL) { 1668 break; 1669 } 1670 1671 skcf = nxt_router_socket_conf(task, tmcf, &name); 1672 if (skcf == NULL) { 1673 goto fail; 1674 } 1675 1676 nxt_memzero(&lscf, sizeof(lscf)); 1677 1678 ret = 
nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1679 nxt_nitems(nxt_router_listener_conf), 1680 &lscf); 1681 if (ret != NXT_OK) { 1682 nxt_alert(task, "listener map error"); 1683 goto fail; 1684 } 1685 1686 nxt_debug(task, "application: %V", &lscf.application); 1687 1688 // STUB, default values if http block is not defined. 1689 skcf->header_buffer_size = 2048; 1690 skcf->large_header_buffer_size = 8192; 1691 skcf->large_header_buffers = 4; 1692 skcf->discard_unsafe_fields = 1; 1693 skcf->body_buffer_size = 16 * 1024; 1694 skcf->max_body_size = 8 * 1024 * 1024; 1695 skcf->proxy_header_buffer_size = 64 * 1024; 1696 skcf->proxy_buffer_size = 4096; 1697 skcf->proxy_buffers = 256; 1698 skcf->idle_timeout = 180 * 1000; 1699 skcf->header_read_timeout = 30 * 1000; 1700 skcf->body_read_timeout = 30 * 1000; 1701 skcf->send_timeout = 30 * 1000; 1702 skcf->proxy_timeout = 60 * 1000; 1703 skcf->proxy_send_timeout = 30 * 1000; 1704 skcf->proxy_read_timeout = 30 * 1000; 1705 1706 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1707 skcf->websocket_conf.read_timeout = 60 * 1000; 1708 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1709 1710 nxt_str_null(&skcf->body_temp_path); 1711 1712 if (http != NULL) { 1713 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1714 nxt_nitems(nxt_router_http_conf), 1715 skcf); 1716 if (ret != NXT_OK) { 1717 nxt_alert(task, "http map error"); 1718 goto fail; 1719 } 1720 } 1721 1722 if (websocket != NULL) { 1723 ret = nxt_conf_map_object(mp, websocket, 1724 nxt_router_websocket_conf, 1725 nxt_nitems(nxt_router_websocket_conf), 1726 &skcf->websocket_conf); 1727 if (ret != NXT_OK) { 1728 nxt_alert(task, "websocket map error"); 1729 goto fail; 1730 } 1731 } 1732 1733 t = &skcf->body_temp_path; 1734 1735 if (t->length == 0) { 1736 t->start = (u_char *) task->thread->runtime->tmp; 1737 t->length = nxt_strlen(t->start); 1738 } 1739 1740 #if (NXT_TLS) 1741 certificate = nxt_conf_get_path(listener, &certificate_path); 1742 1743 
if (certificate != NULL) { 1744 conf_cmds = nxt_conf_get_path(listener, &conf_commands_path); 1745 1746 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1747 n = nxt_conf_array_elements_count(certificate); 1748 1749 for (i = 0; i < n; i++) { 1750 value = nxt_conf_get_array_element(certificate, i); 1751 1752 nxt_assert(value != NULL); 1753 1754 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1755 conf_cmds, i == 0); 1756 if (nxt_slow_path(ret != NXT_OK)) { 1757 goto fail; 1758 } 1759 } 1760 1761 } else { 1762 /* NXT_CONF_STRING */ 1763 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, 1764 conf_cmds, 1); 1765 if (nxt_slow_path(ret != NXT_OK)) { 1766 goto fail; 1767 } 1768 } 1769 } 1770 #endif 1771 1772 skcf->listen->handler = nxt_http_conn_init; 1773 skcf->router_conf = tmcf->router_conf; 1774 skcf->router_conf->count++; 1775 1776 if (lscf.pass.length != 0) { 1777 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1778 1779 /* COMPATIBILITY: listener application. 
*/ 1780 } else if (lscf.application.length > 0) { 1781 skcf->action = nxt_http_pass_application(task, 1782 tmcf->router_conf, 1783 &lscf.application); 1784 } 1785 1786 if (nxt_slow_path(skcf->action == NULL)) { 1787 goto fail; 1788 } 1789 } 1790 } 1791 1792 ret = nxt_http_routes_resolve(task, tmcf); 1793 if (nxt_slow_path(ret != NXT_OK)) { 1794 goto fail; 1795 } 1796 1797 value = nxt_conf_get_path(conf, &access_log_path); 1798 1799 if (value != NULL) { 1800 nxt_conf_get_string(value, &path); 1801 1802 access_log = router->access_log; 1803 1804 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1805 nxt_thread_spin_lock(&router->lock); 1806 access_log->count++; 1807 nxt_thread_spin_unlock(&router->lock); 1808 1809 } else { 1810 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1811 + path.length); 1812 if (access_log == NULL) { 1813 nxt_alert(task, "failed to allocate access log structure"); 1814 goto fail; 1815 } 1816 1817 access_log->fd = -1; 1818 access_log->handler = &nxt_router_access_log_writer; 1819 access_log->count = 1; 1820 1821 access_log->path.length = path.length; 1822 access_log->path.start = (u_char *) access_log 1823 + sizeof(nxt_router_access_log_t); 1824 1825 nxt_memcpy(access_log->path.start, path.start, path.length); 1826 } 1827 1828 tmcf->router_conf->access_log = access_log; 1829 } 1830 1831 nxt_queue_add(&deleting_sockets, &router->sockets); 1832 nxt_queue_init(&router->sockets); 1833 1834 return NXT_OK; 1835 1836 app_fail: 1837 1838 nxt_mp_destroy(app_mp); 1839 1840 fail: 1841 1842 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1843 1844 nxt_queue_remove(&app->link); 1845 nxt_thread_mutex_destroy(&app->mutex); 1846 nxt_mp_destroy(app->mem_pool); 1847 1848 } nxt_queue_loop; 1849 1850 return NXT_ERROR; 1851 } 1852 1853 1854 #if (NXT_TLS) 1855 1856 static nxt_int_t 1857 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 1858 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 1859 nxt_conf_value_t *conf_cmds, 
nxt_bool_t last) 1860 { 1861 nxt_router_tlssock_t *tls; 1862 1863 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 1864 if (nxt_slow_path(tls == NULL)) { 1865 return NXT_ERROR; 1866 } 1867 1868 tls->socket_conf = skcf; 1869 tls->conf_cmds = conf_cmds; 1870 tls->temp_conf = tmcf; 1871 tls->last = last; 1872 nxt_conf_get_string(value, &tls->name); 1873 1874 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1875 1876 return NXT_OK; 1877 } 1878 1879 #endif 1880 1881 1882 static nxt_int_t 1883 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 1884 nxt_conf_value_t *conf) 1885 { 1886 uint32_t next, i; 1887 nxt_mp_t *mp; 1888 nxt_str_t *type, extension, str; 1889 nxt_int_t ret; 1890 nxt_uint_t exts; 1891 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 1892 1893 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 1894 1895 mp = rtcf->mem_pool; 1896 1897 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 1898 if (nxt_slow_path(ret != NXT_OK)) { 1899 return NXT_ERROR; 1900 } 1901 1902 if (conf == NULL) { 1903 return NXT_OK; 1904 } 1905 1906 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 1907 1908 if (mtypes_conf != NULL) { 1909 next = 0; 1910 1911 for ( ;; ) { 1912 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 1913 1914 if (ext_conf == NULL) { 1915 break; 1916 } 1917 1918 type = nxt_str_dup(mp, NULL, &str); 1919 if (nxt_slow_path(type == NULL)) { 1920 return NXT_ERROR; 1921 } 1922 1923 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 1924 nxt_conf_get_string(ext_conf, &str); 1925 1926 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1927 return NXT_ERROR; 1928 } 1929 1930 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1931 &extension, type); 1932 if (nxt_slow_path(ret != NXT_OK)) { 1933 return NXT_ERROR; 1934 } 1935 1936 continue; 1937 } 1938 1939 exts = nxt_conf_array_elements_count(ext_conf); 1940 1941 for (i = 0; i < exts; i++) { 1942 value = 
nxt_conf_get_array_element(ext_conf, i); 1943 1944 nxt_conf_get_string(value, &str); 1945 1946 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1947 return NXT_ERROR; 1948 } 1949 1950 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1951 &extension, type); 1952 if (nxt_slow_path(ret != NXT_OK)) { 1953 return NXT_ERROR; 1954 } 1955 } 1956 } 1957 } 1958 1959 return NXT_OK; 1960 } 1961 1962 1963 static nxt_app_t * 1964 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1965 { 1966 nxt_app_t *app; 1967 1968 nxt_queue_each(app, queue, nxt_app_t, link) { 1969 1970 if (nxt_strstr_eq(name, &app->name)) { 1971 return app; 1972 } 1973 1974 } nxt_queue_loop; 1975 1976 return NULL; 1977 } 1978 1979 1980 static nxt_int_t 1981 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 1982 { 1983 void *mem; 1984 nxt_int_t fd; 1985 1986 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 1987 if (nxt_slow_path(fd == -1)) { 1988 return NXT_ERROR; 1989 } 1990 1991 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 1992 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 1993 if (nxt_slow_path(mem == MAP_FAILED)) { 1994 nxt_fd_close(fd); 1995 1996 return NXT_ERROR; 1997 } 1998 1999 nxt_app_queue_init(mem); 2000 2001 port->queue_fd = fd; 2002 port->queue = mem; 2003 2004 return NXT_OK; 2005 } 2006 2007 2008 static nxt_int_t 2009 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2010 { 2011 void *mem; 2012 nxt_int_t fd; 2013 2014 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2015 if (nxt_slow_path(fd == -1)) { 2016 return NXT_ERROR; 2017 } 2018 2019 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2020 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2021 if (nxt_slow_path(mem == MAP_FAILED)) { 2022 nxt_fd_close(fd); 2023 2024 return NXT_ERROR; 2025 } 2026 2027 nxt_port_queue_init(mem); 2028 2029 port->queue_fd = fd; 2030 port->queue = mem; 2031 2032 return NXT_OK; 2033 } 2034 2035 2036 static nxt_int_t 2037 nxt_router_port_queue_map(nxt_task_t 
*task, nxt_port_t *port, nxt_fd_t fd) 2038 { 2039 void *mem; 2040 2041 nxt_assert(fd != -1); 2042 2043 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2044 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2045 if (nxt_slow_path(mem == MAP_FAILED)) { 2046 2047 return NXT_ERROR; 2048 } 2049 2050 port->queue = mem; 2051 2052 return NXT_OK; 2053 } 2054 2055 2056 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2057 NXT_LVLHSH_DEFAULT, 2058 nxt_router_apps_hash_test, 2059 nxt_mp_lvlhsh_alloc, 2060 nxt_mp_lvlhsh_free, 2061 }; 2062 2063 2064 static nxt_int_t 2065 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2066 { 2067 nxt_app_t *app; 2068 2069 app = data; 2070 2071 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2072 } 2073 2074 2075 static nxt_int_t 2076 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2077 { 2078 nxt_lvlhsh_query_t lhq; 2079 2080 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2081 lhq.replace = 0; 2082 lhq.key = app->name; 2083 lhq.value = app; 2084 lhq.proto = &nxt_router_apps_hash_proto; 2085 lhq.pool = rtcf->mem_pool; 2086 2087 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2088 2089 case NXT_OK: 2090 return NXT_OK; 2091 2092 case NXT_DECLINED: 2093 nxt_thread_log_alert("router app hash adding failed: " 2094 "\"%V\" is already in hash", &lhq.key); 2095 /* Fall through. 
*/ 2096 default: 2097 return NXT_ERROR; 2098 } 2099 } 2100 2101 2102 static nxt_app_t * 2103 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2104 { 2105 nxt_lvlhsh_query_t lhq; 2106 2107 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2108 lhq.key = *name; 2109 lhq.proto = &nxt_router_apps_hash_proto; 2110 2111 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2112 return NULL; 2113 } 2114 2115 return lhq.value; 2116 } 2117 2118 2119 static void 2120 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2121 { 2122 nxt_app_t *app; 2123 nxt_lvlhsh_each_t lhe; 2124 2125 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2126 2127 for ( ;; ) { 2128 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2129 2130 if (app == NULL) { 2131 break; 2132 } 2133 2134 nxt_router_app_use(task, app, i); 2135 } 2136 } 2137 2138 2139 2140 nxt_int_t 2141 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name, 2142 nxt_http_action_t *action) 2143 { 2144 nxt_app_t *app; 2145 2146 app = nxt_router_apps_hash_get(rtcf, name); 2147 2148 if (app == NULL) { 2149 return NXT_DECLINED; 2150 } 2151 2152 action->u.app.application = app; 2153 action->handler = nxt_http_application_handler; 2154 2155 return NXT_OK; 2156 } 2157 2158 2159 static nxt_socket_conf_t * 2160 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2161 nxt_str_t *name) 2162 { 2163 size_t size; 2164 nxt_int_t ret; 2165 nxt_bool_t wildcard; 2166 nxt_sockaddr_t *sa; 2167 nxt_socket_conf_t *skcf; 2168 nxt_listen_socket_t *ls; 2169 2170 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2171 if (nxt_slow_path(sa == NULL)) { 2172 nxt_alert(task, "invalid listener \"%V\"", name); 2173 return NULL; 2174 } 2175 2176 sa->type = SOCK_STREAM; 2177 2178 nxt_debug(task, "router listener: \"%*s\"", 2179 (size_t) sa->length, nxt_sockaddr_start(sa)); 2180 2181 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2182 if 
(nxt_slow_path(skcf == NULL)) { 2183 return NULL; 2184 } 2185 2186 size = nxt_sockaddr_size(sa); 2187 2188 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2189 2190 if (ret != NXT_OK) { 2191 2192 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2193 if (nxt_slow_path(ls == NULL)) { 2194 return NULL; 2195 } 2196 2197 skcf->listen = ls; 2198 2199 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2200 nxt_memcpy(ls->sockaddr, sa, size); 2201 2202 nxt_listen_socket_remote_size(ls); 2203 2204 ls->socket = -1; 2205 ls->backlog = NXT_LISTEN_BACKLOG; 2206 ls->flags = NXT_NONBLOCK; 2207 ls->read_after_accept = 1; 2208 } 2209 2210 switch (sa->u.sockaddr.sa_family) { 2211 #if (NXT_HAVE_UNIX_DOMAIN) 2212 case AF_UNIX: 2213 wildcard = 0; 2214 break; 2215 #endif 2216 #if (NXT_INET6) 2217 case AF_INET6: 2218 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2219 break; 2220 #endif 2221 case AF_INET: 2222 default: 2223 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2224 break; 2225 } 2226 2227 if (!wildcard) { 2228 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2229 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2230 return NULL; 2231 } 2232 2233 nxt_memcpy(skcf->sockaddr, sa, size); 2234 } 2235 2236 return skcf; 2237 } 2238 2239 2240 static nxt_int_t 2241 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2242 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2243 { 2244 nxt_router_t *router; 2245 nxt_queue_link_t *qlk; 2246 nxt_socket_conf_t *skcf; 2247 2248 router = tmcf->router_conf->router; 2249 2250 for (qlk = nxt_queue_first(&router->sockets); 2251 qlk != nxt_queue_tail(&router->sockets); 2252 qlk = nxt_queue_next(qlk)) 2253 { 2254 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2255 2256 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2257 nskcf->listen = skcf->listen; 2258 2259 nxt_queue_remove(qlk); 2260 nxt_queue_insert_tail(&keeping_sockets, qlk); 2261 2262 
nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2263 2264 return NXT_OK; 2265 } 2266 } 2267 2268 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2269 2270 return NXT_DECLINED; 2271 } 2272 2273 2274 static void 2275 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2276 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2277 { 2278 size_t size; 2279 uint32_t stream; 2280 nxt_int_t ret; 2281 nxt_buf_t *b; 2282 nxt_port_t *main_port, *router_port; 2283 nxt_runtime_t *rt; 2284 nxt_socket_rpc_t *rpc; 2285 2286 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2287 if (rpc == NULL) { 2288 goto fail; 2289 } 2290 2291 rpc->socket_conf = skcf; 2292 rpc->temp_conf = tmcf; 2293 2294 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2295 2296 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2297 if (b == NULL) { 2298 goto fail; 2299 } 2300 2301 b->completion_handler = nxt_router_dummy_buf_completion; 2302 2303 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2304 2305 rt = task->thread->runtime; 2306 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2307 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2308 2309 stream = nxt_port_rpc_register_handler(task, router_port, 2310 nxt_router_listen_socket_ready, 2311 nxt_router_listen_socket_error, 2312 main_port->pid, rpc); 2313 if (nxt_slow_path(stream == 0)) { 2314 goto fail; 2315 } 2316 2317 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2318 stream, router_port->id, b); 2319 2320 if (nxt_slow_path(ret != NXT_OK)) { 2321 nxt_port_rpc_cancel(task, router_port, stream); 2322 goto fail; 2323 } 2324 2325 return; 2326 2327 fail: 2328 2329 nxt_router_conf_error(task, tmcf); 2330 } 2331 2332 2333 static void 2334 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2335 void *data) 2336 { 2337 nxt_int_t ret; 2338 nxt_socket_t s; 2339 nxt_socket_rpc_t *rpc; 2340 2341 rpc = data; 2342 2343 s = msg->fd[0]; 2344 2345 ret = 
nxt_socket_nonblocking(task, s); 2346 if (nxt_slow_path(ret != NXT_OK)) { 2347 goto fail; 2348 } 2349 2350 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2351 2352 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2353 if (nxt_slow_path(ret != NXT_OK)) { 2354 goto fail; 2355 } 2356 2357 rpc->socket_conf->listen->socket = s; 2358 2359 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2360 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2361 2362 return; 2363 2364 fail: 2365 2366 nxt_socket_close(task, s); 2367 2368 nxt_router_conf_error(task, rpc->temp_conf); 2369 } 2370 2371 2372 static void 2373 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2374 void *data) 2375 { 2376 nxt_socket_rpc_t *rpc; 2377 nxt_router_temp_conf_t *tmcf; 2378 2379 rpc = data; 2380 tmcf = rpc->temp_conf; 2381 2382 #if 0 2383 u_char *p; 2384 size_t size; 2385 uint8_t error; 2386 nxt_buf_t *in, *out; 2387 nxt_sockaddr_t *sa; 2388 2389 static nxt_str_t socket_errors[] = { 2390 nxt_string("ListenerSystem"), 2391 nxt_string("ListenerNoIPv6"), 2392 nxt_string("ListenerPort"), 2393 nxt_string("ListenerInUse"), 2394 nxt_string("ListenerNoAddress"), 2395 nxt_string("ListenerNoAccess"), 2396 nxt_string("ListenerPath"), 2397 }; 2398 2399 sa = rpc->socket_conf->listen->sockaddr; 2400 2401 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2402 2403 if (nxt_slow_path(in == NULL)) { 2404 return; 2405 } 2406 2407 p = in->mem.pos; 2408 2409 error = *p++; 2410 2411 size = nxt_length("listen socket error: ") 2412 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2413 + sa->length + socket_errors[error].length + (in->mem.free - p); 2414 2415 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2416 if (nxt_slow_path(out == NULL)) { 2417 return; 2418 } 2419 2420 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2421 "listen socket error: " 2422 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2423 (size_t) 
sa->length, nxt_sockaddr_start(sa), 2424 &socket_errors[error], in->mem.free - p, p); 2425 2426 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2427 #endif 2428 2429 nxt_router_conf_error(task, tmcf); 2430 } 2431 2432 2433 #if (NXT_TLS) 2434 2435 static void 2436 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2437 void *data) 2438 { 2439 nxt_mp_t *mp; 2440 nxt_int_t ret; 2441 nxt_tls_conf_t *tlscf; 2442 nxt_router_tlssock_t *tls; 2443 nxt_tls_bundle_conf_t *bundle; 2444 nxt_router_temp_conf_t *tmcf; 2445 2446 nxt_debug(task, "tls rpc handler"); 2447 2448 tls = data; 2449 tmcf = tls->temp_conf; 2450 2451 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2452 goto fail; 2453 } 2454 2455 mp = tmcf->router_conf->mem_pool; 2456 2457 if (tls->socket_conf->tls == NULL){ 2458 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2459 if (nxt_slow_path(tlscf == NULL)) { 2460 goto fail; 2461 } 2462 2463 tlscf->no_wait_shutdown = 1; 2464 tls->socket_conf->tls = tlscf; 2465 2466 } else { 2467 tlscf = tls->socket_conf->tls; 2468 } 2469 2470 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2471 if (nxt_slow_path(bundle == NULL)) { 2472 goto fail; 2473 } 2474 2475 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2476 goto fail; 2477 } 2478 2479 bundle->chain_file = msg->fd[0]; 2480 bundle->next = tlscf->bundle; 2481 tlscf->bundle = bundle; 2482 2483 ret = task->thread->runtime->tls->server_init(task, tlscf, mp, 2484 tls->conf_cmds, tls->last); 2485 if (nxt_slow_path(ret != NXT_OK)) { 2486 goto fail; 2487 } 2488 2489 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2490 nxt_router_conf_apply, task, tmcf, NULL); 2491 return; 2492 2493 fail: 2494 2495 nxt_router_conf_error(task, tmcf); 2496 } 2497 2498 #endif 2499 2500 2501 static void 2502 nxt_router_app_rpc_create(nxt_task_t *task, 2503 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2504 { 2505 size_t size; 2506 uint32_t stream; 2507 
nxt_int_t ret; 2508 nxt_buf_t *b; 2509 nxt_port_t *main_port, *router_port; 2510 nxt_runtime_t *rt; 2511 nxt_app_rpc_t *rpc; 2512 2513 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2514 if (rpc == NULL) { 2515 goto fail; 2516 } 2517 2518 rpc->app = app; 2519 rpc->temp_conf = tmcf; 2520 2521 nxt_debug(task, "app '%V' prefork", &app->name); 2522 2523 size = app->name.length + 1 + app->conf.length; 2524 2525 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2526 if (nxt_slow_path(b == NULL)) { 2527 goto fail; 2528 } 2529 2530 b->completion_handler = nxt_router_dummy_buf_completion; 2531 2532 nxt_buf_cpystr(b, &app->name); 2533 *b->mem.free++ = '\0'; 2534 nxt_buf_cpystr(b, &app->conf); 2535 2536 rt = task->thread->runtime; 2537 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2538 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2539 2540 stream = nxt_port_rpc_register_handler(task, router_port, 2541 nxt_router_app_prefork_ready, 2542 nxt_router_app_prefork_error, 2543 -1, rpc); 2544 if (nxt_slow_path(stream == 0)) { 2545 goto fail; 2546 } 2547 2548 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2549 -1, stream, router_port->id, b); 2550 2551 if (nxt_slow_path(ret != NXT_OK)) { 2552 nxt_port_rpc_cancel(task, router_port, stream); 2553 goto fail; 2554 } 2555 2556 app->pending_processes++; 2557 2558 return; 2559 2560 fail: 2561 2562 nxt_router_conf_error(task, tmcf); 2563 } 2564 2565 2566 static void 2567 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2568 void *data) 2569 { 2570 nxt_app_t *app; 2571 nxt_port_t *port; 2572 nxt_app_rpc_t *rpc; 2573 nxt_event_engine_t *engine; 2574 2575 rpc = data; 2576 app = rpc->app; 2577 2578 port = msg->u.new_port; 2579 2580 nxt_assert(port != NULL); 2581 nxt_assert(port->type == NXT_PROCESS_APP); 2582 nxt_assert(port->id == 0); 2583 2584 port->app = app; 2585 port->main_app_port = port; 2586 2587 app->pending_processes--; 2588 app->processes++; 2589 
app->idle_processes++; 2590 2591 engine = task->thread->engine; 2592 2593 nxt_queue_insert_tail(&app->ports, &port->app_link); 2594 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2595 2596 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2597 &app->name, port->pid, port->id); 2598 2599 nxt_port_hash_add(&app->port_hash, port); 2600 app->port_hash_count++; 2601 2602 port->idle_start = 0; 2603 2604 nxt_port_inc_use(port); 2605 2606 nxt_router_app_shared_port_send(task, port); 2607 2608 nxt_work_queue_add(&engine->fast_work_queue, 2609 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2610 } 2611 2612 2613 static void 2614 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2615 void *data) 2616 { 2617 nxt_app_t *app; 2618 nxt_app_rpc_t *rpc; 2619 nxt_router_temp_conf_t *tmcf; 2620 2621 rpc = data; 2622 app = rpc->app; 2623 tmcf = rpc->temp_conf; 2624 2625 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2626 &app->name); 2627 2628 app->pending_processes--; 2629 2630 nxt_router_conf_error(task, tmcf); 2631 } 2632 2633 2634 static nxt_int_t 2635 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2636 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2637 { 2638 nxt_int_t ret; 2639 nxt_uint_t n, threads; 2640 nxt_queue_link_t *qlk; 2641 nxt_router_engine_conf_t *recf; 2642 2643 threads = tmcf->router_conf->threads; 2644 2645 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2646 sizeof(nxt_router_engine_conf_t)); 2647 if (nxt_slow_path(tmcf->engines == NULL)) { 2648 return NXT_ERROR; 2649 } 2650 2651 n = 0; 2652 2653 for (qlk = nxt_queue_first(&router->engines); 2654 qlk != nxt_queue_tail(&router->engines); 2655 qlk = nxt_queue_next(qlk)) 2656 { 2657 recf = nxt_array_zero_add(tmcf->engines); 2658 if (nxt_slow_path(recf == NULL)) { 2659 return NXT_ERROR; 2660 } 2661 2662 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2663 2664 if (n < 
threads) { 2665 recf->action = NXT_ROUTER_ENGINE_KEEP; 2666 ret = nxt_router_engine_conf_update(tmcf, recf); 2667 2668 } else { 2669 recf->action = NXT_ROUTER_ENGINE_DELETE; 2670 ret = nxt_router_engine_conf_delete(tmcf, recf); 2671 } 2672 2673 if (nxt_slow_path(ret != NXT_OK)) { 2674 return ret; 2675 } 2676 2677 n++; 2678 } 2679 2680 tmcf->new_threads = n; 2681 2682 while (n < threads) { 2683 recf = nxt_array_zero_add(tmcf->engines); 2684 if (nxt_slow_path(recf == NULL)) { 2685 return NXT_ERROR; 2686 } 2687 2688 recf->action = NXT_ROUTER_ENGINE_ADD; 2689 2690 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2691 if (nxt_slow_path(recf->engine == NULL)) { 2692 return NXT_ERROR; 2693 } 2694 2695 ret = nxt_router_engine_conf_create(tmcf, recf); 2696 if (nxt_slow_path(ret != NXT_OK)) { 2697 return ret; 2698 } 2699 2700 n++; 2701 } 2702 2703 return NXT_OK; 2704 } 2705 2706 2707 static nxt_int_t 2708 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2709 nxt_router_engine_conf_t *recf) 2710 { 2711 nxt_int_t ret; 2712 2713 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2714 nxt_router_listen_socket_create); 2715 if (nxt_slow_path(ret != NXT_OK)) { 2716 return ret; 2717 } 2718 2719 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2720 nxt_router_listen_socket_create); 2721 if (nxt_slow_path(ret != NXT_OK)) { 2722 return ret; 2723 } 2724 2725 return ret; 2726 } 2727 2728 2729 static nxt_int_t 2730 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2731 nxt_router_engine_conf_t *recf) 2732 { 2733 nxt_int_t ret; 2734 2735 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2736 nxt_router_listen_socket_create); 2737 if (nxt_slow_path(ret != NXT_OK)) { 2738 return ret; 2739 } 2740 2741 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2742 nxt_router_listen_socket_update); 2743 if (nxt_slow_path(ret != NXT_OK)) { 2744 return ret; 2745 } 2746 2747 ret = 
nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2748 if (nxt_slow_path(ret != NXT_OK)) { 2749 return ret; 2750 } 2751 2752 return ret; 2753 } 2754 2755 2756 static nxt_int_t 2757 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2758 nxt_router_engine_conf_t *recf) 2759 { 2760 nxt_int_t ret; 2761 2762 ret = nxt_router_engine_quit(tmcf, recf); 2763 if (nxt_slow_path(ret != NXT_OK)) { 2764 return ret; 2765 } 2766 2767 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 2768 if (nxt_slow_path(ret != NXT_OK)) { 2769 return ret; 2770 } 2771 2772 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2773 } 2774 2775 2776 static nxt_int_t 2777 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2778 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2779 nxt_work_handler_t handler) 2780 { 2781 nxt_int_t ret; 2782 nxt_joint_job_t *job; 2783 nxt_queue_link_t *qlk; 2784 nxt_socket_conf_t *skcf; 2785 nxt_socket_conf_joint_t *joint; 2786 2787 for (qlk = nxt_queue_first(sockets); 2788 qlk != nxt_queue_tail(sockets); 2789 qlk = nxt_queue_next(qlk)) 2790 { 2791 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2792 if (nxt_slow_path(job == NULL)) { 2793 return NXT_ERROR; 2794 } 2795 2796 job->work.next = recf->jobs; 2797 recf->jobs = &job->work; 2798 2799 job->task = tmcf->engine->task; 2800 job->work.handler = handler; 2801 job->work.task = &job->task; 2802 job->work.obj = job; 2803 job->tmcf = tmcf; 2804 2805 tmcf->count++; 2806 2807 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2808 sizeof(nxt_socket_conf_joint_t)); 2809 if (nxt_slow_path(joint == NULL)) { 2810 return NXT_ERROR; 2811 } 2812 2813 job->work.data = joint; 2814 2815 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 2816 if (nxt_slow_path(ret != NXT_OK)) { 2817 return ret; 2818 } 2819 2820 joint->count = 1; 2821 2822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2823 skcf->count++; 2824 joint->socket_conf = 
skcf; 2825 2826 joint->engine = recf->engine; 2827 } 2828 2829 return NXT_OK; 2830 } 2831 2832 2833 static nxt_int_t 2834 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2835 nxt_router_engine_conf_t *recf) 2836 { 2837 nxt_joint_job_t *job; 2838 2839 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2840 if (nxt_slow_path(job == NULL)) { 2841 return NXT_ERROR; 2842 } 2843 2844 job->work.next = recf->jobs; 2845 recf->jobs = &job->work; 2846 2847 job->task = tmcf->engine->task; 2848 job->work.handler = nxt_router_worker_thread_quit; 2849 job->work.task = &job->task; 2850 job->work.obj = NULL; 2851 job->work.data = NULL; 2852 job->tmcf = NULL; 2853 2854 return NXT_OK; 2855 } 2856 2857 2858 static nxt_int_t 2859 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2860 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2861 { 2862 nxt_joint_job_t *job; 2863 nxt_queue_link_t *qlk; 2864 2865 for (qlk = nxt_queue_first(sockets); 2866 qlk != nxt_queue_tail(sockets); 2867 qlk = nxt_queue_next(qlk)) 2868 { 2869 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2870 if (nxt_slow_path(job == NULL)) { 2871 return NXT_ERROR; 2872 } 2873 2874 job->work.next = recf->jobs; 2875 recf->jobs = &job->work; 2876 2877 job->task = tmcf->engine->task; 2878 job->work.handler = nxt_router_listen_socket_delete; 2879 job->work.task = &job->task; 2880 job->work.obj = job; 2881 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2882 job->tmcf = tmcf; 2883 2884 tmcf->count++; 2885 } 2886 2887 return NXT_OK; 2888 } 2889 2890 2891 static nxt_int_t 2892 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2893 nxt_router_temp_conf_t *tmcf) 2894 { 2895 nxt_int_t ret; 2896 nxt_uint_t i, threads; 2897 nxt_router_engine_conf_t *recf; 2898 2899 recf = tmcf->engines->elts; 2900 threads = tmcf->router_conf->threads; 2901 2902 for (i = tmcf->new_threads; i < threads; i++) { 2903 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2904 if 
(nxt_slow_path(ret != NXT_OK)) { 2905 return ret; 2906 } 2907 } 2908 2909 return NXT_OK; 2910 } 2911 2912 2913 static nxt_int_t 2914 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2915 nxt_event_engine_t *engine) 2916 { 2917 nxt_int_t ret; 2918 nxt_thread_link_t *link; 2919 nxt_thread_handle_t handle; 2920 2921 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2922 2923 if (nxt_slow_path(link == NULL)) { 2924 return NXT_ERROR; 2925 } 2926 2927 link->start = nxt_router_thread_start; 2928 link->engine = engine; 2929 link->work.handler = nxt_router_thread_exit_handler; 2930 link->work.task = task; 2931 link->work.data = link; 2932 2933 nxt_queue_insert_tail(&rt->engines, &engine->link); 2934 2935 ret = nxt_thread_create(&handle, link); 2936 2937 if (nxt_slow_path(ret != NXT_OK)) { 2938 nxt_queue_remove(&engine->link); 2939 } 2940 2941 return ret; 2942 } 2943 2944 2945 static void 2946 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2947 nxt_router_temp_conf_t *tmcf) 2948 { 2949 nxt_app_t *app; 2950 2951 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2952 2953 nxt_router_app_unlink(task, app); 2954 2955 } nxt_queue_loop; 2956 2957 nxt_queue_add(&router->apps, &tmcf->previous); 2958 nxt_queue_add(&router->apps, &tmcf->apps); 2959 } 2960 2961 2962 static void 2963 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2964 { 2965 nxt_uint_t n; 2966 nxt_event_engine_t *engine; 2967 nxt_router_engine_conf_t *recf; 2968 2969 recf = tmcf->engines->elts; 2970 2971 for (n = tmcf->engines->nelts; n != 0; n--) { 2972 engine = recf->engine; 2973 2974 switch (recf->action) { 2975 2976 case NXT_ROUTER_ENGINE_KEEP: 2977 break; 2978 2979 case NXT_ROUTER_ENGINE_ADD: 2980 nxt_queue_insert_tail(&router->engines, &engine->link0); 2981 break; 2982 2983 case NXT_ROUTER_ENGINE_DELETE: 2984 nxt_queue_remove(&engine->link0); 2985 break; 2986 } 2987 2988 nxt_router_engine_post(engine, recf->jobs); 2989 2990 recf++; 2991 } 2992 } 2993 2994 
2995 static void 2996 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2997 { 2998 nxt_work_t *work, *next; 2999 3000 for (work = jobs; work != NULL; work = next) { 3001 next = work->next; 3002 work->next = NULL; 3003 3004 nxt_event_engine_post(engine, work); 3005 } 3006 } 3007 3008 3009 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3010 .rpc_error = nxt_port_rpc_handler, 3011 .mmap = nxt_port_mmap_handler, 3012 .data = nxt_port_rpc_handler, 3013 .oosm = nxt_router_oosm_handler, 3014 .req_headers_ack = nxt_port_rpc_handler, 3015 }; 3016 3017 3018 static void 3019 nxt_router_thread_start(void *data) 3020 { 3021 nxt_int_t ret; 3022 nxt_port_t *port; 3023 nxt_task_t *task; 3024 nxt_work_t *work; 3025 nxt_thread_t *thread; 3026 nxt_thread_link_t *link; 3027 nxt_event_engine_t *engine; 3028 3029 link = data; 3030 engine = link->engine; 3031 task = &engine->task; 3032 3033 thread = nxt_thread(); 3034 3035 nxt_event_engine_thread_adopt(engine); 3036 3037 /* STUB */ 3038 thread->runtime = engine->task.thread->runtime; 3039 3040 engine->task.thread = thread; 3041 engine->task.log = thread->log; 3042 thread->engine = engine; 3043 thread->task = &engine->task; 3044 #if 0 3045 thread->fiber = &engine->fibers->fiber; 3046 #endif 3047 3048 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3049 if (nxt_slow_path(engine->mem_pool == NULL)) { 3050 return; 3051 } 3052 3053 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3054 NXT_PROCESS_ROUTER); 3055 if (nxt_slow_path(port == NULL)) { 3056 return; 3057 } 3058 3059 ret = nxt_port_socket_init(task, port, 0); 3060 if (nxt_slow_path(ret != NXT_OK)) { 3061 nxt_port_use(task, port, -1); 3062 return; 3063 } 3064 3065 ret = nxt_router_port_queue_init(task, port); 3066 if (nxt_slow_path(ret != NXT_OK)) { 3067 nxt_port_use(task, port, -1); 3068 return; 3069 } 3070 3071 engine->port = port; 3072 3073 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3074 3075 work = 
nxt_zalloc(sizeof(nxt_work_t)); 3076 if (nxt_slow_path(work == NULL)) { 3077 return; 3078 } 3079 3080 work->handler = nxt_router_rt_add_port; 3081 work->task = link->work.task; 3082 work->obj = work; 3083 work->data = port; 3084 3085 nxt_event_engine_post(link->work.task->thread->engine, work); 3086 3087 nxt_event_engine_start(engine); 3088 } 3089 3090 3091 static void 3092 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3093 { 3094 nxt_int_t res; 3095 nxt_port_t *port; 3096 nxt_runtime_t *rt; 3097 3098 rt = task->thread->runtime; 3099 port = data; 3100 3101 nxt_free(obj); 3102 3103 res = nxt_port_hash_add(&rt->ports, port); 3104 3105 if (nxt_fast_path(res == NXT_OK)) { 3106 nxt_port_use(task, port, 1); 3107 } 3108 } 3109 3110 3111 static void 3112 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3113 { 3114 nxt_joint_job_t *job; 3115 nxt_socket_conf_t *skcf; 3116 nxt_listen_event_t *lev; 3117 nxt_listen_socket_t *ls; 3118 nxt_thread_spinlock_t *lock; 3119 nxt_socket_conf_joint_t *joint; 3120 3121 job = obj; 3122 joint = data; 3123 3124 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3125 3126 skcf = joint->socket_conf; 3127 ls = skcf->listen; 3128 3129 lev = nxt_listen_event(task, ls); 3130 if (nxt_slow_path(lev == NULL)) { 3131 nxt_router_listen_socket_release(task, skcf); 3132 return; 3133 } 3134 3135 lev->socket.data = joint; 3136 3137 lock = &skcf->router_conf->router->lock; 3138 3139 nxt_thread_spin_lock(lock); 3140 ls->count++; 3141 nxt_thread_spin_unlock(lock); 3142 3143 job->work.next = NULL; 3144 job->work.handler = nxt_router_conf_wait; 3145 3146 nxt_event_engine_post(job->tmcf->engine, &job->work); 3147 } 3148 3149 3150 nxt_inline nxt_listen_event_t * 3151 nxt_router_listen_event(nxt_queue_t *listen_connections, 3152 nxt_socket_conf_t *skcf) 3153 { 3154 nxt_socket_t fd; 3155 nxt_queue_link_t *qlk; 3156 nxt_listen_event_t *lev; 3157 3158 fd = skcf->listen->socket; 3159 3160 for (qlk = 
nxt_queue_first(listen_connections); 3161 qlk != nxt_queue_tail(listen_connections); 3162 qlk = nxt_queue_next(qlk)) 3163 { 3164 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3165 3166 if (fd == lev->socket.fd) { 3167 return lev; 3168 } 3169 } 3170 3171 return NULL; 3172 } 3173 3174 3175 static void 3176 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3177 { 3178 nxt_joint_job_t *job; 3179 nxt_event_engine_t *engine; 3180 nxt_listen_event_t *lev; 3181 nxt_socket_conf_joint_t *joint, *old; 3182 3183 job = obj; 3184 joint = data; 3185 3186 engine = task->thread->engine; 3187 3188 nxt_queue_insert_tail(&engine->joints, &joint->link); 3189 3190 lev = nxt_router_listen_event(&engine->listen_connections, 3191 joint->socket_conf); 3192 3193 old = lev->socket.data; 3194 lev->socket.data = joint; 3195 lev->listen = joint->socket_conf->listen; 3196 3197 job->work.next = NULL; 3198 job->work.handler = nxt_router_conf_wait; 3199 3200 nxt_event_engine_post(job->tmcf->engine, &job->work); 3201 3202 /* 3203 * The task is allocated from configuration temporary 3204 * memory pool so it can be freed after engine post operation. 
3205 */ 3206 3207 nxt_router_conf_release(&engine->task, old); 3208 } 3209 3210 3211 static void 3212 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3213 { 3214 nxt_socket_conf_t *skcf; 3215 nxt_listen_event_t *lev; 3216 nxt_event_engine_t *engine; 3217 nxt_socket_conf_joint_t *joint; 3218 3219 skcf = data; 3220 3221 engine = task->thread->engine; 3222 3223 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3224 3225 nxt_fd_event_delete(engine, &lev->socket); 3226 3227 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3228 lev->socket.fd); 3229 3230 joint = lev->socket.data; 3231 joint->close_job = obj; 3232 3233 lev->timer.handler = nxt_router_listen_socket_close; 3234 lev->timer.work_queue = &engine->fast_work_queue; 3235 3236 nxt_timer_add(engine, &lev->timer, 0); 3237 } 3238 3239 3240 static void 3241 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3242 { 3243 nxt_event_engine_t *engine; 3244 3245 nxt_debug(task, "router worker thread quit"); 3246 3247 engine = task->thread->engine; 3248 3249 engine->shutdown = 1; 3250 3251 if (nxt_queue_is_empty(&engine->joints)) { 3252 nxt_thread_exit(task->thread); 3253 } 3254 } 3255 3256 3257 static void 3258 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3259 { 3260 nxt_timer_t *timer; 3261 nxt_joint_job_t *job; 3262 nxt_listen_event_t *lev; 3263 nxt_socket_conf_joint_t *joint; 3264 3265 timer = obj; 3266 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3267 3268 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3269 lev->socket.fd); 3270 3271 nxt_queue_remove(&lev->link); 3272 3273 joint = lev->socket.data; 3274 lev->socket.data = NULL; 3275 3276 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3277 task = &task->thread->engine->task; 3278 3279 nxt_router_listen_socket_release(task, joint->socket_conf); 3280 3281 job = joint->close_job; 3282 job->work.next = NULL; 3283 
job->work.handler = nxt_router_conf_wait; 3284 3285 nxt_event_engine_post(job->tmcf->engine, &job->work); 3286 3287 nxt_router_listen_event_release(task, lev, joint); 3288 } 3289 3290 3291 static void 3292 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3293 { 3294 nxt_listen_socket_t *ls; 3295 nxt_thread_spinlock_t *lock; 3296 3297 ls = skcf->listen; 3298 lock = &skcf->router_conf->router->lock; 3299 3300 nxt_thread_spin_lock(lock); 3301 3302 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3303 task->thread->engine, ls->count); 3304 3305 if (--ls->count != 0) { 3306 ls = NULL; 3307 } 3308 3309 nxt_thread_spin_unlock(lock); 3310 3311 if (ls != NULL) { 3312 nxt_socket_close(task, ls->socket); 3313 nxt_free(ls); 3314 } 3315 } 3316 3317 3318 void 3319 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3320 nxt_socket_conf_joint_t *joint) 3321 { 3322 nxt_event_engine_t *engine; 3323 3324 nxt_debug(task, "listen event count: %D", lev->count); 3325 3326 engine = task->thread->engine; 3327 3328 if (--lev->count == 0) { 3329 if (lev->next != NULL) { 3330 nxt_sockaddr_cache_free(engine, lev->next); 3331 3332 nxt_conn_free(task, lev->next); 3333 } 3334 3335 nxt_free(lev); 3336 } 3337 3338 if (joint != NULL) { 3339 nxt_router_conf_release(task, joint); 3340 } 3341 3342 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3343 nxt_thread_exit(task->thread); 3344 } 3345 } 3346 3347 3348 void 3349 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3350 { 3351 nxt_socket_conf_t *skcf; 3352 nxt_router_conf_t *rtcf; 3353 nxt_thread_spinlock_t *lock; 3354 3355 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3356 3357 if (--joint->count != 0) { 3358 return; 3359 } 3360 3361 nxt_queue_remove(&joint->link); 3362 3363 /* 3364 * The joint content can not be safely used after the critical 3365 * section protected by the spinlock because its memory pool may 3366 * be 
already destroyed by another thread. 3367 */ 3368 skcf = joint->socket_conf; 3369 rtcf = skcf->router_conf; 3370 lock = &rtcf->router->lock; 3371 3372 nxt_thread_spin_lock(lock); 3373 3374 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3375 rtcf, rtcf->count); 3376 3377 if (--skcf->count != 0) { 3378 skcf = NULL; 3379 rtcf = NULL; 3380 3381 } else { 3382 nxt_queue_remove(&skcf->link); 3383 3384 if (--rtcf->count != 0) { 3385 rtcf = NULL; 3386 } 3387 } 3388 3389 nxt_thread_spin_unlock(lock); 3390 3391 #if (NXT_TLS) 3392 if (skcf != NULL && skcf->tls != NULL) { 3393 task->thread->runtime->tls->server_free(task, skcf->tls); 3394 } 3395 #endif 3396 3397 /* TODO remove engine->port */ 3398 3399 if (rtcf != NULL) { 3400 nxt_debug(task, "old router conf is destroyed"); 3401 3402 nxt_router_apps_hash_use(task, rtcf, -1); 3403 3404 nxt_router_access_log_release(task, lock, rtcf->access_log); 3405 3406 nxt_mp_thread_adopt(rtcf->mem_pool); 3407 3408 nxt_mp_destroy(rtcf->mem_pool); 3409 } 3410 } 3411 3412 3413 static void 3414 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3415 nxt_router_access_log_t *access_log) 3416 { 3417 size_t size; 3418 u_char *buf, *p; 3419 nxt_off_t bytes; 3420 3421 static nxt_time_string_t date_cache = { 3422 (nxt_atomic_uint_t) -1, 3423 nxt_router_access_log_date, 3424 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3425 nxt_length("31/Dec/1986:19:40:00 +0300"), 3426 NXT_THREAD_TIME_LOCAL, 3427 NXT_THREAD_TIME_SEC, 3428 }; 3429 3430 size = r->remote->address_length 3431 + 6 /* ' - - [' */ 3432 + date_cache.size 3433 + 3 /* '] "' */ 3434 + r->method->length 3435 + 1 /* space */ 3436 + r->target.length 3437 + 1 /* space */ 3438 + r->version.length 3439 + 2 /* '" ' */ 3440 + 3 /* status */ 3441 + 1 /* space */ 3442 + NXT_OFF_T_LEN 3443 + 2 /* ' "' */ 3444 + (r->referer != NULL ? r->referer->value_length : 1) 3445 + 3 /* '" "' */ 3446 + (r->user_agent != NULL ? 
r->user_agent->value_length : 1) 3447 + 2 /* '"\n' */ 3448 ; 3449 3450 buf = nxt_mp_nget(r->mem_pool, size); 3451 if (nxt_slow_path(buf == NULL)) { 3452 return; 3453 } 3454 3455 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3456 r->remote->address_length); 3457 3458 p = nxt_cpymem(p, " - - [", 6); 3459 3460 p = nxt_thread_time_string(task->thread, &date_cache, p); 3461 3462 p = nxt_cpymem(p, "] \"", 3); 3463 3464 if (r->method->length != 0) { 3465 p = nxt_cpymem(p, r->method->start, r->method->length); 3466 3467 if (r->target.length != 0) { 3468 *p++ = ' '; 3469 p = nxt_cpymem(p, r->target.start, r->target.length); 3470 3471 if (r->version.length != 0) { 3472 *p++ = ' '; 3473 p = nxt_cpymem(p, r->version.start, r->version.length); 3474 } 3475 } 3476 3477 } else { 3478 *p++ = '-'; 3479 } 3480 3481 p = nxt_cpymem(p, "\" ", 2); 3482 3483 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3484 3485 *p++ = ' '; 3486 3487 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3488 3489 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3490 3491 p = nxt_cpymem(p, " \"", 2); 3492 3493 if (r->referer != NULL) { 3494 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3495 3496 } else { 3497 *p++ = '-'; 3498 } 3499 3500 p = nxt_cpymem(p, "\" \"", 3); 3501 3502 if (r->user_agent != NULL) { 3503 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3504 3505 } else { 3506 *p++ = '-'; 3507 } 3508 3509 p = nxt_cpymem(p, "\"\n", 2); 3510 3511 nxt_fd_write(access_log->fd, buf, p - buf); 3512 } 3513 3514 3515 static u_char * 3516 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3517 size_t size, const char *format) 3518 { 3519 u_char sign; 3520 time_t gmtoff; 3521 3522 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3523 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3524 3525 gmtoff = nxt_timezone(tm) / 60; 3526 3527 if (gmtoff < 0) { 3528 gmtoff = -gmtoff; 3529 sign = '-'; 3530 3531 } 
else { 3532 sign = '+'; 3533 } 3534 3535 return nxt_sprintf(buf, buf + size, format, 3536 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3537 tm->tm_hour, tm->tm_min, tm->tm_sec, 3538 sign, gmtoff / 60, gmtoff % 60); 3539 } 3540 3541 3542 static void 3543 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3544 { 3545 uint32_t stream; 3546 nxt_int_t ret; 3547 nxt_buf_t *b; 3548 nxt_port_t *main_port, *router_port; 3549 nxt_runtime_t *rt; 3550 nxt_router_access_log_t *access_log; 3551 3552 access_log = tmcf->router_conf->access_log; 3553 3554 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3555 if (nxt_slow_path(b == NULL)) { 3556 goto fail; 3557 } 3558 3559 b->completion_handler = nxt_router_dummy_buf_completion; 3560 3561 nxt_buf_cpystr(b, &access_log->path); 3562 *b->mem.free++ = '\0'; 3563 3564 rt = task->thread->runtime; 3565 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3566 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3567 3568 stream = nxt_port_rpc_register_handler(task, router_port, 3569 nxt_router_access_log_ready, 3570 nxt_router_access_log_error, 3571 -1, tmcf); 3572 if (nxt_slow_path(stream == 0)) { 3573 goto fail; 3574 } 3575 3576 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3577 stream, router_port->id, b); 3578 3579 if (nxt_slow_path(ret != NXT_OK)) { 3580 nxt_port_rpc_cancel(task, router_port, stream); 3581 goto fail; 3582 } 3583 3584 return; 3585 3586 fail: 3587 3588 nxt_router_conf_error(task, tmcf); 3589 } 3590 3591 3592 static void 3593 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3594 void *data) 3595 { 3596 nxt_router_temp_conf_t *tmcf; 3597 nxt_router_access_log_t *access_log; 3598 3599 tmcf = data; 3600 3601 access_log = tmcf->router_conf->access_log; 3602 3603 access_log->fd = msg->fd[0]; 3604 3605 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3606 nxt_router_conf_apply, task, tmcf, NULL); 3607 } 3608 3609 3610 
static void 3611 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3612 void *data) 3613 { 3614 nxt_router_temp_conf_t *tmcf; 3615 3616 tmcf = data; 3617 3618 nxt_router_conf_error(task, tmcf); 3619 } 3620 3621 3622 static void 3623 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3624 nxt_router_access_log_t *access_log) 3625 { 3626 if (access_log == NULL) { 3627 return; 3628 } 3629 3630 nxt_thread_spin_lock(lock); 3631 3632 if (--access_log->count != 0) { 3633 access_log = NULL; 3634 } 3635 3636 nxt_thread_spin_unlock(lock); 3637 3638 if (access_log != NULL) { 3639 3640 if (access_log->fd != -1) { 3641 nxt_fd_close(access_log->fd); 3642 } 3643 3644 nxt_free(access_log); 3645 } 3646 } 3647 3648 3649 typedef struct { 3650 nxt_mp_t *mem_pool; 3651 nxt_router_access_log_t *access_log; 3652 } nxt_router_access_log_reopen_t; 3653 3654 3655 static void 3656 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3657 { 3658 nxt_mp_t *mp; 3659 uint32_t stream; 3660 nxt_int_t ret; 3661 nxt_buf_t *b; 3662 nxt_port_t *main_port, *router_port; 3663 nxt_runtime_t *rt; 3664 nxt_router_access_log_t *access_log; 3665 nxt_router_access_log_reopen_t *reopen; 3666 3667 access_log = nxt_router->access_log; 3668 3669 if (access_log == NULL) { 3670 return; 3671 } 3672 3673 mp = nxt_mp_create(1024, 128, 256, 32); 3674 if (nxt_slow_path(mp == NULL)) { 3675 return; 3676 } 3677 3678 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3679 if (nxt_slow_path(reopen == NULL)) { 3680 goto fail; 3681 } 3682 3683 reopen->mem_pool = mp; 3684 reopen->access_log = access_log; 3685 3686 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3687 if (nxt_slow_path(b == NULL)) { 3688 goto fail; 3689 } 3690 3691 b->completion_handler = nxt_router_access_log_reopen_completion; 3692 3693 nxt_buf_cpystr(b, &access_log->path); 3694 *b->mem.free++ = '\0'; 3695 3696 rt = task->thread->runtime; 3697 main_port = 
rt->port_by_type[NXT_PROCESS_MAIN]; 3698 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3699 3700 stream = nxt_port_rpc_register_handler(task, router_port, 3701 nxt_router_access_log_reopen_ready, 3702 nxt_router_access_log_reopen_error, 3703 -1, reopen); 3704 if (nxt_slow_path(stream == 0)) { 3705 goto fail; 3706 } 3707 3708 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3709 stream, router_port->id, b); 3710 3711 if (nxt_slow_path(ret != NXT_OK)) { 3712 nxt_port_rpc_cancel(task, router_port, stream); 3713 goto fail; 3714 } 3715 3716 nxt_mp_retain(mp); 3717 3718 return; 3719 3720 fail: 3721 3722 nxt_mp_destroy(mp); 3723 } 3724 3725 3726 static void 3727 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3728 { 3729 nxt_mp_t *mp; 3730 nxt_buf_t *b; 3731 3732 b = obj; 3733 mp = b->data; 3734 3735 nxt_mp_release(mp); 3736 } 3737 3738 3739 static void 3740 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3741 void *data) 3742 { 3743 nxt_router_access_log_t *access_log; 3744 nxt_router_access_log_reopen_t *reopen; 3745 3746 reopen = data; 3747 3748 access_log = reopen->access_log; 3749 3750 if (access_log == nxt_router->access_log) { 3751 3752 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 3753 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3754 msg->fd[0], access_log->fd, nxt_errno); 3755 } 3756 } 3757 3758 nxt_fd_close(msg->fd[0]); 3759 nxt_mp_release(reopen->mem_pool); 3760 } 3761 3762 3763 static void 3764 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3765 void *data) 3766 { 3767 nxt_router_access_log_reopen_t *reopen; 3768 3769 reopen = data; 3770 3771 nxt_mp_release(reopen->mem_pool); 3772 } 3773 3774 3775 static void 3776 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3777 { 3778 nxt_port_t *port; 3779 nxt_thread_link_t *link; 3780 nxt_event_engine_t *engine; 3781 nxt_thread_handle_t handle; 3782 3783 
/*
 * RPC "ready" handler for messages received in reply to a request that was
 * registered by nxt_router_process_http_request().
 *
 * "data" is the nxt_request_rpc_data_t of the request.  The handler:
 *   - returns at once if the request is already detached (request == NULL);
 *   - unlinks the RPC data if the request is in the error state;
 *   - hands _NXT_PORT_MSG_REQ_HEADERS_ACK messages over to
 *     nxt_router_req_headers_ack_handler();
 *   - otherwise treats the message buffer as response data: while the header
 *     has not been sent, the buffer must start with an nxt_unit_response_t
 *     followed by its fields, which are copied into r->resp.fields before
 *     the header is sent; once the header is sent, buffers are appended to
 *     r->out and pushed with nxt_http_request_send_body().
 *
 * Any failure while parsing the first buffer ends with a 503 response and
 * the RPC data being unlinked.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    if (r->error) {
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    /* A zero-sized message carries no payload buffer to forward. */
    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Terminate the chain with the request's "last" buffer. */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /*
         * A complete response has arrived, so the pending "request failed"
         * release action is replaced with "got response".
         */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* More data is expected: re-arm the application timeout timer. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        /* Only a memory buffer can hold the response header structure. */
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;

        /* Number of field descriptors that fit after the fixed header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            /* Fields already marked as skipped are not copied. */
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /*
             * If processing flagged the field as skipped, give back the list
             * element that was just added.
             */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        if (resp->piggyback_content_length != 0) {
            /* Narrow the buffer to the body data sent with the header. */
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            /* No body data in this buffer: mark it as fully consumed. */
            b->mem.pos = b->mem.free;
        }

        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            /*
             * The first buffer is empty now: detach it from the chain and
             * schedule its completion handler instead of sending it.
             */
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            /* The websocket counter is updated under the app mutex. */
            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}
*msg, nxt_request_rpc_data_t *req_rpc_data) 4001 { 4002 int res; 4003 nxt_app_t *app; 4004 nxt_buf_t *b; 4005 nxt_bool_t start_process, unlinked; 4006 nxt_port_t *app_port, *main_app_port, *idle_port; 4007 nxt_queue_link_t *idle_lnk; 4008 nxt_http_request_t *r; 4009 4010 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4011 req_rpc_data->stream, 4012 msg->port_msg.pid, msg->port_msg.reply_port); 4013 4014 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4015 msg->port_msg.pid); 4016 4017 app = req_rpc_data->app; 4018 r = req_rpc_data->request; 4019 4020 start_process = 0; 4021 unlinked = 0; 4022 4023 nxt_thread_mutex_lock(&app->mutex); 4024 4025 if (r->app_link.next != NULL) { 4026 nxt_queue_remove(&r->app_link); 4027 r->app_link.next = NULL; 4028 4029 unlinked = 1; 4030 } 4031 4032 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4033 msg->port_msg.reply_port); 4034 if (nxt_slow_path(app_port == NULL)) { 4035 nxt_thread_mutex_unlock(&app->mutex); 4036 4037 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4038 4039 if (unlinked) { 4040 nxt_mp_release(r->mem_pool); 4041 } 4042 4043 return; 4044 } 4045 4046 main_app_port = app_port->main_app_port; 4047 4048 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4049 app->idle_processes--; 4050 4051 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4052 &app->name, main_app_port->pid, main_app_port->id, 4053 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4054 4055 /* Check port was in 'spare_ports' using idle_start field. */ 4056 if (main_app_port->idle_start == 0 4057 && app->idle_processes >= app->spare_processes) 4058 { 4059 /* 4060 * If there is a vacant space in spare ports, 4061 * move the last idle to spare_ports. 
4062 */ 4063 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4064 4065 idle_lnk = nxt_queue_last(&app->idle_ports); 4066 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4067 nxt_queue_remove(idle_lnk); 4068 4069 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4070 4071 idle_port->idle_start = 0; 4072 4073 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4074 "to spare_ports", 4075 &app->name, idle_port->pid, idle_port->id); 4076 } 4077 4078 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4079 app->pending_processes++; 4080 start_process = 1; 4081 } 4082 } 4083 4084 main_app_port->active_requests++; 4085 4086 nxt_port_inc_use(app_port); 4087 4088 nxt_thread_mutex_unlock(&app->mutex); 4089 4090 if (unlinked) { 4091 nxt_mp_release(r->mem_pool); 4092 } 4093 4094 if (start_process) { 4095 nxt_router_start_app_process(task, app); 4096 } 4097 4098 nxt_port_use(task, req_rpc_data->app_port, -1); 4099 4100 req_rpc_data->app_port = app_port; 4101 4102 b = req_rpc_data->msg_info.buf; 4103 4104 if (b != NULL) { 4105 /* First buffer is already sent. Start from second. 
*/ 4106 b = b->next; 4107 4108 req_rpc_data->msg_info.buf->next = NULL; 4109 } 4110 4111 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4112 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4113 req_rpc_data->msg_info.body_fd); 4114 4115 if (req_rpc_data->msg_info.body_fd != -1) { 4116 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4117 } 4118 4119 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4120 req_rpc_data->msg_info.body_fd, 4121 req_rpc_data->stream, 4122 task->thread->engine->port->id, b); 4123 4124 if (nxt_slow_path(res != NXT_OK)) { 4125 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4126 } 4127 } 4128 4129 if (app->timeout != 0) { 4130 r->timer.handler = nxt_router_app_timeout; 4131 r->timer_data = req_rpc_data; 4132 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4133 } 4134 } 4135 4136 4137 static const nxt_http_request_state_t nxt_http_request_send_state 4138 nxt_aligned(64) = 4139 { 4140 .error_handler = nxt_http_request_error_handler, 4141 }; 4142 4143 4144 static void 4145 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4146 { 4147 nxt_buf_t *out; 4148 nxt_http_request_t *r; 4149 4150 r = obj; 4151 4152 out = r->out; 4153 4154 if (out != NULL) { 4155 r->out = NULL; 4156 nxt_http_request_send(task, r, out); 4157 } 4158 } 4159 4160 4161 static void 4162 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4163 void *data) 4164 { 4165 nxt_request_rpc_data_t *req_rpc_data; 4166 4167 req_rpc_data = data; 4168 4169 req_rpc_data->rpc_cancel = 0; 4170 4171 /* TODO cancel message and return if cancelled. 
*/ 4172 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4173 4174 if (req_rpc_data->request != NULL) { 4175 nxt_http_request_error(task, req_rpc_data->request, 4176 NXT_HTTP_SERVICE_UNAVAILABLE); 4177 } 4178 4179 nxt_request_rpc_data_unlink(task, req_rpc_data); 4180 } 4181 4182 4183 static void 4184 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4185 void *data) 4186 { 4187 nxt_app_t *app; 4188 nxt_port_t *port; 4189 nxt_app_joint_t *app_joint; 4190 4191 app_joint = data; 4192 port = msg->u.new_port; 4193 4194 nxt_assert(app_joint != NULL); 4195 nxt_assert(port != NULL); 4196 nxt_assert(port->type == NXT_PROCESS_APP); 4197 nxt_assert(port->id == 0); 4198 4199 app = app_joint->app; 4200 4201 nxt_router_app_joint_use(task, app_joint, -1); 4202 4203 if (nxt_slow_path(app == NULL)) { 4204 nxt_debug(task, "new port ready for released app, send QUIT"); 4205 4206 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4207 4208 return; 4209 } 4210 4211 port->app = app; 4212 port->main_app_port = port; 4213 4214 nxt_thread_mutex_lock(&app->mutex); 4215 4216 nxt_assert(app->pending_processes != 0); 4217 4218 app->pending_processes--; 4219 app->processes++; 4220 nxt_port_hash_add(&app->port_hash, port); 4221 app->port_hash_count++; 4222 4223 nxt_thread_mutex_unlock(&app->mutex); 4224 4225 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4226 &app->name, port->pid, app->processes, app->pending_processes); 4227 4228 nxt_router_app_shared_port_send(task, port); 4229 4230 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); 4231 } 4232 4233 4234 static nxt_int_t 4235 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) 4236 { 4237 nxt_buf_t *b; 4238 nxt_port_t *port; 4239 nxt_port_msg_new_port_t *msg; 4240 4241 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 4242 sizeof(nxt_port_data_t)); 4243 if (nxt_slow_path(b == NULL)) { 4244 return NXT_ERROR; 4245 } 4246 
/*
 * RPC "error" handler called when starting an application process failed.
 *
 * "data" is the nxt_app_joint_t the start request was issued for; the
 * reference held on it is dropped here.  If the application still exists,
 * its pending process counter is decremented and, while the application has
 * no running processes, requests waiting in app->ack_waiting_req are
 * removed from the queue one by one and their err_work is posted to the
 * engine each request belongs to.
 */
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t           *app;
    nxt_app_joint_t     *app_joint;
    nxt_queue_link_t    *link;
    nxt_http_request_t  *r;

    app_joint = data;

    nxt_assert(app_joint != NULL);

    /* Read the application pointer before the joint reference is dropped. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "start error for released app");

        return;
    }

    nxt_debug(task, "app '%V' %p start error", &app->name, app);

    link = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    /*
     * First pick: only "no running processes" is required here; the pending
     * counter is not consulted.
     */
    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
        link = nxt_queue_first(&app->ack_waiting_req);

        nxt_queue_remove(link);
        /* A NULL "next" marks the request as no longer queued. */
        link->next = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    /* The mutex is not held while the error work is posted. */
    while (link != NULL) {
        r = nxt_container_of(link, nxt_http_request_t, app_link);

        nxt_event_engine_post(r->engine, &r->err_work);

        link = NULL;

        nxt_thread_mutex_lock(&app->mutex);

        /*
         * Further picks additionally require that no other process start is
         * pending.
         */
        if (app->processes == 0 && app->pending_processes == 0
            && !nxt_queue_is_empty(&app->ack_waiting_req))
        {
            link = nxt_queue_first(&app->ack_waiting_req);

            nxt_queue_remove(link);
            link->next = NULL;
        }

        nxt_thread_mutex_unlock(&app->mutex);
    }
}
nxt_assert(port->app != NULL); 4414 4415 app = port->app; 4416 4417 inc_use = 0; 4418 got_response = 0; 4419 dec_requests = 0; 4420 4421 switch (action) { 4422 case NXT_APR_NEW_PORT: 4423 break; 4424 case NXT_APR_REQUEST_FAILED: 4425 dec_requests = 1; 4426 inc_use = -1; 4427 break; 4428 case NXT_APR_GOT_RESPONSE: 4429 got_response = 1; 4430 inc_use = -1; 4431 break; 4432 case NXT_APR_UPGRADE: 4433 got_response = 1; 4434 break; 4435 case NXT_APR_CLOSE: 4436 inc_use = -1; 4437 break; 4438 } 4439 4440 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4441 port->pid, port->id, 4442 (int) inc_use, (int) got_response); 4443 4444 if (port == app->shared_port) { 4445 nxt_thread_mutex_lock(&app->mutex); 4446 4447 app->active_requests -= got_response + dec_requests; 4448 4449 nxt_thread_mutex_unlock(&app->mutex); 4450 4451 goto adjust_use; 4452 } 4453 4454 main_app_port = port->main_app_port; 4455 4456 nxt_thread_mutex_lock(&app->mutex); 4457 4458 main_app_port->app_responses += got_response; 4459 main_app_port->active_requests -= got_response + dec_requests; 4460 app->active_requests -= got_response + dec_requests; 4461 4462 if (main_app_port->pair[1] != -1 4463 && (app->max_requests == 0 4464 || main_app_port->app_responses < app->max_requests)) 4465 { 4466 if (main_app_port->app_link.next == NULL) { 4467 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4468 4469 nxt_port_inc_use(main_app_port); 4470 } 4471 } 4472 4473 send_quit = (app->max_requests > 0 4474 && main_app_port->app_responses >= app->max_requests); 4475 4476 if (send_quit) { 4477 port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); 4478 4479 nxt_port_hash_remove(&app->port_hash, main_app_port); 4480 app->port_hash_count--; 4481 4482 main_app_port->app = NULL; 4483 app->processes--; 4484 4485 } else { 4486 port_unchained = 0; 4487 } 4488 4489 adjust_idle_timer = 0; 4490 4491 if (main_app_port->pair[1] != -1 && !send_quit 4492 && main_app_port->active_requests == 0 4493 
&& main_app_port->active_websockets == 0 4494 && main_app_port->idle_link.next == NULL) 4495 { 4496 if (app->idle_processes == app->spare_processes 4497 && app->adjust_idle_work.data == NULL) 4498 { 4499 adjust_idle_timer = 1; 4500 app->adjust_idle_work.data = app; 4501 app->adjust_idle_work.next = NULL; 4502 } 4503 4504 if (app->idle_processes < app->spare_processes) { 4505 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4506 4507 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4508 &app->name, main_app_port->pid, main_app_port->id); 4509 } else { 4510 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4511 4512 main_app_port->idle_start = task->thread->engine->timers.now; 4513 4514 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4515 &app->name, main_app_port->pid, main_app_port->id); 4516 } 4517 4518 app->idle_processes++; 4519 } 4520 4521 nxt_thread_mutex_unlock(&app->mutex); 4522 4523 if (adjust_idle_timer) { 4524 nxt_router_app_use(task, app, 1); 4525 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4526 } 4527 4528 /* ? 
/*
 * Handles the closing of an application port.
 *
 * The port is removed from app->port_hash in every case.  For a port whose
 * id is not 0 nothing else is done.  For a port with id 0 the port is also
 * removed from the app->ports and idle/spare queues, the spare queue is
 * refilled from idle_ports if needed, the process counter is decremented
 * and, unless the engine is shutting down, a replacement process is started
 * when the application's start conditions allow it.
 *
 * port->app must not be NULL.
 */
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
    nxt_app_t         *app;
    nxt_bool_t        unchain, start_process;
    nxt_port_t        *idle_port;
    nxt_queue_link_t  *idle_lnk;

    app = port->app;

    nxt_assert(app != NULL);

    nxt_thread_mutex_lock(&app->mutex);

    nxt_port_hash_remove(&app->port_hash, port);
    app->port_hash_count--;

    /* Ports with a non-zero id need no process-level bookkeeping. */
    if (port->id != 0) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
                  port->pid, port->id);

        return;
    }

    /* Remember whether the port was linked in app->ports. */
    unchain = nxt_queue_chk_remove(&port->app_link);

    if (nxt_queue_chk_remove(&port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
                  &app->name, port->pid, port->id,
                  (port->idle_start ? "idle_ports" : "spare_ports"));

        /*
         * idle_start == 0 means the port was in spare_ports; refill the
         * vacated spare slot with the last port from idle_ports.
         */
        if (port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }
    }

    app->processes--;

    /* No replacement process is started while the engine shuts down. */
    start_process = !task->thread->engine->shutdown
                    && nxt_router_app_can_start(app)
                    && nxt_router_app_need_start(app);

    if (start_process) {
        app->pending_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

    /* Drop the use count that went with the port's app->ports membership. */
    if (unchain) {
        nxt_port_use(task, port, -1);
    }

    /* The process start is issued after the mutex has been released. */
    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}
4669 while (app->idle_processes > app->spare_processes) { 4670 4671 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4672 4673 lnk = nxt_queue_first(&app->idle_ports); 4674 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4675 4676 timeout = port->idle_start + app->idle_timeout; 4677 4678 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 4679 &app->name, port->pid, 4680 port->idle_start, timeout, threshold); 4681 4682 if (timeout > threshold) { 4683 break; 4684 } 4685 4686 nxt_queue_remove(lnk); 4687 lnk->next = NULL; 4688 4689 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 4690 &app->name, port->pid, port->id); 4691 4692 nxt_queue_chk_remove(&port->app_link); 4693 4694 nxt_port_hash_remove(&app->port_hash, port); 4695 app->port_hash_count--; 4696 4697 app->idle_processes--; 4698 app->processes--; 4699 port->app = NULL; 4700 4701 nxt_thread_mutex_unlock(&app->mutex); 4702 4703 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4704 &app->name, port->pid); 4705 4706 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4707 4708 nxt_port_use(task, port, -1); 4709 4710 nxt_thread_mutex_lock(&app->mutex); 4711 } 4712 4713 nxt_thread_mutex_unlock(&app->mutex); 4714 4715 if (timeout > threshold) { 4716 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4717 4718 } else { 4719 nxt_timer_disable(engine, &app->joint->idle_timer); 4720 } 4721 4722 if (queued) { 4723 nxt_router_app_use(task, app, -1); 4724 } 4725 } 4726 4727 4728 static void 4729 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4730 { 4731 nxt_timer_t *timer; 4732 nxt_app_joint_t *app_joint; 4733 4734 timer = obj; 4735 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4736 4737 if (nxt_fast_path(app_joint->app != NULL)) { 4738 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 4739 } 4740 } 4741 4742 4743 static void 4744 nxt_router_app_joint_release_handler(nxt_task_t 
*task, void *obj, void *data) 4745 { 4746 nxt_timer_t *timer; 4747 nxt_app_joint_t *app_joint; 4748 4749 timer = obj; 4750 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4751 4752 nxt_router_app_joint_use(task, app_joint, -1); 4753 } 4754 4755 4756 static void 4757 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 4758 { 4759 nxt_app_t *app; 4760 nxt_port_t *port; 4761 nxt_app_joint_t *app_joint; 4762 4763 app_joint = obj; 4764 app = app_joint->app; 4765 4766 for ( ;; ) { 4767 port = nxt_router_app_get_port_for_quit(task, app); 4768 if (port == NULL) { 4769 break; 4770 } 4771 4772 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); 4773 4774 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4775 4776 nxt_port_use(task, port, -1); 4777 } 4778 4779 nxt_thread_mutex_lock(&app->mutex); 4780 4781 for ( ;; ) { 4782 port = nxt_port_hash_retrieve(&app->port_hash); 4783 if (port == NULL) { 4784 break; 4785 } 4786 4787 app->port_hash_count--; 4788 4789 port->app = NULL; 4790 4791 nxt_port_close(task, port); 4792 4793 nxt_port_use(task, port, -1); 4794 } 4795 4796 nxt_thread_mutex_unlock(&app->mutex); 4797 4798 nxt_assert(app->processes == 0); 4799 nxt_assert(app->active_requests == 0); 4800 nxt_assert(app->port_hash_count == 0); 4801 nxt_assert(app->idle_processes == 0); 4802 nxt_assert(nxt_queue_is_empty(&app->ports)); 4803 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 4804 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 4805 4806 nxt_port_mmaps_destroy(&app->outgoing, 1); 4807 4808 nxt_thread_mutex_destroy(&app->outgoing.mutex); 4809 4810 if (app->shared_port != NULL) { 4811 app->shared_port->app = NULL; 4812 nxt_port_close(task, app->shared_port); 4813 nxt_port_use(task, app->shared_port, -1); 4814 } 4815 4816 nxt_thread_mutex_destroy(&app->mutex); 4817 nxt_mp_destroy(app->mem_pool); 4818 4819 app_joint->app = NULL; 4820 4821 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 
4822 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 4823 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 4824 4825 } else { 4826 nxt_router_app_joint_use(task, app_joint, -1); 4827 } 4828 } 4829 4830 4831 static void 4832 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 4833 nxt_request_rpc_data_t *req_rpc_data) 4834 { 4835 nxt_bool_t start_process; 4836 nxt_port_t *port; 4837 nxt_http_request_t *r; 4838 4839 start_process = 0; 4840 4841 nxt_thread_mutex_lock(&app->mutex); 4842 4843 port = app->shared_port; 4844 nxt_port_inc_use(port); 4845 4846 app->active_requests++; 4847 4848 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4849 app->pending_processes++; 4850 start_process = 1; 4851 } 4852 4853 r = req_rpc_data->request; 4854 4855 /* 4856 * Put request into application-wide list to be able to cancel request 4857 * if something goes wrong with application processes. 4858 */ 4859 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 4860 4861 nxt_thread_mutex_unlock(&app->mutex); 4862 4863 /* 4864 * Retain request memory pool while request is linked in ack_waiting_req 4865 * to guarantee request structure memory is accessble. 
4866 */ 4867 nxt_mp_retain(r->mem_pool); 4868 4869 req_rpc_data->app_port = port; 4870 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4871 4872 if (start_process) { 4873 nxt_router_start_app_process(task, app); 4874 } 4875 } 4876 4877 4878 void 4879 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 4880 nxt_app_t *app) 4881 { 4882 nxt_event_engine_t *engine; 4883 nxt_request_rpc_data_t *req_rpc_data; 4884 4885 engine = task->thread->engine; 4886 4887 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 4888 nxt_router_response_ready_handler, 4889 nxt_router_response_error_handler, 4890 sizeof(nxt_request_rpc_data_t)); 4891 if (nxt_slow_path(req_rpc_data == NULL)) { 4892 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4893 return; 4894 } 4895 4896 /* 4897 * At this point we have request req_rpc_data allocated and registered 4898 * in port handlers. Need to fixup request memory pool. Counterpart 4899 * release will be called via following call chain: 4900 * nxt_request_rpc_data_unlink() -> 4901 * nxt_router_http_request_release_post() -> 4902 * nxt_router_http_request_release() 4903 */ 4904 nxt_mp_retain(r->mem_pool); 4905 4906 r->timer.task = &engine->task; 4907 r->timer.work_queue = &engine->fast_work_queue; 4908 r->timer.log = engine->task.log; 4909 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 4910 4911 r->engine = engine; 4912 r->err_work.handler = nxt_router_http_request_error; 4913 r->err_work.task = task; 4914 r->err_work.obj = r; 4915 4916 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 4917 req_rpc_data->app = app; 4918 req_rpc_data->msg_info.body_fd = -1; 4919 req_rpc_data->rpc_cancel = 1; 4920 4921 nxt_router_app_use(task, app, 1); 4922 4923 req_rpc_data->request = r; 4924 r->req_rpc_data = req_rpc_data; 4925 4926 if (r->last != NULL) { 4927 r->last->completion_handler = nxt_router_http_request_done; 4928 } 4929 4930 nxt_router_app_port_get(task, app, req_rpc_data); 4931 
nxt_router_app_prepare_request(task, req_rpc_data); 4932 } 4933 4934 4935 static void 4936 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 4937 { 4938 nxt_http_request_t *r; 4939 4940 r = obj; 4941 4942 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 4943 4944 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4945 4946 if (r->req_rpc_data != NULL) { 4947 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4948 } 4949 4950 nxt_mp_release(r->mem_pool); 4951 } 4952 4953 4954 static void 4955 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 4956 { 4957 nxt_http_request_t *r; 4958 4959 r = data; 4960 4961 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 4962 4963 if (r->req_rpc_data != NULL) { 4964 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4965 } 4966 4967 nxt_http_request_close_handler(task, r, r->proto.any); 4968 } 4969 4970 4971 static void 4972 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 4973 { 4974 } 4975 4976 4977 static void 4978 nxt_router_app_prepare_request(nxt_task_t *task, 4979 nxt_request_rpc_data_t *req_rpc_data) 4980 { 4981 nxt_app_t *app; 4982 nxt_buf_t *buf, *body; 4983 nxt_int_t res; 4984 nxt_port_t *port, *reply_port; 4985 4986 int notify; 4987 struct { 4988 nxt_port_msg_t pm; 4989 nxt_port_mmap_msg_t mm; 4990 } msg; 4991 4992 4993 app = req_rpc_data->app; 4994 4995 nxt_assert(app != NULL); 4996 4997 port = req_rpc_data->app_port; 4998 4999 nxt_assert(port != NULL); 5000 nxt_assert(port->queue != NULL); 5001 5002 reply_port = task->thread->engine->port; 5003 5004 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5005 nxt_app_msg_prefix[app->type]); 5006 if (nxt_slow_path(buf == NULL)) { 5007 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5008 req_rpc_data->stream, &app->name); 5009 5010 nxt_http_request_error(task, req_rpc_data->request, 5011 
NXT_HTTP_INTERNAL_SERVER_ERROR); 5012 5013 return; 5014 } 5015 5016 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5017 nxt_buf_used_size(buf), 5018 port->socket.fd); 5019 5020 req_rpc_data->msg_info.buf = buf; 5021 5022 body = req_rpc_data->request->body; 5023 5024 if (body != NULL && nxt_buf_is_file(body)) { 5025 req_rpc_data->msg_info.body_fd = body->file->fd; 5026 5027 body->file->fd = -1; 5028 5029 } else { 5030 req_rpc_data->msg_info.body_fd = -1; 5031 } 5032 5033 msg.pm.stream = req_rpc_data->stream; 5034 msg.pm.pid = reply_port->pid; 5035 msg.pm.reply_port = reply_port->id; 5036 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5037 msg.pm.last = 0; 5038 msg.pm.mmap = 1; 5039 msg.pm.nf = 0; 5040 msg.pm.mf = 0; 5041 msg.pm.tracking = 0; 5042 5043 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5044 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5045 5046 msg.mm.mmap_id = hdr->id; 5047 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5048 msg.mm.size = nxt_buf_used_size(buf); 5049 5050 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5051 req_rpc_data->stream, ¬ify, 5052 &req_rpc_data->msg_info.tracking_cookie); 5053 if (nxt_fast_path(res == NXT_OK)) { 5054 if (notify != 0) { 5055 (void) nxt_port_socket_write(task, port, 5056 NXT_PORT_MSG_READ_QUEUE, 5057 -1, req_rpc_data->stream, 5058 reply_port->id, NULL); 5059 5060 } else { 5061 nxt_debug(task, "queue is not empty"); 5062 } 5063 5064 buf->is_port_mmap_sent = 1; 5065 buf->mem.pos = buf->mem.free; 5066 5067 } else { 5068 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5069 req_rpc_data->stream, &app->name); 5070 5071 nxt_http_request_error(task, req_rpc_data->request, 5072 NXT_HTTP_INTERNAL_SERVER_ERROR); 5073 } 5074 } 5075 5076 5077 struct nxt_fields_iter_s { 5078 nxt_list_part_t *part; 5079 nxt_http_field_t *field; 5080 }; 5081 5082 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5083 5084 5085 static nxt_http_field_t * 5086 
nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5087 { 5088 if (part == NULL) { 5089 return NULL; 5090 } 5091 5092 while (part->nelts == 0) { 5093 part = part->next; 5094 if (part == NULL) { 5095 return NULL; 5096 } 5097 } 5098 5099 i->part = part; 5100 i->field = nxt_list_data(i->part); 5101 5102 return i->field; 5103 } 5104 5105 5106 static nxt_http_field_t * 5107 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5108 { 5109 return nxt_fields_part_first(nxt_list_part(fields), i); 5110 } 5111 5112 5113 static nxt_http_field_t * 5114 nxt_fields_next(nxt_fields_iter_t *i) 5115 { 5116 nxt_http_field_t *end = nxt_list_data(i->part); 5117 5118 end += i->part->nelts; 5119 i->field++; 5120 5121 if (i->field < end) { 5122 return i->field; 5123 } 5124 5125 return nxt_fields_part_first(i->part->next, i); 5126 } 5127 5128 5129 static nxt_buf_t * 5130 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5131 nxt_app_t *app, const nxt_str_t *prefix) 5132 { 5133 void *target_pos, *query_pos; 5134 u_char *pos, *end, *p, c; 5135 size_t fields_count, req_size, size, free_size; 5136 size_t copy_size; 5137 nxt_off_t content_length; 5138 nxt_buf_t *b, *buf, *out, **tail; 5139 nxt_http_field_t *field, *dup; 5140 nxt_unit_field_t *dst_field; 5141 nxt_fields_iter_t iter, dup_iter; 5142 nxt_unit_request_t *req; 5143 5144 req_size = sizeof(nxt_unit_request_t) 5145 + r->method->length + 1 5146 + r->version.length + 1 5147 + r->remote->length + 1 5148 + r->local->length + 1 5149 + r->server_name.length + 1 5150 + r->target.length + 1 5151 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5152 5153 content_length = r->content_length_n < 0 ? 
0 : r->content_length_n; 5154 fields_count = 0; 5155 5156 nxt_list_each(field, r->fields) { 5157 fields_count++; 5158 5159 req_size += field->name_length + prefix->length + 1 5160 + field->value_length + 1; 5161 } nxt_list_loop; 5162 5163 req_size += fields_count * sizeof(nxt_unit_field_t); 5164 5165 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5166 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5167 (int) req_size); 5168 5169 return NULL; 5170 } 5171 5172 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5173 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5174 if (nxt_slow_path(out == NULL)) { 5175 return NULL; 5176 } 5177 5178 req = (nxt_unit_request_t *) out->mem.free; 5179 out->mem.free += req_size; 5180 5181 req->app_target = r->app_target; 5182 5183 req->content_length = content_length; 5184 5185 p = (u_char *) (req->fields + fields_count); 5186 5187 nxt_debug(task, "fields_count=%d", (int) fields_count); 5188 5189 req->method_length = r->method->length; 5190 nxt_unit_sptr_set(&req->method, p); 5191 p = nxt_cpymem(p, r->method->start, r->method->length); 5192 *p++ = '\0'; 5193 5194 req->version_length = r->version.length; 5195 nxt_unit_sptr_set(&req->version, p); 5196 p = nxt_cpymem(p, r->version.start, r->version.length); 5197 *p++ = '\0'; 5198 5199 req->remote_length = r->remote->address_length; 5200 nxt_unit_sptr_set(&req->remote, p); 5201 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5202 r->remote->address_length); 5203 *p++ = '\0'; 5204 5205 req->local_length = r->local->address_length; 5206 nxt_unit_sptr_set(&req->local, p); 5207 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5208 *p++ = '\0'; 5209 5210 req->tls = (r->tls != NULL); 5211 req->websocket_handshake = r->websocket_handshake; 5212 5213 req->server_name_length = r->server_name.length; 5214 nxt_unit_sptr_set(&req->server_name, p); 5215 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5216 *p++ = '\0'; 5217 
5218 target_pos = p; 5219 req->target_length = (uint32_t) r->target.length; 5220 nxt_unit_sptr_set(&req->target, p); 5221 p = nxt_cpymem(p, r->target.start, r->target.length); 5222 *p++ = '\0'; 5223 5224 req->path_length = (uint32_t) r->path->length; 5225 if (r->path->start == r->target.start) { 5226 nxt_unit_sptr_set(&req->path, target_pos); 5227 5228 } else { 5229 nxt_unit_sptr_set(&req->path, p); 5230 p = nxt_cpymem(p, r->path->start, r->path->length); 5231 *p++ = '\0'; 5232 } 5233 5234 req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; 5235 if (r->args != NULL && r->args->start != NULL) { 5236 query_pos = nxt_pointer_to(target_pos, 5237 r->args->start - r->target.start); 5238 5239 nxt_unit_sptr_set(&req->query, query_pos); 5240 5241 } else { 5242 req->query.offset = 0; 5243 } 5244 5245 req->content_length_field = NXT_UNIT_NONE_FIELD; 5246 req->content_type_field = NXT_UNIT_NONE_FIELD; 5247 req->cookie_field = NXT_UNIT_NONE_FIELD; 5248 req->authorization_field = NXT_UNIT_NONE_FIELD; 5249 5250 dst_field = req->fields; 5251 5252 for (field = nxt_fields_first(r->fields, &iter); 5253 field != NULL; 5254 field = nxt_fields_next(&iter)) 5255 { 5256 if (field->skip) { 5257 continue; 5258 } 5259 5260 dst_field->hash = field->hash; 5261 dst_field->skip = 0; 5262 dst_field->name_length = field->name_length + prefix->length; 5263 dst_field->value_length = field->value_length; 5264 5265 if (field == r->content_length) { 5266 req->content_length_field = dst_field - req->fields; 5267 5268 } else if (field == r->content_type) { 5269 req->content_type_field = dst_field - req->fields; 5270 5271 } else if (field == r->cookie) { 5272 req->cookie_field = dst_field - req->fields; 5273 5274 } else if (field == r->authorization) { 5275 req->authorization_field = dst_field - req->fields; 5276 } 5277 5278 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5279 (int) field->hash, (int) field->skip, 5280 (int) field->name_length, field->name, 5281 (int) 
field->value_length, field->value); 5282 5283 if (prefix->length != 0) { 5284 nxt_unit_sptr_set(&dst_field->name, p); 5285 p = nxt_cpymem(p, prefix->start, prefix->length); 5286 5287 end = field->name + field->name_length; 5288 for (pos = field->name; pos < end; pos++) { 5289 c = *pos; 5290 5291 if (c >= 'a' && c <= 'z') { 5292 *p++ = (c & ~0x20); 5293 continue; 5294 } 5295 5296 if (c == '-') { 5297 *p++ = '_'; 5298 continue; 5299 } 5300 5301 *p++ = c; 5302 } 5303 5304 } else { 5305 nxt_unit_sptr_set(&dst_field->name, p); 5306 p = nxt_cpymem(p, field->name, field->name_length); 5307 } 5308 5309 *p++ = '\0'; 5310 5311 nxt_unit_sptr_set(&dst_field->value, p); 5312 p = nxt_cpymem(p, field->value, field->value_length); 5313 5314 if (prefix->length != 0) { 5315 dup_iter = iter; 5316 5317 for (dup = nxt_fields_next(&dup_iter); 5318 dup != NULL; 5319 dup = nxt_fields_next(&dup_iter)) 5320 { 5321 if (dup->name_length != field->name_length 5322 || dup->skip 5323 || dup->hash != field->hash 5324 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5325 { 5326 continue; 5327 } 5328 5329 p = nxt_cpymem(p, ", ", 2); 5330 p = nxt_cpymem(p, dup->value, dup->value_length); 5331 5332 dst_field->value_length += 2 + dup->value_length; 5333 5334 dup->skip = 1; 5335 } 5336 } 5337 5338 *p++ = '\0'; 5339 5340 dst_field++; 5341 } 5342 5343 req->fields_count = (uint32_t) (dst_field - req->fields); 5344 5345 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5346 5347 buf = out; 5348 tail = &buf->next; 5349 5350 for (b = r->body; b != NULL; b = b->next) { 5351 size = nxt_buf_mem_used_size(&b->mem); 5352 pos = b->mem.pos; 5353 5354 while (size > 0) { 5355 if (buf == NULL) { 5356 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5357 5358 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5359 if (nxt_slow_path(buf == NULL)) { 5360 while (out != NULL) { 5361 buf = out->next; 5362 out->next = NULL; 5363 out->completion_handler(task, out, out->parent); 5364 out = buf; 
5365 } 5366 return NULL; 5367 } 5368 5369 *tail = buf; 5370 tail = &buf->next; 5371 5372 } else { 5373 free_size = nxt_buf_mem_free_size(&buf->mem); 5374 if (free_size < size 5375 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5376 == NXT_OK) 5377 { 5378 free_size = nxt_buf_mem_free_size(&buf->mem); 5379 } 5380 } 5381 5382 if (free_size > 0) { 5383 copy_size = nxt_min(free_size, size); 5384 5385 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5386 5387 size -= copy_size; 5388 pos += copy_size; 5389 5390 if (size == 0) { 5391 break; 5392 } 5393 } 5394 5395 buf = NULL; 5396 } 5397 } 5398 5399 return out; 5400 } 5401 5402 5403 static void 5404 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5405 { 5406 nxt_timer_t *timer; 5407 nxt_http_request_t *r; 5408 nxt_request_rpc_data_t *req_rpc_data; 5409 5410 timer = obj; 5411 5412 nxt_debug(task, "router app timeout"); 5413 5414 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5415 req_rpc_data = r->timer_data; 5416 5417 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5418 5419 nxt_request_rpc_data_unlink(task, req_rpc_data); 5420 } 5421 5422 5423 static void 5424 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5425 { 5426 r->timer.handler = nxt_router_http_request_release; 5427 nxt_timer_add(task->thread->engine, &r->timer, 0); 5428 } 5429 5430 5431 static void 5432 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5433 { 5434 nxt_http_request_t *r; 5435 5436 nxt_debug(task, "http request pool release"); 5437 5438 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5439 5440 nxt_mp_release(r->mem_pool); 5441 } 5442 5443 5444 static void 5445 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5446 { 5447 size_t mi; 5448 uint32_t i; 5449 nxt_bool_t ack; 5450 nxt_process_t *process; 5451 nxt_free_map_t *m; 5452 nxt_port_mmap_handler_t *mmap_handler; 5453 5454 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 
5455 5456 process = nxt_runtime_process_find(task->thread->runtime, 5457 msg->port_msg.pid); 5458 if (nxt_slow_path(process == NULL)) { 5459 return; 5460 } 5461 5462 ack = 0; 5463 5464 /* 5465 * To mitigate possible racing condition (when OOSM message received 5466 * after some of the memory was already freed), need to try to find 5467 * first free segment in shared memory and send ACK if found. 5468 */ 5469 5470 nxt_thread_mutex_lock(&process->incoming.mutex); 5471 5472 for (i = 0; i < process->incoming.size; i++) { 5473 mmap_handler = process->incoming.elts[i].mmap_handler; 5474 5475 if (nxt_slow_path(mmap_handler == NULL)) { 5476 continue; 5477 } 5478 5479 m = mmap_handler->hdr->free_map; 5480 5481 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5482 if (m[mi] != 0) { 5483 ack = 1; 5484 5485 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5486 i, mi, m[mi]); 5487 5488 break; 5489 } 5490 } 5491 } 5492 5493 nxt_thread_mutex_unlock(&process->incoming.mutex); 5494 5495 if (ack) { 5496 nxt_process_broadcast_shm_ack(task, process); 5497 } 5498 } 5499 5500 5501 static void 5502 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5503 { 5504 nxt_fd_t fd; 5505 nxt_port_t *port; 5506 nxt_runtime_t *rt; 5507 nxt_port_mmaps_t *mmaps; 5508 nxt_port_msg_get_mmap_t *get_mmap_msg; 5509 nxt_port_mmap_handler_t *mmap_handler; 5510 5511 rt = task->thread->runtime; 5512 5513 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5514 msg->port_msg.reply_port); 5515 if (nxt_slow_path(port == NULL)) { 5516 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5517 msg->port_msg.pid, msg->port_msg.reply_port); 5518 5519 return; 5520 } 5521 5522 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5523 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5524 { 5525 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5526 (int) nxt_buf_used_size(msg->buf)); 5527 5528 return; 5529 } 5530 5531 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5532 
5533 nxt_assert(port->type == NXT_PROCESS_APP); 5534 5535 if (nxt_slow_path(port->app == NULL)) { 5536 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5537 port->pid, port->id); 5538 5539 // FIXME 5540 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5541 -1, msg->port_msg.stream, 0, NULL); 5542 5543 return; 5544 } 5545 5546 mmaps = &port->app->outgoing; 5547 nxt_thread_mutex_lock(&mmaps->mutex); 5548 5549 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5550 nxt_thread_mutex_unlock(&mmaps->mutex); 5551 5552 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5553 (int) get_mmap_msg->id); 5554 5555 // FIXME 5556 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5557 -1, msg->port_msg.stream, 0, NULL); 5558 return; 5559 } 5560 5561 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5562 5563 fd = mmap_handler->fd; 5564 5565 nxt_thread_mutex_unlock(&mmaps->mutex); 5566 5567 nxt_debug(task, "get mmap %PI:%d found", 5568 msg->port_msg.pid, (int) get_mmap_msg->id); 5569 5570 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5571 } 5572 5573 5574 static void 5575 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5576 { 5577 nxt_port_t *port, *reply_port; 5578 nxt_runtime_t *rt; 5579 nxt_port_msg_get_port_t *get_port_msg; 5580 5581 rt = task->thread->runtime; 5582 5583 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5584 msg->port_msg.reply_port); 5585 if (nxt_slow_path(reply_port == NULL)) { 5586 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5587 msg->port_msg.pid, msg->port_msg.reply_port); 5588 5589 return; 5590 } 5591 5592 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5593 < (int) sizeof(nxt_port_msg_get_port_t))) 5594 { 5595 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5596 (int) nxt_buf_used_size(msg->buf)); 5597 5598 return; 5599 } 5600 5601 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 
5602 5603 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5604 if (nxt_slow_path(port == NULL)) { 5605 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5606 get_port_msg->pid, get_port_msg->id); 5607 5608 return; 5609 } 5610 5611 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5612 get_port_msg->id); 5613 5614 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5615 } 5616