1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 typedef struct { 22 nxt_str_t type; 23 uint32_t processes; 24 uint32_t max_processes; 25 uint32_t spare_processes; 26 nxt_msec_t timeout; 27 nxt_msec_t idle_timeout; 28 uint32_t requests; 29 nxt_conf_value_t *limits_value; 30 nxt_conf_value_t *processes_value; 31 nxt_conf_value_t *targets_value; 32 } nxt_router_app_conf_t; 33 34 35 typedef struct { 36 nxt_str_t pass; 37 nxt_str_t application; 38 } nxt_router_listener_conf_t; 39 40 41 #if (NXT_TLS) 42 43 typedef struct { 44 nxt_str_t name; 45 nxt_socket_conf_t *conf; 46 47 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 48 } nxt_router_tlssock_t; 49 50 #endif 51 52 53 typedef struct { 54 nxt_str_t *name; 55 nxt_socket_conf_t *socket_conf; 56 nxt_router_temp_conf_t *temp_conf; 57 nxt_bool_t last; 58 } nxt_socket_rpc_t; 59 60 61 typedef struct { 62 nxt_app_t *app; 63 nxt_router_temp_conf_t *temp_conf; 64 } nxt_app_rpc_t; 65 66 67 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 68 nxt_mp_t *mp); 69 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 70 static void nxt_router_greet_controller(nxt_task_t *task, 71 nxt_port_t *controller_port); 72 73 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 74 75 static void nxt_router_new_port_handler(nxt_task_t *task, 76 nxt_port_recv_msg_t *msg); 77 static void nxt_router_conf_data_handler(nxt_task_t *task, 78 nxt_port_recv_msg_t *msg); 79 static void nxt_router_remove_pid_handler(nxt_task_t *task, 80 nxt_port_recv_msg_t *msg); 81 static void 
nxt_router_access_log_reopen_handler(nxt_task_t *task, 82 nxt_port_recv_msg_t *msg); 83 84 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 85 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 86 static void nxt_router_conf_ready(nxt_task_t *task, 87 nxt_router_temp_conf_t *tmcf); 88 static void nxt_router_conf_error(nxt_task_t *task, 89 nxt_router_temp_conf_t *tmcf); 90 static void nxt_router_conf_send(nxt_task_t *task, 91 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 92 93 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 94 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 95 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 96 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 97 98 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 99 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 100 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 101 nxt_app_t *app); 102 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 103 nxt_str_t *name); 104 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 105 int i); 106 107 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 108 nxt_port_t *port); 109 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 110 nxt_port_t *port); 111 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 112 nxt_port_t *port, nxt_fd_t fd); 113 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 114 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 115 static void nxt_router_listen_socket_ready(nxt_task_t *task, 116 nxt_port_recv_msg_t *msg, void *data); 117 static void nxt_router_listen_socket_error(nxt_task_t *task, 118 nxt_port_recv_msg_t *msg, void *data); 119 #if (NXT_TLS) 120 static void nxt_router_tls_rpc_create(nxt_task_t *task, 121 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls, 
nxt_bool_t last); 122 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 123 nxt_port_recv_msg_t *msg, void *data); 124 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 125 nxt_conf_value_t *value, nxt_socket_conf_t *skcf); 126 #endif 127 static void nxt_router_app_rpc_create(nxt_task_t *task, 128 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 129 static void nxt_router_app_prefork_ready(nxt_task_t *task, 130 nxt_port_recv_msg_t *msg, void *data); 131 static void nxt_router_app_prefork_error(nxt_task_t *task, 132 nxt_port_recv_msg_t *msg, void *data); 133 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 134 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 135 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 136 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 137 138 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 139 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 140 const nxt_event_interface_t *interface); 141 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 142 nxt_router_engine_conf_t *recf); 143 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 144 nxt_router_engine_conf_t *recf); 145 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 146 nxt_router_engine_conf_t *recf); 147 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 148 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 149 nxt_work_handler_t handler); 150 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 151 nxt_router_engine_conf_t *recf); 152 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 153 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 154 155 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 156 nxt_router_temp_conf_t *tmcf); 157 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t 
*rt, 158 nxt_event_engine_t *engine); 159 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 160 nxt_router_temp_conf_t *tmcf); 161 162 static void nxt_router_engines_post(nxt_router_t *router, 163 nxt_router_temp_conf_t *tmcf); 164 static void nxt_router_engine_post(nxt_event_engine_t *engine, 165 nxt_work_t *jobs); 166 167 static void nxt_router_thread_start(void *data); 168 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 169 void *data); 170 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 171 void *data); 172 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 173 void *data); 174 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 175 void *data); 176 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 177 void *data); 178 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 179 void *data); 180 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 181 void *data); 182 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 183 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 184 static void nxt_router_listen_socket_release(nxt_task_t *task, 185 nxt_socket_conf_t *skcf); 186 187 static void nxt_router_access_log_writer(nxt_task_t *task, 188 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 189 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 190 struct tm *tm, size_t size, const char *format); 191 static void nxt_router_access_log_open(nxt_task_t *task, 192 nxt_router_temp_conf_t *tmcf); 193 static void nxt_router_access_log_ready(nxt_task_t *task, 194 nxt_port_recv_msg_t *msg, void *data); 195 static void nxt_router_access_log_error(nxt_task_t *task, 196 nxt_port_recv_msg_t *msg, void *data); 197 static void nxt_router_access_log_release(nxt_task_t *task, 198 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 199 static void 
nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 200 void *data); 201 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 202 nxt_port_recv_msg_t *msg, void *data); 203 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 204 nxt_port_recv_msg_t *msg, void *data); 205 206 static void nxt_router_app_port_ready(nxt_task_t *task, 207 nxt_port_recv_msg_t *msg, void *data); 208 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 209 nxt_port_t *app_port); 210 static void nxt_router_app_port_error(nxt_task_t *task, 211 nxt_port_recv_msg_t *msg, void *data); 212 213 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 214 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 215 216 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 217 nxt_apr_action_t action); 218 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 219 nxt_request_rpc_data_t *req_rpc_data); 220 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 221 void *data); 222 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 223 void *data); 224 225 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, 226 void *data); 227 static void nxt_router_app_prepare_request(nxt_task_t *task, 228 nxt_request_rpc_data_t *req_rpc_data); 229 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 230 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 231 232 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 233 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 234 void *data); 235 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 236 void *data); 237 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 238 void *data); 239 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 240 241 static const 
nxt_http_request_state_t nxt_http_request_send_state; 242 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 243 244 static void nxt_router_app_joint_use(nxt_task_t *task, 245 nxt_app_joint_t *app_joint, int i); 246 247 static void nxt_router_http_request_release_post(nxt_task_t *task, 248 nxt_http_request_t *r); 249 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 250 void *data); 251 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 252 static void nxt_router_get_port_handler(nxt_task_t *task, 253 nxt_port_recv_msg_t *msg); 254 static void nxt_router_get_mmap_handler(nxt_task_t *task, 255 nxt_port_recv_msg_t *msg); 256 257 extern const nxt_http_request_state_t nxt_http_websocket; 258 259 static nxt_router_t *nxt_router; 260 261 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 262 static const nxt_str_t empty_prefix = nxt_string(""); 263 264 static const nxt_str_t *nxt_app_msg_prefix[] = { 265 &empty_prefix, 266 &empty_prefix, 267 &http_prefix, 268 &http_prefix, 269 &http_prefix, 270 &empty_prefix, 271 }; 272 273 274 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 275 .quit = nxt_signal_quit_handler, 276 .new_port = nxt_router_new_port_handler, 277 .get_port = nxt_router_get_port_handler, 278 .change_file = nxt_port_change_log_file_handler, 279 .mmap = nxt_port_mmap_handler, 280 .get_mmap = nxt_router_get_mmap_handler, 281 .data = nxt_router_conf_data_handler, 282 .remove_pid = nxt_router_remove_pid_handler, 283 .access_log = nxt_router_access_log_reopen_handler, 284 .rpc_ready = nxt_port_rpc_handler, 285 .rpc_error = nxt_port_rpc_handler, 286 .oosm = nxt_router_oosm_handler, 287 }; 288 289 290 const nxt_process_init_t nxt_router_process = { 291 .name = "router", 292 .type = NXT_PROCESS_ROUTER, 293 .prefork = nxt_router_prefork, 294 .restart = 1, 295 .setup = nxt_process_core_setup, 296 .start = nxt_router_start, 297 .port_handlers = 
&nxt_router_process_port_handlers, 298 .signals = nxt_process_signals, 299 }; 300 301 302 /* Queues of nxt_socket_conf_t */ 303 nxt_queue_t creating_sockets; 304 nxt_queue_t pending_sockets; 305 nxt_queue_t updating_sockets; 306 nxt_queue_t keeping_sockets; 307 nxt_queue_t deleting_sockets; 308 309 310 static nxt_int_t 311 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 312 { 313 nxt_runtime_stop_app_processes(task, task->thread->runtime); 314 315 return NXT_OK; 316 } 317 318 319 static nxt_int_t 320 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 321 { 322 nxt_int_t ret; 323 nxt_port_t *controller_port; 324 nxt_router_t *router; 325 nxt_runtime_t *rt; 326 327 rt = task->thread->runtime; 328 329 nxt_log(task, NXT_LOG_INFO, "router started"); 330 331 #if (NXT_TLS) 332 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 333 if (nxt_slow_path(rt->tls == NULL)) { 334 return NXT_ERROR; 335 } 336 337 ret = rt->tls->library_init(task); 338 if (nxt_slow_path(ret != NXT_OK)) { 339 return ret; 340 } 341 #endif 342 343 ret = nxt_http_init(task); 344 if (nxt_slow_path(ret != NXT_OK)) { 345 return ret; 346 } 347 348 router = nxt_zalloc(sizeof(nxt_router_t)); 349 if (nxt_slow_path(router == NULL)) { 350 return NXT_ERROR; 351 } 352 353 nxt_queue_init(&router->engines); 354 nxt_queue_init(&router->sockets); 355 nxt_queue_init(&router->apps); 356 357 nxt_router = router; 358 359 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 360 if (controller_port != NULL) { 361 nxt_router_greet_controller(task, controller_port); 362 } 363 364 return NXT_OK; 365 } 366 367 368 static void 369 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 370 { 371 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 372 -1, 0, 0, NULL); 373 } 374 375 376 static void 377 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 378 void *data) 379 { 380 size_t size; 381 uint32_t stream; 382 
nxt_mp_t *mp; 383 nxt_int_t ret; 384 nxt_app_t *app; 385 nxt_buf_t *b; 386 nxt_port_t *main_port; 387 nxt_runtime_t *rt; 388 389 app = data; 390 391 rt = task->thread->runtime; 392 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 393 394 nxt_debug(task, "app '%V' %p start process", &app->name, app); 395 396 size = app->name.length + 1 + app->conf.length; 397 398 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 399 400 if (nxt_slow_path(b == NULL)) { 401 goto failed; 402 } 403 404 nxt_buf_cpystr(b, &app->name); 405 *b->mem.free++ = '\0'; 406 nxt_buf_cpystr(b, &app->conf); 407 408 nxt_router_app_joint_use(task, app->joint, 1); 409 410 stream = nxt_port_rpc_register_handler(task, port, 411 nxt_router_app_port_ready, 412 nxt_router_app_port_error, 413 -1, app->joint); 414 415 if (nxt_slow_path(stream == 0)) { 416 nxt_router_app_joint_use(task, app->joint, -1); 417 418 goto failed; 419 } 420 421 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 422 -1, stream, port->id, b); 423 424 if (nxt_slow_path(ret != NXT_OK)) { 425 nxt_port_rpc_cancel(task, port, stream); 426 427 nxt_router_app_joint_use(task, app->joint, -1); 428 429 goto failed; 430 } 431 432 nxt_router_app_use(task, app, -1); 433 434 return; 435 436 failed: 437 438 if (b != NULL) { 439 mp = b->data; 440 nxt_mp_free(mp, b); 441 nxt_mp_release(mp); 442 } 443 444 nxt_thread_mutex_lock(&app->mutex); 445 446 app->pending_processes--; 447 448 nxt_thread_mutex_unlock(&app->mutex); 449 450 nxt_router_app_use(task, app, -1); 451 } 452 453 454 static void 455 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 456 { 457 app_joint->use_count += i; 458 459 if (app_joint->use_count == 0) { 460 nxt_assert(app_joint->app == NULL); 461 462 nxt_free(app_joint); 463 } 464 } 465 466 467 static nxt_int_t 468 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 469 { 470 nxt_int_t res; 471 nxt_port_t *router_port; 472 nxt_runtime_t *rt; 473 474 
nxt_debug(task, "app '%V' start process", &app->name); 475 476 rt = task->thread->runtime; 477 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 478 479 nxt_router_app_use(task, app, 1); 480 481 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 482 app); 483 484 if (res == NXT_OK) { 485 return res; 486 } 487 488 nxt_thread_mutex_lock(&app->mutex); 489 490 app->pending_processes--; 491 492 nxt_thread_mutex_unlock(&app->mutex); 493 494 nxt_router_app_use(task, app, -1); 495 496 return NXT_ERROR; 497 } 498 499 500 nxt_inline nxt_bool_t 501 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 502 { 503 nxt_buf_t *b, *next; 504 nxt_bool_t cancelled; 505 nxt_msg_info_t *msg_info; 506 507 msg_info = &req_rpc_data->msg_info; 508 509 if (msg_info->buf == NULL) { 510 return 0; 511 } 512 513 cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, 514 msg_info->tracking_cookie, 515 req_rpc_data->stream); 516 517 if (cancelled) { 518 nxt_debug(task, "stream #%uD: cancelled by router", 519 req_rpc_data->stream); 520 } 521 522 for (b = msg_info->buf; b != NULL; b = next) { 523 next = b->next; 524 b->next = NULL; 525 526 if (b->is_port_mmap_sent) { 527 b->is_port_mmap_sent = cancelled == 0; 528 } 529 530 b->completion_handler(task, b, b->parent); 531 } 532 533 msg_info->buf = NULL; 534 535 return cancelled; 536 } 537 538 539 nxt_inline nxt_bool_t 540 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 541 { 542 if (lnk->next != NULL) { 543 nxt_queue_remove(lnk); 544 545 lnk->next = NULL; 546 547 return 1; 548 } 549 550 return 0; 551 } 552 553 554 nxt_inline void 555 nxt_request_rpc_data_unlink(nxt_task_t *task, 556 nxt_request_rpc_data_t *req_rpc_data) 557 { 558 nxt_app_t *app; 559 nxt_bool_t unlinked; 560 nxt_http_request_t *r; 561 562 nxt_router_msg_cancel(task, req_rpc_data); 563 564 if (req_rpc_data->app_port != NULL) { 565 nxt_router_app_port_release(task, req_rpc_data->app_port, 566 req_rpc_data->apr_action); 567 
568 req_rpc_data->app_port = NULL; 569 } 570 571 app = req_rpc_data->app; 572 r = req_rpc_data->request; 573 574 if (r != NULL) { 575 r->timer_data = NULL; 576 577 nxt_router_http_request_release_post(task, r); 578 579 r->req_rpc_data = NULL; 580 req_rpc_data->request = NULL; 581 582 if (app != NULL) { 583 unlinked = 0; 584 585 nxt_thread_mutex_lock(&app->mutex); 586 587 if (r->app_link.next != NULL) { 588 nxt_queue_remove(&r->app_link); 589 r->app_link.next = NULL; 590 591 unlinked = 1; 592 } 593 594 nxt_thread_mutex_unlock(&app->mutex); 595 596 if (unlinked) { 597 nxt_mp_release(r->mem_pool); 598 } 599 } 600 } 601 602 if (app != NULL) { 603 nxt_router_app_use(task, app, -1); 604 605 req_rpc_data->app = NULL; 606 } 607 608 if (req_rpc_data->msg_info.body_fd != -1) { 609 nxt_fd_close(req_rpc_data->msg_info.body_fd); 610 611 req_rpc_data->msg_info.body_fd = -1; 612 } 613 614 if (req_rpc_data->rpc_cancel) { 615 req_rpc_data->rpc_cancel = 0; 616 617 nxt_port_rpc_cancel(task, task->thread->engine->port, 618 req_rpc_data->stream); 619 } 620 } 621 622 623 static void 624 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 625 { 626 nxt_int_t res; 627 nxt_app_t *app; 628 nxt_port_t *port, *main_app_port; 629 nxt_runtime_t *rt; 630 631 nxt_port_new_port_handler(task, msg); 632 633 port = msg->u.new_port; 634 635 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 636 nxt_router_greet_controller(task, msg->u.new_port); 637 } 638 639 if (port == NULL || port->type != NXT_PROCESS_APP) { 640 641 if (msg->port_msg.stream == 0) { 642 return; 643 } 644 645 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 646 647 } else { 648 if (msg->fd[1] != -1) { 649 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 650 if (nxt_slow_path(res != NXT_OK)) { 651 return; 652 } 653 654 nxt_fd_close(msg->fd[1]); 655 msg->fd[1] = -1; 656 } 657 } 658 659 if (msg->port_msg.stream != 0) { 660 nxt_port_rpc_handler(task, msg); 661 return; 662 } 663 664 /* 665 * Port with "id 
== 0" is application 'main' port and it always 666 * should come with non-zero stream. 667 */ 668 nxt_assert(port->id != 0); 669 670 /* Find 'main' app port and get app reference. */ 671 rt = task->thread->runtime; 672 673 /* 674 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 675 * sent to main port (with id == 0) and processed in main thread. 676 */ 677 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 678 nxt_assert(main_app_port != NULL); 679 680 app = main_app_port->app; 681 nxt_assert(app != NULL); 682 683 nxt_thread_mutex_lock(&app->mutex); 684 685 /* TODO here should be find-and-add code because there can be 686 port waiters in port_hash */ 687 nxt_port_hash_add(&app->port_hash, port); 688 app->port_hash_count++; 689 690 nxt_thread_mutex_unlock(&app->mutex); 691 692 port->app = app; 693 port->main_app_port = main_app_port; 694 695 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 696 } 697 698 699 static void 700 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 701 { 702 void *p; 703 size_t size; 704 nxt_int_t ret; 705 nxt_port_t *port; 706 nxt_router_temp_conf_t *tmcf; 707 708 port = nxt_runtime_port_find(task->thread->runtime, 709 msg->port_msg.pid, 710 msg->port_msg.reply_port); 711 if (nxt_slow_path(port == NULL)) { 712 nxt_alert(task, "conf_data_handler: reply port not found"); 713 return; 714 } 715 716 p = MAP_FAILED; 717 718 /* 719 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 720 * initialized in 'cleanup' section. 
721 */ 722 size = 0; 723 724 tmcf = nxt_router_temp_conf(task); 725 if (nxt_slow_path(tmcf == NULL)) { 726 goto fail; 727 } 728 729 if (nxt_slow_path(msg->fd[0] == -1)) { 730 nxt_alert(task, "conf_data_handler: invalid shm fd"); 731 goto fail; 732 } 733 734 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 735 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 736 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 737 goto fail; 738 } 739 740 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 741 742 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 743 744 nxt_fd_close(msg->fd[0]); 745 msg->fd[0] = -1; 746 747 if (nxt_slow_path(p == MAP_FAILED)) { 748 goto fail; 749 } 750 751 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 752 753 tmcf->router_conf->router = nxt_router; 754 tmcf->stream = msg->port_msg.stream; 755 tmcf->port = port; 756 757 nxt_port_use(task, tmcf->port, 1); 758 759 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 760 761 if (nxt_fast_path(ret == NXT_OK)) { 762 nxt_router_conf_apply(task, tmcf, NULL); 763 764 } else { 765 nxt_router_conf_error(task, tmcf); 766 } 767 768 goto cleanup; 769 770 fail: 771 772 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 773 msg->port_msg.stream, 0, NULL); 774 775 if (tmcf != NULL) { 776 nxt_mp_release(tmcf->mem_pool); 777 } 778 779 cleanup: 780 781 if (p != MAP_FAILED) { 782 nxt_mem_munmap(p, size); 783 } 784 785 if (msg->fd[0] != -1) { 786 nxt_fd_close(msg->fd[0]); 787 msg->fd[0] = -1; 788 } 789 } 790 791 792 static void 793 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 794 void *data) 795 { 796 union { 797 nxt_pid_t removed_pid; 798 void *data; 799 } u; 800 801 u.data = data; 802 803 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 804 } 805 806 807 static void 808 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 809 { 810 nxt_event_engine_t *engine; 811 812 
nxt_port_remove_pid_handler(task, msg); 813 814 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 815 { 816 if (nxt_fast_path(engine->port != NULL)) { 817 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 818 msg->u.data); 819 } 820 } 821 nxt_queue_loop; 822 823 if (msg->port_msg.stream == 0) { 824 return; 825 } 826 827 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 828 829 nxt_port_rpc_handler(task, msg); 830 } 831 832 833 static nxt_router_temp_conf_t * 834 nxt_router_temp_conf(nxt_task_t *task) 835 { 836 nxt_mp_t *mp, *tmp; 837 nxt_router_conf_t *rtcf; 838 nxt_router_temp_conf_t *tmcf; 839 840 mp = nxt_mp_create(1024, 128, 256, 32); 841 if (nxt_slow_path(mp == NULL)) { 842 return NULL; 843 } 844 845 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 846 if (nxt_slow_path(rtcf == NULL)) { 847 goto fail; 848 } 849 850 rtcf->mem_pool = mp; 851 852 tmp = nxt_mp_create(1024, 128, 256, 32); 853 if (nxt_slow_path(tmp == NULL)) { 854 goto fail; 855 } 856 857 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 858 if (nxt_slow_path(tmcf == NULL)) { 859 goto temp_fail; 860 } 861 862 tmcf->mem_pool = tmp; 863 tmcf->router_conf = rtcf; 864 tmcf->count = 1; 865 tmcf->engine = task->thread->engine; 866 867 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 868 sizeof(nxt_router_engine_conf_t)); 869 if (nxt_slow_path(tmcf->engines == NULL)) { 870 goto temp_fail; 871 } 872 873 nxt_queue_init(&creating_sockets); 874 nxt_queue_init(&pending_sockets); 875 nxt_queue_init(&updating_sockets); 876 nxt_queue_init(&keeping_sockets); 877 nxt_queue_init(&deleting_sockets); 878 879 #if (NXT_TLS) 880 nxt_queue_init(&tmcf->tls); 881 #endif 882 883 nxt_queue_init(&tmcf->apps); 884 nxt_queue_init(&tmcf->previous); 885 886 return tmcf; 887 888 temp_fail: 889 890 nxt_mp_destroy(tmp); 891 892 fail: 893 894 nxt_mp_destroy(mp); 895 896 return NULL; 897 } 898 899 900 nxt_inline nxt_bool_t 901 nxt_router_app_can_start(nxt_app_t *app) 902 { 903 return 
app->processes + app->pending_processes < app->max_processes 904 && app->pending_processes < app->max_pending_processes; 905 } 906 907 908 nxt_inline nxt_bool_t 909 nxt_router_app_need_start(nxt_app_t *app) 910 { 911 return (app->active_requests 912 > app->port_hash_count + app->pending_processes) 913 || (app->spare_processes 914 > app->idle_processes + app->pending_processes); 915 } 916 917 918 static void 919 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 920 { 921 nxt_int_t ret; 922 nxt_app_t *app; 923 nxt_router_t *router; 924 nxt_runtime_t *rt; 925 nxt_queue_link_t *qlk; 926 nxt_socket_conf_t *skcf; 927 nxt_router_conf_t *rtcf; 928 nxt_router_temp_conf_t *tmcf; 929 const nxt_event_interface_t *interface; 930 #if (NXT_TLS) 931 nxt_router_tlssock_t *tls; 932 #endif 933 934 tmcf = obj; 935 936 qlk = nxt_queue_first(&pending_sockets); 937 938 if (qlk != nxt_queue_tail(&pending_sockets)) { 939 nxt_queue_remove(qlk); 940 nxt_queue_insert_tail(&creating_sockets, qlk); 941 942 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 943 944 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 945 946 return; 947 } 948 949 #if (NXT_TLS) 950 qlk = nxt_queue_last(&tmcf->tls); 951 952 if (qlk != nxt_queue_head(&tmcf->tls)) { 953 nxt_queue_remove(qlk); 954 955 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 956 957 nxt_router_tls_rpc_create(task, tmcf, tls, 958 nxt_queue_is_empty(&tmcf->tls)); 959 return; 960 } 961 #endif 962 963 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 964 965 if (nxt_router_app_need_start(app)) { 966 nxt_router_app_rpc_create(task, tmcf, app); 967 return; 968 } 969 970 } nxt_queue_loop; 971 972 rtcf = tmcf->router_conf; 973 974 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 975 nxt_router_access_log_open(task, tmcf); 976 return; 977 } 978 979 rt = task->thread->runtime; 980 981 interface = nxt_service_get(rt->services, "engine", NULL); 982 983 router = rtcf->router; 984 985 ret = 
nxt_router_engines_create(task, router, tmcf, interface); 986 if (nxt_slow_path(ret != NXT_OK)) { 987 goto fail; 988 } 989 990 ret = nxt_router_threads_create(task, rt, tmcf); 991 if (nxt_slow_path(ret != NXT_OK)) { 992 goto fail; 993 } 994 995 nxt_router_apps_sort(task, router, tmcf); 996 997 nxt_router_apps_hash_use(task, rtcf, 1); 998 999 nxt_router_engines_post(router, tmcf); 1000 1001 nxt_queue_add(&router->sockets, &updating_sockets); 1002 nxt_queue_add(&router->sockets, &creating_sockets); 1003 1004 router->access_log = rtcf->access_log; 1005 1006 nxt_router_conf_ready(task, tmcf); 1007 1008 return; 1009 1010 fail: 1011 1012 nxt_router_conf_error(task, tmcf); 1013 1014 return; 1015 } 1016 1017 1018 static void 1019 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1020 { 1021 nxt_joint_job_t *job; 1022 1023 job = obj; 1024 1025 nxt_router_conf_ready(task, job->tmcf); 1026 } 1027 1028 1029 static void 1030 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1031 { 1032 uint32_t count; 1033 nxt_router_conf_t *rtcf; 1034 nxt_thread_spinlock_t *lock; 1035 1036 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1037 1038 if (--tmcf->count > 0) { 1039 return; 1040 } 1041 1042 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1043 1044 rtcf = tmcf->router_conf; 1045 1046 lock = &rtcf->router->lock; 1047 1048 nxt_thread_spin_lock(lock); 1049 1050 count = rtcf->count; 1051 1052 nxt_thread_spin_unlock(lock); 1053 1054 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1055 1056 if (count == 0) { 1057 nxt_router_apps_hash_use(task, rtcf, -1); 1058 1059 nxt_router_access_log_release(task, lock, rtcf->access_log); 1060 1061 nxt_mp_destroy(rtcf->mem_pool); 1062 } 1063 1064 nxt_mp_release(tmcf->mem_pool); 1065 } 1066 1067 1068 static void 1069 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1070 { 1071 nxt_app_t *app; 1072 nxt_queue_t new_socket_confs; 1073 nxt_socket_t s; 1074 nxt_router_t *router; 1075 
nxt_queue_link_t *qlk; 1076 nxt_socket_conf_t *skcf; 1077 nxt_router_conf_t *rtcf; 1078 1079 nxt_alert(task, "failed to apply new conf"); 1080 1081 for (qlk = nxt_queue_first(&creating_sockets); 1082 qlk != nxt_queue_tail(&creating_sockets); 1083 qlk = nxt_queue_next(qlk)) 1084 { 1085 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1086 s = skcf->listen->socket; 1087 1088 if (s != -1) { 1089 nxt_socket_close(task, s); 1090 } 1091 1092 nxt_free(skcf->listen); 1093 } 1094 1095 nxt_queue_init(&new_socket_confs); 1096 nxt_queue_add(&new_socket_confs, &updating_sockets); 1097 nxt_queue_add(&new_socket_confs, &pending_sockets); 1098 nxt_queue_add(&new_socket_confs, &creating_sockets); 1099 1100 rtcf = tmcf->router_conf; 1101 1102 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1103 1104 nxt_router_app_unlink(task, app); 1105 1106 } nxt_queue_loop; 1107 1108 router = rtcf->router; 1109 1110 nxt_queue_add(&router->sockets, &keeping_sockets); 1111 nxt_queue_add(&router->sockets, &deleting_sockets); 1112 1113 nxt_queue_add(&router->apps, &tmcf->previous); 1114 1115 // TODO: new engines and threads 1116 1117 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1118 1119 nxt_mp_destroy(rtcf->mem_pool); 1120 1121 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1122 1123 nxt_mp_release(tmcf->mem_pool); 1124 } 1125 1126 1127 static void 1128 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1129 nxt_port_msg_type_t type) 1130 { 1131 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1132 1133 nxt_port_use(task, tmcf->port, -1); 1134 1135 tmcf->port = NULL; 1136 } 1137 1138 1139 static nxt_conf_map_t nxt_router_conf[] = { 1140 { 1141 nxt_string("listeners_threads"), 1142 NXT_CONF_MAP_INT32, 1143 offsetof(nxt_router_conf_t, threads), 1144 }, 1145 }; 1146 1147 1148 static nxt_conf_map_t nxt_router_app_conf[] = { 1149 { 1150 nxt_string("type"), 1151 NXT_CONF_MAP_STR, 1152 
offsetof(nxt_router_app_conf_t, type), 1153 }, 1154 1155 { 1156 nxt_string("limits"), 1157 NXT_CONF_MAP_PTR, 1158 offsetof(nxt_router_app_conf_t, limits_value), 1159 }, 1160 1161 { 1162 nxt_string("processes"), 1163 NXT_CONF_MAP_INT32, 1164 offsetof(nxt_router_app_conf_t, processes), 1165 }, 1166 1167 { 1168 nxt_string("processes"), 1169 NXT_CONF_MAP_PTR, 1170 offsetof(nxt_router_app_conf_t, processes_value), 1171 }, 1172 1173 { 1174 nxt_string("targets"), 1175 NXT_CONF_MAP_PTR, 1176 offsetof(nxt_router_app_conf_t, targets_value), 1177 }, 1178 }; 1179 1180 1181 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1182 { 1183 nxt_string("timeout"), 1184 NXT_CONF_MAP_MSEC, 1185 offsetof(nxt_router_app_conf_t, timeout), 1186 }, 1187 1188 { 1189 nxt_string("requests"), 1190 NXT_CONF_MAP_INT32, 1191 offsetof(nxt_router_app_conf_t, requests), 1192 }, 1193 }; 1194 1195 1196 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1197 { 1198 nxt_string("spare"), 1199 NXT_CONF_MAP_INT32, 1200 offsetof(nxt_router_app_conf_t, spare_processes), 1201 }, 1202 1203 { 1204 nxt_string("max"), 1205 NXT_CONF_MAP_INT32, 1206 offsetof(nxt_router_app_conf_t, max_processes), 1207 }, 1208 1209 { 1210 nxt_string("idle_timeout"), 1211 NXT_CONF_MAP_MSEC, 1212 offsetof(nxt_router_app_conf_t, idle_timeout), 1213 }, 1214 }; 1215 1216 1217 static nxt_conf_map_t nxt_router_listener_conf[] = { 1218 { 1219 nxt_string("pass"), 1220 NXT_CONF_MAP_STR_COPY, 1221 offsetof(nxt_router_listener_conf_t, pass), 1222 }, 1223 1224 { 1225 nxt_string("application"), 1226 NXT_CONF_MAP_STR_COPY, 1227 offsetof(nxt_router_listener_conf_t, application), 1228 }, 1229 }; 1230 1231 1232 static nxt_conf_map_t nxt_router_http_conf[] = { 1233 { 1234 nxt_string("header_buffer_size"), 1235 NXT_CONF_MAP_SIZE, 1236 offsetof(nxt_socket_conf_t, header_buffer_size), 1237 }, 1238 1239 { 1240 nxt_string("large_header_buffer_size"), 1241 NXT_CONF_MAP_SIZE, 1242 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1243 }, 1244 
1245 { 1246 nxt_string("large_header_buffers"), 1247 NXT_CONF_MAP_SIZE, 1248 offsetof(nxt_socket_conf_t, large_header_buffers), 1249 }, 1250 1251 { 1252 nxt_string("body_buffer_size"), 1253 NXT_CONF_MAP_SIZE, 1254 offsetof(nxt_socket_conf_t, body_buffer_size), 1255 }, 1256 1257 { 1258 nxt_string("max_body_size"), 1259 NXT_CONF_MAP_SIZE, 1260 offsetof(nxt_socket_conf_t, max_body_size), 1261 }, 1262 1263 { 1264 nxt_string("idle_timeout"), 1265 NXT_CONF_MAP_MSEC, 1266 offsetof(nxt_socket_conf_t, idle_timeout), 1267 }, 1268 1269 { 1270 nxt_string("header_read_timeout"), 1271 NXT_CONF_MAP_MSEC, 1272 offsetof(nxt_socket_conf_t, header_read_timeout), 1273 }, 1274 1275 { 1276 nxt_string("body_read_timeout"), 1277 NXT_CONF_MAP_MSEC, 1278 offsetof(nxt_socket_conf_t, body_read_timeout), 1279 }, 1280 1281 { 1282 nxt_string("send_timeout"), 1283 NXT_CONF_MAP_MSEC, 1284 offsetof(nxt_socket_conf_t, send_timeout), 1285 }, 1286 1287 { 1288 nxt_string("body_temp_path"), 1289 NXT_CONF_MAP_STR, 1290 offsetof(nxt_socket_conf_t, body_temp_path), 1291 }, 1292 1293 { 1294 nxt_string("discard_unsafe_fields"), 1295 NXT_CONF_MAP_INT8, 1296 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1297 }, 1298 }; 1299 1300 1301 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1302 { 1303 nxt_string("max_frame_size"), 1304 NXT_CONF_MAP_SIZE, 1305 offsetof(nxt_websocket_conf_t, max_frame_size), 1306 }, 1307 1308 { 1309 nxt_string("read_timeout"), 1310 NXT_CONF_MAP_MSEC, 1311 offsetof(nxt_websocket_conf_t, read_timeout), 1312 }, 1313 1314 { 1315 nxt_string("keepalive_interval"), 1316 NXT_CONF_MAP_MSEC, 1317 offsetof(nxt_websocket_conf_t, keepalive_interval), 1318 }, 1319 1320 }; 1321 1322 1323 static nxt_int_t 1324 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1325 u_char *start, u_char *end) 1326 { 1327 u_char *p; 1328 size_t size; 1329 nxt_mp_t *mp, *app_mp; 1330 uint32_t next, next_target; 1331 nxt_int_t ret; 1332 nxt_str_t name, path, target; 1333 nxt_app_t *app, 
*prev; 1334 nxt_str_t *t, *s, *targets; 1335 nxt_uint_t n, i; 1336 nxt_port_t *port; 1337 nxt_router_t *router; 1338 nxt_app_joint_t *app_joint; 1339 #if (NXT_TLS) 1340 nxt_conf_value_t *certificate; 1341 #endif 1342 nxt_conf_value_t *conf, *http, *value, *websocket; 1343 nxt_conf_value_t *applications, *application; 1344 nxt_conf_value_t *listeners, *listener; 1345 nxt_conf_value_t *routes_conf, *static_conf; 1346 nxt_socket_conf_t *skcf; 1347 nxt_http_routes_t *routes; 1348 nxt_event_engine_t *engine; 1349 nxt_app_lang_module_t *lang; 1350 nxt_router_app_conf_t apcf; 1351 nxt_router_access_log_t *access_log; 1352 nxt_router_listener_conf_t lscf; 1353 1354 static nxt_str_t http_path = nxt_string("/settings/http"); 1355 static nxt_str_t applications_path = nxt_string("/applications"); 1356 static nxt_str_t listeners_path = nxt_string("/listeners"); 1357 static nxt_str_t routes_path = nxt_string("/routes"); 1358 static nxt_str_t access_log_path = nxt_string("/access_log"); 1359 #if (NXT_TLS) 1360 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1361 #endif 1362 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1363 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1364 1365 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1366 if (conf == NULL) { 1367 nxt_alert(task, "configuration parsing error"); 1368 return NXT_ERROR; 1369 } 1370 1371 mp = tmcf->router_conf->mem_pool; 1372 1373 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1374 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1375 if (ret != NXT_OK) { 1376 nxt_alert(task, "root map error"); 1377 return NXT_ERROR; 1378 } 1379 1380 if (tmcf->router_conf->threads == 0) { 1381 tmcf->router_conf->threads = nxt_ncpu; 1382 } 1383 1384 static_conf = nxt_conf_get_path(conf, &static_path); 1385 1386 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1387 if (nxt_slow_path(ret != NXT_OK)) { 1388 return NXT_ERROR; 1389 } 
1390 1391 router = tmcf->router_conf->router; 1392 1393 applications = nxt_conf_get_path(conf, &applications_path); 1394 1395 if (applications != NULL) { 1396 next = 0; 1397 1398 for ( ;; ) { 1399 application = nxt_conf_next_object_member(applications, 1400 &name, &next); 1401 if (application == NULL) { 1402 break; 1403 } 1404 1405 nxt_debug(task, "application \"%V\"", &name); 1406 1407 size = nxt_conf_json_length(application, NULL); 1408 1409 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1410 if (nxt_slow_path(app_mp == NULL)) { 1411 goto fail; 1412 } 1413 1414 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1415 if (app == NULL) { 1416 goto app_fail; 1417 } 1418 1419 nxt_memzero(app, sizeof(nxt_app_t)); 1420 1421 app->mem_pool = app_mp; 1422 1423 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1424 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1425 + name.length); 1426 1427 p = nxt_conf_json_print(app->conf.start, application, NULL); 1428 app->conf.length = p - app->conf.start; 1429 1430 nxt_assert(app->conf.length <= size); 1431 1432 nxt_debug(task, "application conf \"%V\"", &app->conf); 1433 1434 prev = nxt_router_app_find(&router->apps, &name); 1435 1436 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1437 nxt_mp_destroy(app_mp); 1438 1439 nxt_queue_remove(&prev->link); 1440 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1441 1442 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1443 if (nxt_slow_path(ret != NXT_OK)) { 1444 goto fail; 1445 } 1446 1447 continue; 1448 } 1449 1450 apcf.processes = 1; 1451 apcf.max_processes = 1; 1452 apcf.spare_processes = 0; 1453 apcf.timeout = 0; 1454 apcf.idle_timeout = 15000; 1455 apcf.requests = 0; 1456 apcf.limits_value = NULL; 1457 apcf.processes_value = NULL; 1458 apcf.targets_value = NULL; 1459 1460 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1461 if (nxt_slow_path(app_joint == NULL)) { 1462 goto app_fail; 1463 } 1464 1465 nxt_memzero(app_joint, 
sizeof(nxt_app_joint_t)); 1466 1467 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1468 nxt_nitems(nxt_router_app_conf), &apcf); 1469 if (ret != NXT_OK) { 1470 nxt_alert(task, "application map error"); 1471 goto app_fail; 1472 } 1473 1474 if (apcf.limits_value != NULL) { 1475 1476 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1477 nxt_alert(task, "application limits is not object"); 1478 goto app_fail; 1479 } 1480 1481 ret = nxt_conf_map_object(mp, apcf.limits_value, 1482 nxt_router_app_limits_conf, 1483 nxt_nitems(nxt_router_app_limits_conf), 1484 &apcf); 1485 if (ret != NXT_OK) { 1486 nxt_alert(task, "application limits map error"); 1487 goto app_fail; 1488 } 1489 } 1490 1491 if (apcf.processes_value != NULL 1492 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1493 { 1494 ret = nxt_conf_map_object(mp, apcf.processes_value, 1495 nxt_router_app_processes_conf, 1496 nxt_nitems(nxt_router_app_processes_conf), 1497 &apcf); 1498 if (ret != NXT_OK) { 1499 nxt_alert(task, "application processes map error"); 1500 goto app_fail; 1501 } 1502 1503 } else { 1504 apcf.max_processes = apcf.processes; 1505 apcf.spare_processes = apcf.processes; 1506 } 1507 1508 if (apcf.targets_value != NULL) { 1509 n = nxt_conf_object_members_count(apcf.targets_value); 1510 1511 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1512 if (nxt_slow_path(targets == NULL)) { 1513 goto app_fail; 1514 } 1515 1516 next_target = 0; 1517 1518 for (i = 0; i < n; i++) { 1519 (void) nxt_conf_next_object_member(apcf.targets_value, 1520 &target, &next_target); 1521 1522 s = nxt_str_dup(app_mp, &targets[i], &target); 1523 if (nxt_slow_path(s == NULL)) { 1524 goto app_fail; 1525 } 1526 } 1527 1528 } else { 1529 targets = NULL; 1530 } 1531 1532 nxt_debug(task, "application type: %V", &apcf.type); 1533 nxt_debug(task, "application processes: %D", apcf.processes); 1534 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1535 nxt_debug(task, "application 
requests: %D", apcf.requests); 1536 1537 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1538 1539 if (lang == NULL) { 1540 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1541 goto app_fail; 1542 } 1543 1544 nxt_debug(task, "application language module: \"%s\"", lang->file); 1545 1546 ret = nxt_thread_mutex_create(&app->mutex); 1547 if (ret != NXT_OK) { 1548 goto app_fail; 1549 } 1550 1551 nxt_queue_init(&app->ports); 1552 nxt_queue_init(&app->spare_ports); 1553 nxt_queue_init(&app->idle_ports); 1554 nxt_queue_init(&app->ack_waiting_req); 1555 1556 app->name.length = name.length; 1557 nxt_memcpy(app->name.start, name.start, name.length); 1558 1559 app->type = lang->type; 1560 app->max_processes = apcf.max_processes; 1561 app->spare_processes = apcf.spare_processes; 1562 app->max_pending_processes = apcf.spare_processes 1563 ? apcf.spare_processes : 1; 1564 app->timeout = apcf.timeout; 1565 app->idle_timeout = apcf.idle_timeout; 1566 app->max_requests = apcf.requests; 1567 1568 app->targets = targets; 1569 1570 engine = task->thread->engine; 1571 1572 app->engine = engine; 1573 1574 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1575 app->adjust_idle_work.task = &engine->task; 1576 app->adjust_idle_work.obj = app; 1577 1578 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1579 1580 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1581 if (nxt_slow_path(ret != NXT_OK)) { 1582 goto app_fail; 1583 } 1584 1585 nxt_router_app_use(task, app, 1); 1586 1587 app->joint = app_joint; 1588 1589 app_joint->use_count = 1; 1590 app_joint->app = app; 1591 1592 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1593 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1594 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1595 app_joint->idle_timer.task = &engine->task; 1596 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1597 1598 app_joint->free_app_work.handler = nxt_router_free_app; 1599 
app_joint->free_app_work.task = &engine->task; 1600 app_joint->free_app_work.obj = app_joint; 1601 1602 port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, 1603 NXT_PROCESS_APP); 1604 if (nxt_slow_path(port == NULL)) { 1605 return NXT_ERROR; 1606 } 1607 1608 ret = nxt_port_socket_init(task, port, 0); 1609 if (nxt_slow_path(ret != NXT_OK)) { 1610 nxt_port_use(task, port, -1); 1611 return NXT_ERROR; 1612 } 1613 1614 ret = nxt_router_app_queue_init(task, port); 1615 if (nxt_slow_path(ret != NXT_OK)) { 1616 nxt_port_write_close(port); 1617 nxt_port_read_close(port); 1618 nxt_port_use(task, port, -1); 1619 return NXT_ERROR; 1620 } 1621 1622 nxt_port_write_enable(task, port); 1623 port->app = app; 1624 1625 app->shared_port = port; 1626 1627 nxt_thread_mutex_create(&app->outgoing.mutex); 1628 } 1629 } 1630 1631 routes_conf = nxt_conf_get_path(conf, &routes_path); 1632 if (nxt_fast_path(routes_conf != NULL)) { 1633 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1634 if (nxt_slow_path(routes == NULL)) { 1635 return NXT_ERROR; 1636 } 1637 tmcf->router_conf->routes = routes; 1638 } 1639 1640 ret = nxt_upstreams_create(task, tmcf, conf); 1641 if (nxt_slow_path(ret != NXT_OK)) { 1642 return ret; 1643 } 1644 1645 http = nxt_conf_get_path(conf, &http_path); 1646 #if 0 1647 if (http == NULL) { 1648 nxt_alert(task, "no \"http\" block"); 1649 return NXT_ERROR; 1650 } 1651 #endif 1652 1653 websocket = nxt_conf_get_path(conf, &websocket_path); 1654 1655 listeners = nxt_conf_get_path(conf, &listeners_path); 1656 1657 if (listeners != NULL) { 1658 next = 0; 1659 1660 for ( ;; ) { 1661 listener = nxt_conf_next_object_member(listeners, &name, &next); 1662 if (listener == NULL) { 1663 break; 1664 } 1665 1666 skcf = nxt_router_socket_conf(task, tmcf, &name); 1667 if (skcf == NULL) { 1668 goto fail; 1669 } 1670 1671 nxt_memzero(&lscf, sizeof(lscf)); 1672 1673 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1674 nxt_nitems(nxt_router_listener_conf), 1675 
&lscf); 1676 if (ret != NXT_OK) { 1677 nxt_alert(task, "listener map error"); 1678 goto fail; 1679 } 1680 1681 nxt_debug(task, "application: %V", &lscf.application); 1682 1683 // STUB, default values if http block is not defined. 1684 skcf->header_buffer_size = 2048; 1685 skcf->large_header_buffer_size = 8192; 1686 skcf->large_header_buffers = 4; 1687 skcf->discard_unsafe_fields = 1; 1688 skcf->body_buffer_size = 16 * 1024; 1689 skcf->max_body_size = 8 * 1024 * 1024; 1690 skcf->proxy_header_buffer_size = 64 * 1024; 1691 skcf->proxy_buffer_size = 4096; 1692 skcf->proxy_buffers = 256; 1693 skcf->idle_timeout = 180 * 1000; 1694 skcf->header_read_timeout = 30 * 1000; 1695 skcf->body_read_timeout = 30 * 1000; 1696 skcf->send_timeout = 30 * 1000; 1697 skcf->proxy_timeout = 60 * 1000; 1698 skcf->proxy_send_timeout = 30 * 1000; 1699 skcf->proxy_read_timeout = 30 * 1000; 1700 1701 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1702 skcf->websocket_conf.read_timeout = 60 * 1000; 1703 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1704 1705 nxt_str_null(&skcf->body_temp_path); 1706 1707 if (http != NULL) { 1708 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1709 nxt_nitems(nxt_router_http_conf), 1710 skcf); 1711 if (ret != NXT_OK) { 1712 nxt_alert(task, "http map error"); 1713 goto fail; 1714 } 1715 } 1716 1717 if (websocket != NULL) { 1718 ret = nxt_conf_map_object(mp, websocket, 1719 nxt_router_websocket_conf, 1720 nxt_nitems(nxt_router_websocket_conf), 1721 &skcf->websocket_conf); 1722 if (ret != NXT_OK) { 1723 nxt_alert(task, "websocket map error"); 1724 goto fail; 1725 } 1726 } 1727 1728 t = &skcf->body_temp_path; 1729 1730 if (t->length == 0) { 1731 t->start = (u_char *) task->thread->runtime->tmp; 1732 t->length = nxt_strlen(t->start); 1733 } 1734 1735 #if (NXT_TLS) 1736 certificate = nxt_conf_get_path(listener, &certificate_path); 1737 1738 if (certificate != NULL) { 1739 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { 1740 n = 
nxt_conf_array_elements_count(certificate); 1741 1742 for (i = 0; i < n; i++) { 1743 value = nxt_conf_get_array_element(certificate, i); 1744 1745 nxt_assert(value != NULL); 1746 1747 ret = nxt_router_conf_tls_insert(tmcf, value, skcf); 1748 if (nxt_slow_path(ret != NXT_OK)) { 1749 goto fail; 1750 } 1751 } 1752 1753 } else { 1754 /* NXT_CONF_STRING */ 1755 ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf); 1756 if (nxt_slow_path(ret != NXT_OK)) { 1757 goto fail; 1758 } 1759 } 1760 } 1761 #endif 1762 1763 skcf->listen->handler = nxt_http_conn_init; 1764 skcf->router_conf = tmcf->router_conf; 1765 skcf->router_conf->count++; 1766 1767 if (lscf.pass.length != 0) { 1768 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1769 1770 /* COMPATIBILITY: listener application. */ 1771 } else if (lscf.application.length > 0) { 1772 skcf->action = nxt_http_pass_application(task, 1773 tmcf->router_conf, 1774 &lscf.application); 1775 } 1776 1777 if (nxt_slow_path(skcf->action == NULL)) { 1778 goto fail; 1779 } 1780 } 1781 } 1782 1783 ret = nxt_http_routes_resolve(task, tmcf); 1784 if (nxt_slow_path(ret != NXT_OK)) { 1785 goto fail; 1786 } 1787 1788 value = nxt_conf_get_path(conf, &access_log_path); 1789 1790 if (value != NULL) { 1791 nxt_conf_get_string(value, &path); 1792 1793 access_log = router->access_log; 1794 1795 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1796 nxt_thread_spin_lock(&router->lock); 1797 access_log->count++; 1798 nxt_thread_spin_unlock(&router->lock); 1799 1800 } else { 1801 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1802 + path.length); 1803 if (access_log == NULL) { 1804 nxt_alert(task, "failed to allocate access log structure"); 1805 goto fail; 1806 } 1807 1808 access_log->fd = -1; 1809 access_log->handler = &nxt_router_access_log_writer; 1810 access_log->count = 1; 1811 1812 access_log->path.length = path.length; 1813 access_log->path.start = (u_char *) access_log 1814 + 
sizeof(nxt_router_access_log_t); 1815 1816 nxt_memcpy(access_log->path.start, path.start, path.length); 1817 } 1818 1819 tmcf->router_conf->access_log = access_log; 1820 } 1821 1822 nxt_queue_add(&deleting_sockets, &router->sockets); 1823 nxt_queue_init(&router->sockets); 1824 1825 return NXT_OK; 1826 1827 app_fail: 1828 1829 nxt_mp_destroy(app_mp); 1830 1831 fail: 1832 1833 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1834 1835 nxt_queue_remove(&app->link); 1836 nxt_thread_mutex_destroy(&app->mutex); 1837 nxt_mp_destroy(app->mem_pool); 1838 1839 } nxt_queue_loop; 1840 1841 return NXT_ERROR; 1842 } 1843 1844 1845 #if (NXT_TLS) 1846 1847 static nxt_int_t 1848 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 1849 nxt_conf_value_t *value, nxt_socket_conf_t *skcf) 1850 { 1851 nxt_mp_t *mp; 1852 nxt_str_t str; 1853 nxt_router_tlssock_t *tls; 1854 1855 mp = tmcf->router_conf->mem_pool; 1856 1857 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1858 if (nxt_slow_path(tls == NULL)) { 1859 return NXT_ERROR; 1860 } 1861 1862 tls->conf = skcf; 1863 nxt_conf_get_string(value, &str); 1864 1865 if (nxt_slow_path(nxt_str_dup(mp, &tls->name, &str) == NULL)) { 1866 return NXT_ERROR; 1867 } 1868 1869 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1870 1871 return NXT_OK; 1872 } 1873 1874 #endif 1875 1876 1877 static nxt_int_t 1878 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 1879 nxt_conf_value_t *conf) 1880 { 1881 uint32_t next, i; 1882 nxt_mp_t *mp; 1883 nxt_str_t *type, extension, str; 1884 nxt_int_t ret; 1885 nxt_uint_t exts; 1886 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 1887 1888 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 1889 1890 mp = rtcf->mem_pool; 1891 1892 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 1893 if (nxt_slow_path(ret != NXT_OK)) { 1894 return NXT_ERROR; 1895 } 1896 1897 if (conf == NULL) { 1898 return NXT_OK; 1899 } 1900 1901 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 
1902 1903 if (mtypes_conf != NULL) { 1904 next = 0; 1905 1906 for ( ;; ) { 1907 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 1908 1909 if (ext_conf == NULL) { 1910 break; 1911 } 1912 1913 type = nxt_str_dup(mp, NULL, &str); 1914 if (nxt_slow_path(type == NULL)) { 1915 return NXT_ERROR; 1916 } 1917 1918 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 1919 nxt_conf_get_string(ext_conf, &str); 1920 1921 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1922 return NXT_ERROR; 1923 } 1924 1925 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1926 &extension, type); 1927 if (nxt_slow_path(ret != NXT_OK)) { 1928 return NXT_ERROR; 1929 } 1930 1931 continue; 1932 } 1933 1934 exts = nxt_conf_array_elements_count(ext_conf); 1935 1936 for (i = 0; i < exts; i++) { 1937 value = nxt_conf_get_array_element(ext_conf, i); 1938 1939 nxt_conf_get_string(value, &str); 1940 1941 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1942 return NXT_ERROR; 1943 } 1944 1945 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1946 &extension, type); 1947 if (nxt_slow_path(ret != NXT_OK)) { 1948 return NXT_ERROR; 1949 } 1950 } 1951 } 1952 } 1953 1954 return NXT_OK; 1955 } 1956 1957 1958 static nxt_app_t * 1959 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1960 { 1961 nxt_app_t *app; 1962 1963 nxt_queue_each(app, queue, nxt_app_t, link) { 1964 1965 if (nxt_strstr_eq(name, &app->name)) { 1966 return app; 1967 } 1968 1969 } nxt_queue_loop; 1970 1971 return NULL; 1972 } 1973 1974 1975 static nxt_int_t 1976 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 1977 { 1978 void *mem; 1979 nxt_int_t fd; 1980 1981 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 1982 if (nxt_slow_path(fd == -1)) { 1983 return NXT_ERROR; 1984 } 1985 1986 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 1987 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 1988 if (nxt_slow_path(mem == MAP_FAILED)) { 1989 nxt_fd_close(fd); 1990 1991 
return NXT_ERROR; 1992 } 1993 1994 nxt_app_queue_init(mem); 1995 1996 port->queue_fd = fd; 1997 port->queue = mem; 1998 1999 return NXT_OK; 2000 } 2001 2002 2003 static nxt_int_t 2004 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2005 { 2006 void *mem; 2007 nxt_int_t fd; 2008 2009 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2010 if (nxt_slow_path(fd == -1)) { 2011 return NXT_ERROR; 2012 } 2013 2014 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2015 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2016 if (nxt_slow_path(mem == MAP_FAILED)) { 2017 nxt_fd_close(fd); 2018 2019 return NXT_ERROR; 2020 } 2021 2022 nxt_port_queue_init(mem); 2023 2024 port->queue_fd = fd; 2025 port->queue = mem; 2026 2027 return NXT_OK; 2028 } 2029 2030 2031 static nxt_int_t 2032 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2033 { 2034 void *mem; 2035 2036 nxt_assert(fd != -1); 2037 2038 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2039 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2040 if (nxt_slow_path(mem == MAP_FAILED)) { 2041 2042 return NXT_ERROR; 2043 } 2044 2045 port->queue = mem; 2046 2047 return NXT_OK; 2048 } 2049 2050 2051 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2052 NXT_LVLHSH_DEFAULT, 2053 nxt_router_apps_hash_test, 2054 nxt_mp_lvlhsh_alloc, 2055 nxt_mp_lvlhsh_free, 2056 }; 2057 2058 2059 static nxt_int_t 2060 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2061 { 2062 nxt_app_t *app; 2063 2064 app = data; 2065 2066 return nxt_strstr_eq(&lhq->key, &app->name) ? 
NXT_OK : NXT_DECLINED; 2067 } 2068 2069 2070 static nxt_int_t 2071 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2072 { 2073 nxt_lvlhsh_query_t lhq; 2074 2075 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2076 lhq.replace = 0; 2077 lhq.key = app->name; 2078 lhq.value = app; 2079 lhq.proto = &nxt_router_apps_hash_proto; 2080 lhq.pool = rtcf->mem_pool; 2081 2082 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2083 2084 case NXT_OK: 2085 return NXT_OK; 2086 2087 case NXT_DECLINED: 2088 nxt_thread_log_alert("router app hash adding failed: " 2089 "\"%V\" is already in hash", &lhq.key); 2090 /* Fall through. */ 2091 default: 2092 return NXT_ERROR; 2093 } 2094 } 2095 2096 2097 static nxt_app_t * 2098 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2099 { 2100 nxt_lvlhsh_query_t lhq; 2101 2102 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2103 lhq.key = *name; 2104 lhq.proto = &nxt_router_apps_hash_proto; 2105 2106 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2107 return NULL; 2108 } 2109 2110 return lhq.value; 2111 } 2112 2113 2114 static void 2115 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2116 { 2117 nxt_app_t *app; 2118 nxt_lvlhsh_each_t lhe; 2119 2120 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2121 2122 for ( ;; ) { 2123 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2124 2125 if (app == NULL) { 2126 break; 2127 } 2128 2129 nxt_router_app_use(task, app, i); 2130 } 2131 } 2132 2133 2134 2135 nxt_int_t 2136 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name, 2137 nxt_http_action_t *action) 2138 { 2139 nxt_app_t *app; 2140 2141 app = nxt_router_apps_hash_get(rtcf, name); 2142 2143 if (app == NULL) { 2144 return NXT_DECLINED; 2145 } 2146 2147 action->u.app.application = app; 2148 action->handler = nxt_http_application_handler; 2149 2150 return NXT_OK; 2151 } 2152 2153 2154 static nxt_socket_conf_t * 2155 
nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2156 nxt_str_t *name) 2157 { 2158 size_t size; 2159 nxt_int_t ret; 2160 nxt_bool_t wildcard; 2161 nxt_sockaddr_t *sa; 2162 nxt_socket_conf_t *skcf; 2163 nxt_listen_socket_t *ls; 2164 2165 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2166 if (nxt_slow_path(sa == NULL)) { 2167 nxt_alert(task, "invalid listener \"%V\"", name); 2168 return NULL; 2169 } 2170 2171 sa->type = SOCK_STREAM; 2172 2173 nxt_debug(task, "router listener: \"%*s\"", 2174 (size_t) sa->length, nxt_sockaddr_start(sa)); 2175 2176 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2177 if (nxt_slow_path(skcf == NULL)) { 2178 return NULL; 2179 } 2180 2181 size = nxt_sockaddr_size(sa); 2182 2183 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2184 2185 if (ret != NXT_OK) { 2186 2187 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2188 if (nxt_slow_path(ls == NULL)) { 2189 return NULL; 2190 } 2191 2192 skcf->listen = ls; 2193 2194 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2195 nxt_memcpy(ls->sockaddr, sa, size); 2196 2197 nxt_listen_socket_remote_size(ls); 2198 2199 ls->socket = -1; 2200 ls->backlog = NXT_LISTEN_BACKLOG; 2201 ls->flags = NXT_NONBLOCK; 2202 ls->read_after_accept = 1; 2203 } 2204 2205 switch (sa->u.sockaddr.sa_family) { 2206 #if (NXT_HAVE_UNIX_DOMAIN) 2207 case AF_UNIX: 2208 wildcard = 0; 2209 break; 2210 #endif 2211 #if (NXT_INET6) 2212 case AF_INET6: 2213 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2214 break; 2215 #endif 2216 case AF_INET: 2217 default: 2218 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2219 break; 2220 } 2221 2222 if (!wildcard) { 2223 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2224 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2225 return NULL; 2226 } 2227 2228 nxt_memcpy(skcf->sockaddr, sa, size); 2229 } 2230 2231 return skcf; 2232 } 2233 2234 2235 static nxt_int_t 2236 
nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2237 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2238 { 2239 nxt_router_t *router; 2240 nxt_queue_link_t *qlk; 2241 nxt_socket_conf_t *skcf; 2242 2243 router = tmcf->router_conf->router; 2244 2245 for (qlk = nxt_queue_first(&router->sockets); 2246 qlk != nxt_queue_tail(&router->sockets); 2247 qlk = nxt_queue_next(qlk)) 2248 { 2249 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2250 2251 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2252 nskcf->listen = skcf->listen; 2253 2254 nxt_queue_remove(qlk); 2255 nxt_queue_insert_tail(&keeping_sockets, qlk); 2256 2257 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2258 2259 return NXT_OK; 2260 } 2261 } 2262 2263 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2264 2265 return NXT_DECLINED; 2266 } 2267 2268 2269 static void 2270 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2271 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2272 { 2273 size_t size; 2274 uint32_t stream; 2275 nxt_int_t ret; 2276 nxt_buf_t *b; 2277 nxt_port_t *main_port, *router_port; 2278 nxt_runtime_t *rt; 2279 nxt_socket_rpc_t *rpc; 2280 2281 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2282 if (rpc == NULL) { 2283 goto fail; 2284 } 2285 2286 rpc->socket_conf = skcf; 2287 rpc->temp_conf = tmcf; 2288 2289 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2290 2291 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2292 if (b == NULL) { 2293 goto fail; 2294 } 2295 2296 b->completion_handler = nxt_router_dummy_buf_completion; 2297 2298 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2299 2300 rt = task->thread->runtime; 2301 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2302 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2303 2304 stream = nxt_port_rpc_register_handler(task, router_port, 2305 nxt_router_listen_socket_ready, 2306 nxt_router_listen_socket_error, 2307 main_port->pid, rpc); 2308 if 
(nxt_slow_path(stream == 0)) { 2309 goto fail; 2310 } 2311 2312 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2313 stream, router_port->id, b); 2314 2315 if (nxt_slow_path(ret != NXT_OK)) { 2316 nxt_port_rpc_cancel(task, router_port, stream); 2317 goto fail; 2318 } 2319 2320 return; 2321 2322 fail: 2323 2324 nxt_router_conf_error(task, tmcf); 2325 } 2326 2327 2328 static void 2329 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2330 void *data) 2331 { 2332 nxt_int_t ret; 2333 nxt_socket_t s; 2334 nxt_socket_rpc_t *rpc; 2335 2336 rpc = data; 2337 2338 s = msg->fd[0]; 2339 2340 ret = nxt_socket_nonblocking(task, s); 2341 if (nxt_slow_path(ret != NXT_OK)) { 2342 goto fail; 2343 } 2344 2345 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2346 2347 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2348 if (nxt_slow_path(ret != NXT_OK)) { 2349 goto fail; 2350 } 2351 2352 rpc->socket_conf->listen->socket = s; 2353 2354 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2355 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2356 2357 return; 2358 2359 fail: 2360 2361 nxt_socket_close(task, s); 2362 2363 nxt_router_conf_error(task, rpc->temp_conf); 2364 } 2365 2366 2367 static void 2368 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2369 void *data) 2370 { 2371 nxt_socket_rpc_t *rpc; 2372 nxt_router_temp_conf_t *tmcf; 2373 2374 rpc = data; 2375 tmcf = rpc->temp_conf; 2376 2377 #if 0 2378 u_char *p; 2379 size_t size; 2380 uint8_t error; 2381 nxt_buf_t *in, *out; 2382 nxt_sockaddr_t *sa; 2383 2384 static nxt_str_t socket_errors[] = { 2385 nxt_string("ListenerSystem"), 2386 nxt_string("ListenerNoIPv6"), 2387 nxt_string("ListenerPort"), 2388 nxt_string("ListenerInUse"), 2389 nxt_string("ListenerNoAddress"), 2390 nxt_string("ListenerNoAccess"), 2391 nxt_string("ListenerPath"), 2392 }; 2393 2394 sa = rpc->socket_conf->listen->sockaddr; 2395 2396 in = 
nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2397 2398 if (nxt_slow_path(in == NULL)) { 2399 return; 2400 } 2401 2402 p = in->mem.pos; 2403 2404 error = *p++; 2405 2406 size = nxt_length("listen socket error: ") 2407 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2408 + sa->length + socket_errors[error].length + (in->mem.free - p); 2409 2410 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2411 if (nxt_slow_path(out == NULL)) { 2412 return; 2413 } 2414 2415 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2416 "listen socket error: " 2417 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2418 (size_t) sa->length, nxt_sockaddr_start(sa), 2419 &socket_errors[error], in->mem.free - p, p); 2420 2421 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2422 #endif 2423 2424 nxt_router_conf_error(task, tmcf); 2425 } 2426 2427 2428 #if (NXT_TLS) 2429 2430 static void 2431 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2432 nxt_router_tlssock_t *tls, nxt_bool_t last) 2433 { 2434 nxt_socket_rpc_t *rpc; 2435 2436 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2437 if (rpc == NULL) { 2438 nxt_router_conf_error(task, tmcf); 2439 return; 2440 } 2441 2442 rpc->name = &tls->name; 2443 rpc->socket_conf = tls->conf; 2444 rpc->temp_conf = tmcf; 2445 rpc->last = last; 2446 2447 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2448 nxt_router_tls_rpc_handler, rpc); 2449 } 2450 2451 2452 static void 2453 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2454 void *data) 2455 { 2456 nxt_mp_t *mp; 2457 nxt_int_t ret; 2458 nxt_tls_conf_t *tlscf; 2459 nxt_socket_rpc_t *rpc; 2460 nxt_tls_bundle_conf_t *bundle; 2461 nxt_router_temp_conf_t *tmcf; 2462 2463 nxt_debug(task, "tls rpc handler"); 2464 2465 rpc = data; 2466 tmcf = rpc->temp_conf; 2467 2468 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2469 goto fail; 2470 } 2471 2472 mp = 
tmcf->router_conf->mem_pool; 2473 2474 if (rpc->socket_conf->tls == NULL){ 2475 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2476 if (nxt_slow_path(tlscf == NULL)) { 2477 goto fail; 2478 } 2479 2480 rpc->socket_conf->tls = tlscf; 2481 2482 } else { 2483 tlscf = rpc->socket_conf->tls; 2484 } 2485 2486 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2487 if (nxt_slow_path(bundle == NULL)) { 2488 goto fail; 2489 } 2490 2491 bundle->name = rpc->name; 2492 bundle->chain_file = msg->fd[0]; 2493 bundle->next = tlscf->bundle; 2494 tlscf->bundle = bundle; 2495 2496 ret = task->thread->runtime->tls->server_init(task, tlscf, mp, rpc->last); 2497 if (nxt_slow_path(ret != NXT_OK)) { 2498 goto fail; 2499 } 2500 2501 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2502 nxt_router_conf_apply, task, tmcf, NULL); 2503 return; 2504 2505 fail: 2506 2507 nxt_router_conf_error(task, tmcf); 2508 } 2509 2510 #endif 2511 2512 2513 static void 2514 nxt_router_app_rpc_create(nxt_task_t *task, 2515 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2516 { 2517 size_t size; 2518 uint32_t stream; 2519 nxt_int_t ret; 2520 nxt_buf_t *b; 2521 nxt_port_t *main_port, *router_port; 2522 nxt_runtime_t *rt; 2523 nxt_app_rpc_t *rpc; 2524 2525 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2526 if (rpc == NULL) { 2527 goto fail; 2528 } 2529 2530 rpc->app = app; 2531 rpc->temp_conf = tmcf; 2532 2533 nxt_debug(task, "app '%V' prefork", &app->name); 2534 2535 size = app->name.length + 1 + app->conf.length; 2536 2537 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2538 if (nxt_slow_path(b == NULL)) { 2539 goto fail; 2540 } 2541 2542 b->completion_handler = nxt_router_dummy_buf_completion; 2543 2544 nxt_buf_cpystr(b, &app->name); 2545 *b->mem.free++ = '\0'; 2546 nxt_buf_cpystr(b, &app->conf); 2547 2548 rt = task->thread->runtime; 2549 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2550 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2551 2552 stream = 
nxt_port_rpc_register_handler(task, router_port, 2553 nxt_router_app_prefork_ready, 2554 nxt_router_app_prefork_error, 2555 -1, rpc); 2556 if (nxt_slow_path(stream == 0)) { 2557 goto fail; 2558 } 2559 2560 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2561 -1, stream, router_port->id, b); 2562 2563 if (nxt_slow_path(ret != NXT_OK)) { 2564 nxt_port_rpc_cancel(task, router_port, stream); 2565 goto fail; 2566 } 2567 2568 app->pending_processes++; 2569 2570 return; 2571 2572 fail: 2573 2574 nxt_router_conf_error(task, tmcf); 2575 } 2576 2577 2578 static void 2579 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2580 void *data) 2581 { 2582 nxt_app_t *app; 2583 nxt_port_t *port; 2584 nxt_app_rpc_t *rpc; 2585 nxt_event_engine_t *engine; 2586 2587 rpc = data; 2588 app = rpc->app; 2589 2590 port = msg->u.new_port; 2591 2592 nxt_assert(port != NULL); 2593 nxt_assert(port->type == NXT_PROCESS_APP); 2594 nxt_assert(port->id == 0); 2595 2596 port->app = app; 2597 port->main_app_port = port; 2598 2599 app->pending_processes--; 2600 app->processes++; 2601 app->idle_processes++; 2602 2603 engine = task->thread->engine; 2604 2605 nxt_queue_insert_tail(&app->ports, &port->app_link); 2606 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2607 2608 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2609 &app->name, port->pid, port->id); 2610 2611 nxt_port_hash_add(&app->port_hash, port); 2612 app->port_hash_count++; 2613 2614 port->idle_start = 0; 2615 2616 nxt_port_inc_use(port); 2617 2618 nxt_router_app_shared_port_send(task, port); 2619 2620 nxt_work_queue_add(&engine->fast_work_queue, 2621 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2622 } 2623 2624 2625 static void 2626 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2627 void *data) 2628 { 2629 nxt_app_t *app; 2630 nxt_app_rpc_t *rpc; 2631 nxt_router_temp_conf_t *tmcf; 2632 2633 rpc = data; 2634 app = rpc->app; 2635 tmcf = 
rpc->temp_conf; 2636 2637 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2638 &app->name); 2639 2640 app->pending_processes--; 2641 2642 nxt_router_conf_error(task, tmcf); 2643 } 2644 2645 2646 static nxt_int_t 2647 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2648 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2649 { 2650 nxt_int_t ret; 2651 nxt_uint_t n, threads; 2652 nxt_queue_link_t *qlk; 2653 nxt_router_engine_conf_t *recf; 2654 2655 threads = tmcf->router_conf->threads; 2656 2657 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2658 sizeof(nxt_router_engine_conf_t)); 2659 if (nxt_slow_path(tmcf->engines == NULL)) { 2660 return NXT_ERROR; 2661 } 2662 2663 n = 0; 2664 2665 for (qlk = nxt_queue_first(&router->engines); 2666 qlk != nxt_queue_tail(&router->engines); 2667 qlk = nxt_queue_next(qlk)) 2668 { 2669 recf = nxt_array_zero_add(tmcf->engines); 2670 if (nxt_slow_path(recf == NULL)) { 2671 return NXT_ERROR; 2672 } 2673 2674 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2675 2676 if (n < threads) { 2677 recf->action = NXT_ROUTER_ENGINE_KEEP; 2678 ret = nxt_router_engine_conf_update(tmcf, recf); 2679 2680 } else { 2681 recf->action = NXT_ROUTER_ENGINE_DELETE; 2682 ret = nxt_router_engine_conf_delete(tmcf, recf); 2683 } 2684 2685 if (nxt_slow_path(ret != NXT_OK)) { 2686 return ret; 2687 } 2688 2689 n++; 2690 } 2691 2692 tmcf->new_threads = n; 2693 2694 while (n < threads) { 2695 recf = nxt_array_zero_add(tmcf->engines); 2696 if (nxt_slow_path(recf == NULL)) { 2697 return NXT_ERROR; 2698 } 2699 2700 recf->action = NXT_ROUTER_ENGINE_ADD; 2701 2702 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2703 if (nxt_slow_path(recf->engine == NULL)) { 2704 return NXT_ERROR; 2705 } 2706 2707 ret = nxt_router_engine_conf_create(tmcf, recf); 2708 if (nxt_slow_path(ret != NXT_OK)) { 2709 return ret; 2710 } 2711 2712 n++; 2713 } 2714 2715 return NXT_OK; 2716 } 2717 
2718 2719 static nxt_int_t 2720 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2721 nxt_router_engine_conf_t *recf) 2722 { 2723 nxt_int_t ret; 2724 2725 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2726 nxt_router_listen_socket_create); 2727 if (nxt_slow_path(ret != NXT_OK)) { 2728 return ret; 2729 } 2730 2731 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2732 nxt_router_listen_socket_create); 2733 if (nxt_slow_path(ret != NXT_OK)) { 2734 return ret; 2735 } 2736 2737 return ret; 2738 } 2739 2740 2741 static nxt_int_t 2742 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2743 nxt_router_engine_conf_t *recf) 2744 { 2745 nxt_int_t ret; 2746 2747 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2748 nxt_router_listen_socket_create); 2749 if (nxt_slow_path(ret != NXT_OK)) { 2750 return ret; 2751 } 2752 2753 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2754 nxt_router_listen_socket_update); 2755 if (nxt_slow_path(ret != NXT_OK)) { 2756 return ret; 2757 } 2758 2759 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2760 if (nxt_slow_path(ret != NXT_OK)) { 2761 return ret; 2762 } 2763 2764 return ret; 2765 } 2766 2767 2768 static nxt_int_t 2769 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2770 nxt_router_engine_conf_t *recf) 2771 { 2772 nxt_int_t ret; 2773 2774 ret = nxt_router_engine_quit(tmcf, recf); 2775 if (nxt_slow_path(ret != NXT_OK)) { 2776 return ret; 2777 } 2778 2779 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 2780 if (nxt_slow_path(ret != NXT_OK)) { 2781 return ret; 2782 } 2783 2784 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2785 } 2786 2787 2788 static nxt_int_t 2789 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2790 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2791 nxt_work_handler_t handler) 2792 { 2793 nxt_int_t ret; 2794 
nxt_joint_job_t *job; 2795 nxt_queue_link_t *qlk; 2796 nxt_socket_conf_t *skcf; 2797 nxt_socket_conf_joint_t *joint; 2798 2799 for (qlk = nxt_queue_first(sockets); 2800 qlk != nxt_queue_tail(sockets); 2801 qlk = nxt_queue_next(qlk)) 2802 { 2803 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2804 if (nxt_slow_path(job == NULL)) { 2805 return NXT_ERROR; 2806 } 2807 2808 job->work.next = recf->jobs; 2809 recf->jobs = &job->work; 2810 2811 job->task = tmcf->engine->task; 2812 job->work.handler = handler; 2813 job->work.task = &job->task; 2814 job->work.obj = job; 2815 job->tmcf = tmcf; 2816 2817 tmcf->count++; 2818 2819 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2820 sizeof(nxt_socket_conf_joint_t)); 2821 if (nxt_slow_path(joint == NULL)) { 2822 return NXT_ERROR; 2823 } 2824 2825 job->work.data = joint; 2826 2827 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 2828 if (nxt_slow_path(ret != NXT_OK)) { 2829 return ret; 2830 } 2831 2832 joint->count = 1; 2833 2834 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2835 skcf->count++; 2836 joint->socket_conf = skcf; 2837 2838 joint->engine = recf->engine; 2839 } 2840 2841 return NXT_OK; 2842 } 2843 2844 2845 static nxt_int_t 2846 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2847 nxt_router_engine_conf_t *recf) 2848 { 2849 nxt_joint_job_t *job; 2850 2851 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2852 if (nxt_slow_path(job == NULL)) { 2853 return NXT_ERROR; 2854 } 2855 2856 job->work.next = recf->jobs; 2857 recf->jobs = &job->work; 2858 2859 job->task = tmcf->engine->task; 2860 job->work.handler = nxt_router_worker_thread_quit; 2861 job->work.task = &job->task; 2862 job->work.obj = NULL; 2863 job->work.data = NULL; 2864 job->tmcf = NULL; 2865 2866 return NXT_OK; 2867 } 2868 2869 2870 static nxt_int_t 2871 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2872 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2873 { 2874 nxt_joint_job_t *job; 2875 
nxt_queue_link_t *qlk; 2876 2877 for (qlk = nxt_queue_first(sockets); 2878 qlk != nxt_queue_tail(sockets); 2879 qlk = nxt_queue_next(qlk)) 2880 { 2881 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2882 if (nxt_slow_path(job == NULL)) { 2883 return NXT_ERROR; 2884 } 2885 2886 job->work.next = recf->jobs; 2887 recf->jobs = &job->work; 2888 2889 job->task = tmcf->engine->task; 2890 job->work.handler = nxt_router_listen_socket_delete; 2891 job->work.task = &job->task; 2892 job->work.obj = job; 2893 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2894 job->tmcf = tmcf; 2895 2896 tmcf->count++; 2897 } 2898 2899 return NXT_OK; 2900 } 2901 2902 2903 static nxt_int_t 2904 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2905 nxt_router_temp_conf_t *tmcf) 2906 { 2907 nxt_int_t ret; 2908 nxt_uint_t i, threads; 2909 nxt_router_engine_conf_t *recf; 2910 2911 recf = tmcf->engines->elts; 2912 threads = tmcf->router_conf->threads; 2913 2914 for (i = tmcf->new_threads; i < threads; i++) { 2915 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2916 if (nxt_slow_path(ret != NXT_OK)) { 2917 return ret; 2918 } 2919 } 2920 2921 return NXT_OK; 2922 } 2923 2924 2925 static nxt_int_t 2926 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2927 nxt_event_engine_t *engine) 2928 { 2929 nxt_int_t ret; 2930 nxt_thread_link_t *link; 2931 nxt_thread_handle_t handle; 2932 2933 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2934 2935 if (nxt_slow_path(link == NULL)) { 2936 return NXT_ERROR; 2937 } 2938 2939 link->start = nxt_router_thread_start; 2940 link->engine = engine; 2941 link->work.handler = nxt_router_thread_exit_handler; 2942 link->work.task = task; 2943 link->work.data = link; 2944 2945 nxt_queue_insert_tail(&rt->engines, &engine->link); 2946 2947 ret = nxt_thread_create(&handle, link); 2948 2949 if (nxt_slow_path(ret != NXT_OK)) { 2950 nxt_queue_remove(&engine->link); 2951 } 2952 2953 return ret; 2954 } 2955 2956 2957 
static void 2958 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2959 nxt_router_temp_conf_t *tmcf) 2960 { 2961 nxt_app_t *app; 2962 2963 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2964 2965 nxt_router_app_unlink(task, app); 2966 2967 } nxt_queue_loop; 2968 2969 nxt_queue_add(&router->apps, &tmcf->previous); 2970 nxt_queue_add(&router->apps, &tmcf->apps); 2971 } 2972 2973 2974 static void 2975 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2976 { 2977 nxt_uint_t n; 2978 nxt_event_engine_t *engine; 2979 nxt_router_engine_conf_t *recf; 2980 2981 recf = tmcf->engines->elts; 2982 2983 for (n = tmcf->engines->nelts; n != 0; n--) { 2984 engine = recf->engine; 2985 2986 switch (recf->action) { 2987 2988 case NXT_ROUTER_ENGINE_KEEP: 2989 break; 2990 2991 case NXT_ROUTER_ENGINE_ADD: 2992 nxt_queue_insert_tail(&router->engines, &engine->link0); 2993 break; 2994 2995 case NXT_ROUTER_ENGINE_DELETE: 2996 nxt_queue_remove(&engine->link0); 2997 break; 2998 } 2999 3000 nxt_router_engine_post(engine, recf->jobs); 3001 3002 recf++; 3003 } 3004 } 3005 3006 3007 static void 3008 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3009 { 3010 nxt_work_t *work, *next; 3011 3012 for (work = jobs; work != NULL; work = next) { 3013 next = work->next; 3014 work->next = NULL; 3015 3016 nxt_event_engine_post(engine, work); 3017 } 3018 } 3019 3020 3021 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3022 .rpc_error = nxt_port_rpc_handler, 3023 .mmap = nxt_port_mmap_handler, 3024 .data = nxt_port_rpc_handler, 3025 .oosm = nxt_router_oosm_handler, 3026 .req_headers_ack = nxt_port_rpc_handler, 3027 }; 3028 3029 3030 static void 3031 nxt_router_thread_start(void *data) 3032 { 3033 nxt_int_t ret; 3034 nxt_port_t *port; 3035 nxt_task_t *task; 3036 nxt_work_t *work; 3037 nxt_thread_t *thread; 3038 nxt_thread_link_t *link; 3039 nxt_event_engine_t *engine; 3040 3041 link = data; 3042 engine = link->engine; 3043 task = 
&engine->task; 3044 3045 thread = nxt_thread(); 3046 3047 nxt_event_engine_thread_adopt(engine); 3048 3049 /* STUB */ 3050 thread->runtime = engine->task.thread->runtime; 3051 3052 engine->task.thread = thread; 3053 engine->task.log = thread->log; 3054 thread->engine = engine; 3055 thread->task = &engine->task; 3056 #if 0 3057 thread->fiber = &engine->fibers->fiber; 3058 #endif 3059 3060 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3061 if (nxt_slow_path(engine->mem_pool == NULL)) { 3062 return; 3063 } 3064 3065 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3066 NXT_PROCESS_ROUTER); 3067 if (nxt_slow_path(port == NULL)) { 3068 return; 3069 } 3070 3071 ret = nxt_port_socket_init(task, port, 0); 3072 if (nxt_slow_path(ret != NXT_OK)) { 3073 nxt_port_use(task, port, -1); 3074 return; 3075 } 3076 3077 ret = nxt_router_port_queue_init(task, port); 3078 if (nxt_slow_path(ret != NXT_OK)) { 3079 nxt_port_use(task, port, -1); 3080 return; 3081 } 3082 3083 engine->port = port; 3084 3085 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3086 3087 work = nxt_zalloc(sizeof(nxt_work_t)); 3088 if (nxt_slow_path(work == NULL)) { 3089 return; 3090 } 3091 3092 work->handler = nxt_router_rt_add_port; 3093 work->task = link->work.task; 3094 work->obj = work; 3095 work->data = port; 3096 3097 nxt_event_engine_post(link->work.task->thread->engine, work); 3098 3099 nxt_event_engine_start(engine); 3100 } 3101 3102 3103 static void 3104 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3105 { 3106 nxt_int_t res; 3107 nxt_port_t *port; 3108 nxt_runtime_t *rt; 3109 3110 rt = task->thread->runtime; 3111 port = data; 3112 3113 nxt_free(obj); 3114 3115 res = nxt_port_hash_add(&rt->ports, port); 3116 3117 if (nxt_fast_path(res == NXT_OK)) { 3118 nxt_port_use(task, port, 1); 3119 } 3120 } 3121 3122 3123 static void 3124 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3125 { 3126 nxt_joint_job_t *job; 3127 nxt_socket_conf_t 
*skcf; 3128 nxt_listen_event_t *lev; 3129 nxt_listen_socket_t *ls; 3130 nxt_thread_spinlock_t *lock; 3131 nxt_socket_conf_joint_t *joint; 3132 3133 job = obj; 3134 joint = data; 3135 3136 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3137 3138 skcf = joint->socket_conf; 3139 ls = skcf->listen; 3140 3141 lev = nxt_listen_event(task, ls); 3142 if (nxt_slow_path(lev == NULL)) { 3143 nxt_router_listen_socket_release(task, skcf); 3144 return; 3145 } 3146 3147 lev->socket.data = joint; 3148 3149 lock = &skcf->router_conf->router->lock; 3150 3151 nxt_thread_spin_lock(lock); 3152 ls->count++; 3153 nxt_thread_spin_unlock(lock); 3154 3155 job->work.next = NULL; 3156 job->work.handler = nxt_router_conf_wait; 3157 3158 nxt_event_engine_post(job->tmcf->engine, &job->work); 3159 } 3160 3161 3162 nxt_inline nxt_listen_event_t * 3163 nxt_router_listen_event(nxt_queue_t *listen_connections, 3164 nxt_socket_conf_t *skcf) 3165 { 3166 nxt_socket_t fd; 3167 nxt_queue_link_t *qlk; 3168 nxt_listen_event_t *lev; 3169 3170 fd = skcf->listen->socket; 3171 3172 for (qlk = nxt_queue_first(listen_connections); 3173 qlk != nxt_queue_tail(listen_connections); 3174 qlk = nxt_queue_next(qlk)) 3175 { 3176 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3177 3178 if (fd == lev->socket.fd) { 3179 return lev; 3180 } 3181 } 3182 3183 return NULL; 3184 } 3185 3186 3187 static void 3188 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3189 { 3190 nxt_joint_job_t *job; 3191 nxt_event_engine_t *engine; 3192 nxt_listen_event_t *lev; 3193 nxt_socket_conf_joint_t *joint, *old; 3194 3195 job = obj; 3196 joint = data; 3197 3198 engine = task->thread->engine; 3199 3200 nxt_queue_insert_tail(&engine->joints, &joint->link); 3201 3202 lev = nxt_router_listen_event(&engine->listen_connections, 3203 joint->socket_conf); 3204 3205 old = lev->socket.data; 3206 lev->socket.data = joint; 3207 lev->listen = joint->socket_conf->listen; 3208 3209 job->work.next = NULL; 
3210 job->work.handler = nxt_router_conf_wait; 3211 3212 nxt_event_engine_post(job->tmcf->engine, &job->work); 3213 3214 /* 3215 * The task is allocated from configuration temporary 3216 * memory pool so it can be freed after engine post operation. 3217 */ 3218 3219 nxt_router_conf_release(&engine->task, old); 3220 } 3221 3222 3223 static void 3224 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3225 { 3226 nxt_socket_conf_t *skcf; 3227 nxt_listen_event_t *lev; 3228 nxt_event_engine_t *engine; 3229 nxt_socket_conf_joint_t *joint; 3230 3231 skcf = data; 3232 3233 engine = task->thread->engine; 3234 3235 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3236 3237 nxt_fd_event_delete(engine, &lev->socket); 3238 3239 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3240 lev->socket.fd); 3241 3242 joint = lev->socket.data; 3243 joint->close_job = obj; 3244 3245 lev->timer.handler = nxt_router_listen_socket_close; 3246 lev->timer.work_queue = &engine->fast_work_queue; 3247 3248 nxt_timer_add(engine, &lev->timer, 0); 3249 } 3250 3251 3252 static void 3253 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3254 { 3255 nxt_event_engine_t *engine; 3256 3257 nxt_debug(task, "router worker thread quit"); 3258 3259 engine = task->thread->engine; 3260 3261 engine->shutdown = 1; 3262 3263 if (nxt_queue_is_empty(&engine->joints)) { 3264 nxt_thread_exit(task->thread); 3265 } 3266 } 3267 3268 3269 static void 3270 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3271 { 3272 nxt_timer_t *timer; 3273 nxt_joint_job_t *job; 3274 nxt_listen_event_t *lev; 3275 nxt_socket_conf_joint_t *joint; 3276 3277 timer = obj; 3278 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3279 3280 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3281 lev->socket.fd); 3282 3283 nxt_queue_remove(&lev->link); 3284 3285 joint = lev->socket.data; 3286 lev->socket.data = NULL; 3287 
3288 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3289 task = &task->thread->engine->task; 3290 3291 nxt_router_listen_socket_release(task, joint->socket_conf); 3292 3293 job = joint->close_job; 3294 job->work.next = NULL; 3295 job->work.handler = nxt_router_conf_wait; 3296 3297 nxt_event_engine_post(job->tmcf->engine, &job->work); 3298 3299 nxt_router_listen_event_release(task, lev, joint); 3300 } 3301 3302 3303 static void 3304 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3305 { 3306 nxt_listen_socket_t *ls; 3307 nxt_thread_spinlock_t *lock; 3308 3309 ls = skcf->listen; 3310 lock = &skcf->router_conf->router->lock; 3311 3312 nxt_thread_spin_lock(lock); 3313 3314 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3315 task->thread->engine, ls->count); 3316 3317 if (--ls->count != 0) { 3318 ls = NULL; 3319 } 3320 3321 nxt_thread_spin_unlock(lock); 3322 3323 if (ls != NULL) { 3324 nxt_socket_close(task, ls->socket); 3325 nxt_free(ls); 3326 } 3327 } 3328 3329 3330 void 3331 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3332 nxt_socket_conf_joint_t *joint) 3333 { 3334 nxt_event_engine_t *engine; 3335 3336 nxt_debug(task, "listen event count: %D", lev->count); 3337 3338 engine = task->thread->engine; 3339 3340 if (--lev->count == 0) { 3341 if (lev->next != NULL) { 3342 nxt_sockaddr_cache_free(engine, lev->next); 3343 3344 nxt_conn_free(task, lev->next); 3345 } 3346 3347 nxt_free(lev); 3348 } 3349 3350 if (joint != NULL) { 3351 nxt_router_conf_release(task, joint); 3352 } 3353 3354 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3355 nxt_thread_exit(task->thread); 3356 } 3357 } 3358 3359 3360 void 3361 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3362 { 3363 nxt_socket_conf_t *skcf; 3364 nxt_router_conf_t *rtcf; 3365 nxt_thread_spinlock_t *lock; 3366 3367 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3368 3369 
if (--joint->count != 0) { 3370 return; 3371 } 3372 3373 nxt_queue_remove(&joint->link); 3374 3375 /* 3376 * The joint content can not be safely used after the critical 3377 * section protected by the spinlock because its memory pool may 3378 * be already destroyed by another thread. 3379 */ 3380 skcf = joint->socket_conf; 3381 rtcf = skcf->router_conf; 3382 lock = &rtcf->router->lock; 3383 3384 nxt_thread_spin_lock(lock); 3385 3386 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3387 rtcf, rtcf->count); 3388 3389 if (--skcf->count != 0) { 3390 skcf = NULL; 3391 rtcf = NULL; 3392 3393 } else { 3394 nxt_queue_remove(&skcf->link); 3395 3396 if (--rtcf->count != 0) { 3397 rtcf = NULL; 3398 } 3399 } 3400 3401 nxt_thread_spin_unlock(lock); 3402 3403 #if (NXT_TLS) 3404 if (skcf != NULL && skcf->tls != NULL) { 3405 task->thread->runtime->tls->server_free(task, skcf->tls); 3406 } 3407 #endif 3408 3409 /* TODO remove engine->port */ 3410 3411 if (rtcf != NULL) { 3412 nxt_debug(task, "old router conf is destroyed"); 3413 3414 nxt_router_apps_hash_use(task, rtcf, -1); 3415 3416 nxt_router_access_log_release(task, lock, rtcf->access_log); 3417 3418 nxt_mp_thread_adopt(rtcf->mem_pool); 3419 3420 nxt_mp_destroy(rtcf->mem_pool); 3421 } 3422 } 3423 3424 3425 static void 3426 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3427 nxt_router_access_log_t *access_log) 3428 { 3429 size_t size; 3430 u_char *buf, *p; 3431 nxt_off_t bytes; 3432 3433 static nxt_time_string_t date_cache = { 3434 (nxt_atomic_uint_t) -1, 3435 nxt_router_access_log_date, 3436 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3437 nxt_length("31/Dec/1986:19:40:00 +0300"), 3438 NXT_THREAD_TIME_LOCAL, 3439 NXT_THREAD_TIME_SEC, 3440 }; 3441 3442 size = r->remote->address_length 3443 + 6 /* ' - - [' */ 3444 + date_cache.size 3445 + 3 /* '] "' */ 3446 + r->method->length 3447 + 1 /* space */ 3448 + r->target.length 3449 + 1 /* space */ 3450 + r->version.length 3451 + 2 /* '" ' */ 3452 
+ 3 /* status */ 3453 + 1 /* space */ 3454 + NXT_OFF_T_LEN 3455 + 2 /* ' "' */ 3456 + (r->referer != NULL ? r->referer->value_length : 1) 3457 + 3 /* '" "' */ 3458 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3459 + 2 /* '"\n' */ 3460 ; 3461 3462 buf = nxt_mp_nget(r->mem_pool, size); 3463 if (nxt_slow_path(buf == NULL)) { 3464 return; 3465 } 3466 3467 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3468 r->remote->address_length); 3469 3470 p = nxt_cpymem(p, " - - [", 6); 3471 3472 p = nxt_thread_time_string(task->thread, &date_cache, p); 3473 3474 p = nxt_cpymem(p, "] \"", 3); 3475 3476 if (r->method->length != 0) { 3477 p = nxt_cpymem(p, r->method->start, r->method->length); 3478 3479 if (r->target.length != 0) { 3480 *p++ = ' '; 3481 p = nxt_cpymem(p, r->target.start, r->target.length); 3482 3483 if (r->version.length != 0) { 3484 *p++ = ' '; 3485 p = nxt_cpymem(p, r->version.start, r->version.length); 3486 } 3487 } 3488 3489 } else { 3490 *p++ = '-'; 3491 } 3492 3493 p = nxt_cpymem(p, "\" ", 2); 3494 3495 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3496 3497 *p++ = ' '; 3498 3499 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3500 3501 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3502 3503 p = nxt_cpymem(p, " \"", 2); 3504 3505 if (r->referer != NULL) { 3506 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3507 3508 } else { 3509 *p++ = '-'; 3510 } 3511 3512 p = nxt_cpymem(p, "\" \"", 3); 3513 3514 if (r->user_agent != NULL) { 3515 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3516 3517 } else { 3518 *p++ = '-'; 3519 } 3520 3521 p = nxt_cpymem(p, "\"\n", 2); 3522 3523 nxt_fd_write(access_log->fd, buf, p - buf); 3524 } 3525 3526 3527 static u_char * 3528 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3529 size_t size, const char *format) 3530 { 3531 u_char sign; 3532 time_t gmtoff; 3533 3534 static const char *month[] = { "Jan", "Feb", "Mar", 
"Apr", "May", "Jun", 3535 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3536 3537 gmtoff = nxt_timezone(tm) / 60; 3538 3539 if (gmtoff < 0) { 3540 gmtoff = -gmtoff; 3541 sign = '-'; 3542 3543 } else { 3544 sign = '+'; 3545 } 3546 3547 return nxt_sprintf(buf, buf + size, format, 3548 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3549 tm->tm_hour, tm->tm_min, tm->tm_sec, 3550 sign, gmtoff / 60, gmtoff % 60); 3551 } 3552 3553 3554 static void 3555 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3556 { 3557 uint32_t stream; 3558 nxt_int_t ret; 3559 nxt_buf_t *b; 3560 nxt_port_t *main_port, *router_port; 3561 nxt_runtime_t *rt; 3562 nxt_router_access_log_t *access_log; 3563 3564 access_log = tmcf->router_conf->access_log; 3565 3566 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3567 if (nxt_slow_path(b == NULL)) { 3568 goto fail; 3569 } 3570 3571 b->completion_handler = nxt_router_dummy_buf_completion; 3572 3573 nxt_buf_cpystr(b, &access_log->path); 3574 *b->mem.free++ = '\0'; 3575 3576 rt = task->thread->runtime; 3577 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3578 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3579 3580 stream = nxt_port_rpc_register_handler(task, router_port, 3581 nxt_router_access_log_ready, 3582 nxt_router_access_log_error, 3583 -1, tmcf); 3584 if (nxt_slow_path(stream == 0)) { 3585 goto fail; 3586 } 3587 3588 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3589 stream, router_port->id, b); 3590 3591 if (nxt_slow_path(ret != NXT_OK)) { 3592 nxt_port_rpc_cancel(task, router_port, stream); 3593 goto fail; 3594 } 3595 3596 return; 3597 3598 fail: 3599 3600 nxt_router_conf_error(task, tmcf); 3601 } 3602 3603 3604 static void 3605 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3606 void *data) 3607 { 3608 nxt_router_temp_conf_t *tmcf; 3609 nxt_router_access_log_t *access_log; 3610 3611 tmcf = data; 3612 3613 access_log = 
tmcf->router_conf->access_log; 3614 3615 access_log->fd = msg->fd[0]; 3616 3617 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3618 nxt_router_conf_apply, task, tmcf, NULL); 3619 } 3620 3621 3622 static void 3623 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3624 void *data) 3625 { 3626 nxt_router_temp_conf_t *tmcf; 3627 3628 tmcf = data; 3629 3630 nxt_router_conf_error(task, tmcf); 3631 } 3632 3633 3634 static void 3635 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3636 nxt_router_access_log_t *access_log) 3637 { 3638 if (access_log == NULL) { 3639 return; 3640 } 3641 3642 nxt_thread_spin_lock(lock); 3643 3644 if (--access_log->count != 0) { 3645 access_log = NULL; 3646 } 3647 3648 nxt_thread_spin_unlock(lock); 3649 3650 if (access_log != NULL) { 3651 3652 if (access_log->fd != -1) { 3653 nxt_fd_close(access_log->fd); 3654 } 3655 3656 nxt_free(access_log); 3657 } 3658 } 3659 3660 3661 typedef struct { 3662 nxt_mp_t *mem_pool; 3663 nxt_router_access_log_t *access_log; 3664 } nxt_router_access_log_reopen_t; 3665 3666 3667 static void 3668 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3669 { 3670 nxt_mp_t *mp; 3671 uint32_t stream; 3672 nxt_int_t ret; 3673 nxt_buf_t *b; 3674 nxt_port_t *main_port, *router_port; 3675 nxt_runtime_t *rt; 3676 nxt_router_access_log_t *access_log; 3677 nxt_router_access_log_reopen_t *reopen; 3678 3679 access_log = nxt_router->access_log; 3680 3681 if (access_log == NULL) { 3682 return; 3683 } 3684 3685 mp = nxt_mp_create(1024, 128, 256, 32); 3686 if (nxt_slow_path(mp == NULL)) { 3687 return; 3688 } 3689 3690 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3691 if (nxt_slow_path(reopen == NULL)) { 3692 goto fail; 3693 } 3694 3695 reopen->mem_pool = mp; 3696 reopen->access_log = access_log; 3697 3698 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3699 if (nxt_slow_path(b == NULL)) { 3700 goto fail; 3701 } 3702 
3703 b->completion_handler = nxt_router_access_log_reopen_completion; 3704 3705 nxt_buf_cpystr(b, &access_log->path); 3706 *b->mem.free++ = '\0'; 3707 3708 rt = task->thread->runtime; 3709 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3710 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3711 3712 stream = nxt_port_rpc_register_handler(task, router_port, 3713 nxt_router_access_log_reopen_ready, 3714 nxt_router_access_log_reopen_error, 3715 -1, reopen); 3716 if (nxt_slow_path(stream == 0)) { 3717 goto fail; 3718 } 3719 3720 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3721 stream, router_port->id, b); 3722 3723 if (nxt_slow_path(ret != NXT_OK)) { 3724 nxt_port_rpc_cancel(task, router_port, stream); 3725 goto fail; 3726 } 3727 3728 nxt_mp_retain(mp); 3729 3730 return; 3731 3732 fail: 3733 3734 nxt_mp_destroy(mp); 3735 } 3736 3737 3738 static void 3739 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3740 { 3741 nxt_mp_t *mp; 3742 nxt_buf_t *b; 3743 3744 b = obj; 3745 mp = b->data; 3746 3747 nxt_mp_release(mp); 3748 } 3749 3750 3751 static void 3752 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3753 void *data) 3754 { 3755 nxt_router_access_log_t *access_log; 3756 nxt_router_access_log_reopen_t *reopen; 3757 3758 reopen = data; 3759 3760 access_log = reopen->access_log; 3761 3762 if (access_log == nxt_router->access_log) { 3763 3764 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 3765 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3766 msg->fd[0], access_log->fd, nxt_errno); 3767 } 3768 } 3769 3770 nxt_fd_close(msg->fd[0]); 3771 nxt_mp_release(reopen->mem_pool); 3772 } 3773 3774 3775 static void 3776 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3777 void *data) 3778 { 3779 nxt_router_access_log_reopen_t *reopen; 3780 3781 reopen = data; 3782 3783 nxt_mp_release(reopen->mem_pool); 3784 } 3785 3786 3787 static void 3788 
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_port_t           *port;
    nxt_thread_link_t    *link;
    nxt_event_engine_t   *engine;
    nxt_thread_handle_t  handle;

    handle = (nxt_thread_handle_t) (uintptr_t) obj;
    link = data;

    nxt_thread_wait(handle);

    engine = link->engine;

    nxt_queue_remove(&engine->link);

    port = engine->port;

    // TODO notify all apps

    port->engine = task->thread->engine;
    nxt_mp_thread_adopt(port->mem_pool);
    nxt_port_use(task, port, -1);

    nxt_mp_thread_adopt(engine->mem_pool);
    nxt_mp_destroy(engine->mem_pool);

    nxt_event_engine_free(engine);

    nxt_free(link);
}


/*
 * RPC "ready" callback for a request stream; it is registered together with
 * nxt_router_response_error_handler() by nxt_router_process_http_request().
 * "data" is the nxt_request_rpc_data_t bound to the stream.
 *
 * A _NXT_PORT_MSG_REQ_HEADERS_ACK message is handed over to
 * nxt_router_req_headers_ack_handler().  Any other message carries response
 * data:
 *   - if the response header has already been sent, the buffer is appended
 *     to r->out and sent as body;
 *   - otherwise the buffer is interpreted as nxt_unit_response_t: its fields
 *     are copied into r->resp.fields, the status is taken over, the optional
 *     piggybacked content becomes the first body data and the header is sent.
 *
 * A too small buffer, a failed field allocation/processing or a missing app
 * port on a WebSocket upgrade fails the request with
 * NXT_HTTP_SERVICE_UNAVAILABLE and unlinks the RPC data.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    /* No request is attached to this RPC data: nothing to deliver. */
    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    /* r->error is set: only detach the RPC data from the request. */
    if (r->error) {
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    /* An empty message contributes no buffer of its own. */
    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Terminate the chain with the buffer from nxt_http_buf_last(). */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /*
         * A complete response has arrived, so the port must be released
         * as "got response" rather than "request failed" (the initial
         * value set by nxt_router_app_port_get()).
         */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* More messages are expected: arm the application timeout timer. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        /* Header is out already: everything received is body data. */
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;

        /* Number of nxt_unit_field_t entries that fit after the header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        /* Copy every non-skipped response field into r->resp.fields. */
        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /*
             * Field processing marked the field as skipped: give back the
             * list slot that was just added for it.
             */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        if (resp->piggyback_content_length != 0) {
            /* Narrow the buffer to the content piggybacked on the header. */
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            /* Header only: mark the whole buffer as consumed. */
            b->mem.pos = b->mem.free;
        }

        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            /*
             * Nothing left to send from the first buffer: detach it,
             * schedule its completion handler and go on with the rest
             * of the chain.
             */
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            /*
             * Account the response now (NXT_APR_UPGRADE) and leave only
             * the final NXT_APR_CLOSE release for later.
             */
            nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}


/*
 * Handles the headers acknowledgement received for a request stream (called
 * from nxt_router_response_ready_handler()).  Under app->mutex it unlinks
 * the request from the list it is queued on through r->app_link, looks up
 * the acknowledging port in app->port_hash, takes the port's main app port
 * off the spare/idle lists and counts the request on it.  The found port
 * then replaces req_rpc_data->app_port, the remaining request buffers and/or
 * the body file descriptor are sent with NXT_PORT_MSG_REQ_BODY, and the
 * application timeout timer is armed.
 */
static void
nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t
*msg, nxt_request_rpc_data_t *req_rpc_data)
{
    int                 res;
    nxt_app_t           *app;
    nxt_buf_t           *b;
    nxt_bool_t          start_process, unlinked;
    nxt_port_t          *app_port, *main_app_port, *idle_port;
    nxt_queue_link_t    *idle_lnk;
    nxt_http_request_t  *r;

    nxt_debug(task, "stream #%uD: got ack from %PI:%d",
              req_rpc_data->stream,
              msg->port_msg.pid, msg->port_msg.reply_port);

    nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
                             msg->port_msg.pid);

    app = req_rpc_data->app;
    r = req_rpc_data->request;

    start_process = 0;
    unlinked = 0;

    nxt_thread_mutex_lock(&app->mutex);

    /*
     * The request is still queued through app_link
     * (nxt_router_app_port_get() puts it on app->ack_waiting_req):
     * unlink it and remember to drop the memory pool reference that was
     * retained for that linkage once the mutex is released.
     */
    if (r->app_link.next != NULL) {
        nxt_queue_remove(&r->app_link);
        r->app_link.next = NULL;

        unlinked = 1;
    }

    app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
                                  msg->port_msg.reply_port);
    if (nxt_slow_path(app_port == NULL)) {
        /* The acknowledging port is not registered in app->port_hash. */
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);

        if (unlinked) {
            nxt_mp_release(r->mem_pool);
        }

        return;
    }

    main_app_port = app_port->main_app_port;

    /* The process starts serving a request: it is not idle any more. */
    if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
                  &app->name, main_app_port->pid, main_app_port->id,
                  (main_app_port->idle_start ? "idle_ports" : "spare_ports"));

        /* Check port was in 'spare_ports' using idle_start field. */
        if (main_app_port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            /*
             * If there is a vacant space in spare ports,
             * move the last idle to spare_ports.
             */
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }

        /* Reserve a process start; it is issued after the unlock. */
        if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
            app->pending_processes++;
            start_process = 1;
        }
    }

    main_app_port->active_requests++;

    nxt_port_inc_use(app_port);

    nxt_thread_mutex_unlock(&app->mutex);

    if (unlinked) {
        nxt_mp_release(r->mem_pool);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }

    /*
     * Replace the port referenced by the RPC data: drop the reference to
     * the previous one (nxt_router_app_port_get() stores the shared port
     * there) and keep the reference taken above for the acknowledging port.
     */
    nxt_port_use(task, req_rpc_data->app_port, -1);

    req_rpc_data->app_port = app_port;

    b = req_rpc_data->msg_info.buf;

    if (b != NULL) {
        /* First buffer is already sent. Start from second. */
        b = b->next;

        req_rpc_data->msg_info.buf->next = NULL;
    }

    if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
                  req_rpc_data->msg_info.body_fd);

        /* Rewind the body file; the lseek() result is not checked. */
        if (req_rpc_data->msg_info.body_fd != -1) {
            lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
        }

        res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
                                    req_rpc_data->msg_info.body_fd,
                                    req_rpc_data->stream,
                                    task->thread->engine->port->id, b);

        /* NOTE(review): the timer below is still armed after this error. */
        if (nxt_slow_path(res != NXT_OK)) {
            nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
        }
    }

    if (app->timeout != 0) {
        r->timer.handler = nxt_router_app_timeout;
        r->timer_data = req_rpc_data;
        nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
    }
}


/*
 * Request state assigned by nxt_router_response_ready_handler() after the
 * response header has been passed on (non-WebSocket case); only the error
 * handler is set.
 */
static const nxt_http_request_state_t  nxt_http_request_send_state
    nxt_aligned(64) =
{
    .error_handler = nxt_http_request_error_handler,
};


/*
 * Sends the pending output chain of a request: detaches r->out and passes it
 * to nxt_http_request_send().  Does nothing when r->out is empty.  "obj" is
 * the request; "data" is not used.
 */
static void
nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t           *out;
    nxt_http_request_t  *r;

    r = obj;

    out = r->out;

    if (out != NULL) {
        r->out = NULL;
        nxt_http_request_send(task, r, out);
    }
}


/*
 * RPC "error" callback for a request stream (registered by
 * nxt_router_process_http_request()).  Clears rpc_cancel, fails the attached
 * request, if any, with NXT_HTTP_SERVICE_UNAVAILABLE and unlinks the RPC
 * data.  "data" is the nxt_request_rpc_data_t of the stream.
 */
static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    req_rpc_data->rpc_cancel = 0;

    /* TODO cancel message and return if cancelled. */
    // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);

    if (req_rpc_data->request != NULL) {
        nxt_http_request_error(task, req_rpc_data->request,
                               NXT_HTTP_SERVICE_UNAVAILABLE);
    }

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}


/*
 * RPC callback invoked with the port of a new application process
 * (msg->u.new_port); "data" is the app joint, on which one reference is
 * dropped.  If the application has been released in the meantime
 * (app_joint->app == NULL) the new process is told to QUIT.  Otherwise the
 * port is bound to the application, the process counters and port hash are
 * updated under app->mutex, the shared port is sent to the process and the
 * port is released with NXT_APR_NEW_PORT.
 */
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t        *app;
    nxt_port_t       *port;
    nxt_app_joint_t  *app_joint;

    app_joint = data;
    port = msg->u.new_port;

    nxt_assert(app_joint != NULL);
    nxt_assert(port != NULL);
    nxt_assert(port->type == NXT_PROCESS_APP);
    nxt_assert(port->id == 0);

    /* Read the app pointer before the joint reference is dropped. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "new port ready for released app, send QUIT");

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        return;
    }

    port->app = app;
    port->main_app_port = port;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    /* The pending start has turned into a running process. */
    app->pending_processes--;
    app->processes++;
    nxt_port_hash_add(&app->port_hash, port);
    app->port_hash_count++;

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
              &app->name, port->pid, app->processes, app->pending_processes);

    /* NOTE(review): the result of the send is not checked here. */
    nxt_router_app_shared_port_send(task, port);

    nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}


/*
 * Sends the application's shared port to the process behind "app_port" as an
 * NXT_PORT_MSG_NEW_PORT message, passing the descriptors port->pair[0] and
 * port->queue_fd.  Returns NXT_ERROR if the message buffer cannot be
 * allocated, otherwise the result of nxt_port_socket_write2().
 */
static nxt_int_t
nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
{
    nxt_buf_t                *b;
    nxt_port_t               *port;
    nxt_port_msg_new_port_t  *msg;

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
                             sizeof(nxt_port_data_t));
    if (nxt_slow_path(b == NULL)) {
        return NXT_ERROR;
    }

port = app_port->app->shared_port;

    nxt_debug(task, "send port %FD to process %PI",
              port->pair[0], app_port->pid);

    /* Describe the shared port in the message payload. */
    b->mem.free += sizeof(nxt_port_msg_new_port_t);
    msg = (nxt_port_msg_new_port_t *) b->mem.pos;

    msg->id = port->id;
    msg->pid = port->pid;
    msg->max_size = port->max_size;
    msg->max_share = port->max_share;
    msg->type = port->type;

    return nxt_port_socket_write2(task, app_port,
                                  NXT_PORT_MSG_NEW_PORT,
                                  port->pair[0], port->queue_fd,
                                  0, 0, b);
}


/*
 * RPC "error" callback for a failed application process start; "data" is
 * the app joint, on which one reference is dropped.  Decrements
 * app->pending_processes and, while the application has no running process,
 * fails requests waiting in app->ack_waiting_req by posting their err_work
 * to the engine of each request: the first one as soon as
 * app->processes == 0, further ones only when no start is pending either.
 */
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t           *app;
    nxt_app_joint_t     *app_joint;
    nxt_queue_link_t    *link;
    nxt_http_request_t  *r;

    app_joint = data;

    nxt_assert(app_joint != NULL);

    /* Read the app pointer before the joint reference is dropped. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "start error for released app");

        return;
    }

    nxt_debug(task, "app '%V' %p start error", &app->name, app);

    link = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    /* No running process: take the oldest waiting request to fail it. */
    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
        link = nxt_queue_first(&app->ack_waiting_req);

        nxt_queue_remove(link);
        link->next = NULL;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    while (link != NULL) {
        r = nxt_container_of(link, nxt_http_request_t, app_link);

        /* The error is raised on the engine that owns the request. */
        nxt_event_engine_post(r->engine, &r->err_work);

        link = NULL;

        nxt_thread_mutex_lock(&app->mutex);

        /*
         * Keep draining the list only while neither a running nor a
         * pending process is left for the waiting requests.
         */
        if (app->processes == 0 && app->pending_processes == 0
            && !nxt_queue_is_empty(&app->ack_waiting_req))
        {
            link = nxt_queue_first(&app->ack_waiting_req);

            nxt_queue_remove(link);
            link->next = NULL;
        }

        nxt_thread_mutex_unlock(&app->mutex);
    }
}



/*
 * Detaches one port from app->ports for sending QUIT: under app->mutex the
 * first port visited is removed from the ports list, from the idle/spare
 * list and from app->port_hash, its app pointer is cleared and
 * app->processes is decremented.  Returns that port; "port" is preset to
 * NULL for the case when the loop body never runs.
 */
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
    nxt_port_t  *port;

    port = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {

        /* Caller is responsible to decrease port use count. */
        nxt_queue_chk_remove(&port->app_link);

        if (nxt_queue_chk_remove(&port->idle_link)) {
            app->idle_processes--;

            nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
                      &app->name, port->pid, port->id,
                      (port->idle_start ? "idle_ports" : "spare_ports"));
        }

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        port->app = NULL;
        app->processes--;

        /* Only one port is taken per call. */
        break;

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&app->mutex);

    return port;
}


/*
 * Adjusts the application use count by "i".  When a decrement consumes the
 * whole count that was fetched (c == -i), the application is freed: directly
 * via nxt_router_free_app() if the caller runs on app->engine, otherwise by
 * posting the joint's free_app_work to that engine.
 */
static void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
    int  c;

    c = nxt_atomic_fetch_add(&app->use_count, i);

    if (i < 0 && c == -i) {

        if (task->thread->engine != app->engine) {
            nxt_event_engine_post(app->engine, &app->joint->free_app_work);

        } else {
            nxt_router_free_app(task, app->joint, NULL);
        }
    }
}


/*
 * Removes the application from the queue it is linked into through
 * app->link and drops one use of it.
 */
static void
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
    nxt_debug(task, "app '%V' %p unlink", &app->name, app);

    nxt_queue_remove(&app->link);

    nxt_router_app_use(task, app, -1);
}


/*
 * Releases an application port after the event given by "action":
 *   NXT_APR_NEW_PORT        - no accounting, no use count change;
 *   NXT_APR_REQUEST_FAILED  - one request less, use count -1;
 *   NXT_APR_GOT_RESPONSE    - one response more / one request less,
 *                             use count -1;
 *   NXT_APR_UPGRADE         - one response more / one request less;
 *   NXT_APR_CLOSE           - use count -1.
 *
 * For the shared port only app->active_requests is adjusted.  For other
 * ports the counters of the main app port are updated under app->mutex, the
 * main port is (re)linked into app->ports, moved to spare_ports/idle_ports
 * when it has no active requests or WebSockets left, or is sent QUIT once it
 * has served app->max_requests responses.
 */
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action)
{
    int            inc_use;
    uint32_t       got_response, dec_requests;
    nxt_app_t      *app;
    nxt_bool_t     port_unchained, send_quit, adjust_idle_timer;
    nxt_port_t     *main_app_port;

    nxt_assert(port != NULL);
nxt_assert(port->app != NULL);

    app = port->app;

    inc_use = 0;
    got_response = 0;
    dec_requests = 0;

    /* Translate the action into counter and use count deltas. */
    switch (action) {
    case NXT_APR_NEW_PORT:
        break;
    case NXT_APR_REQUEST_FAILED:
        dec_requests = 1;
        inc_use = -1;
        break;
    case NXT_APR_GOT_RESPONSE:
        got_response = 1;
        inc_use = -1;
        break;
    case NXT_APR_UPGRADE:
        got_response = 1;
        break;
    case NXT_APR_CLOSE:
        inc_use = -1;
        break;
    }

    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
              port->pid, port->id,
              (int) inc_use, (int) got_response);

    /* The shared port only takes part in the per-application counter. */
    if (port == app->shared_port) {
        nxt_thread_mutex_lock(&app->mutex);

        app->active_requests -= got_response + dec_requests;

        nxt_thread_mutex_unlock(&app->mutex);

        goto adjust_use;
    }

    main_app_port = port->main_app_port;

    nxt_thread_mutex_lock(&app->mutex);

    main_app_port->app_responses += got_response;
    main_app_port->active_requests -= got_response + dec_requests;
    app->active_requests -= got_response + dec_requests;

    /*
     * A port that is not closed (pair[1] != -1) and is below the request
     * limit is put on app->ports, holding one extra reference while it is
     * linked there.
     */
    if (main_app_port->pair[1] != -1
        && (app->max_requests == 0
            || main_app_port->app_responses < app->max_requests))
    {
        if (main_app_port->app_link.next == NULL) {
            nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);

            nxt_port_inc_use(main_app_port);
        }
    }

    /* The process has served its share of requests and has to quit. */
    send_quit = (app->max_requests > 0
                 && main_app_port->app_responses >= app->max_requests);

    if (send_quit) {
        port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);

        nxt_port_hash_remove(&app->port_hash, main_app_port);
        app->port_hash_count--;

        main_app_port->app = NULL;
        app->processes--;

    } else {
        port_unchained = 0;
    }

    adjust_idle_timer = 0;

    /* The port became idle: park it in spare_ports or idle_ports. */
    if (main_app_port->pair[1] != -1 && !send_quit
        && main_app_port->active_requests == 0
        && main_app_port->active_websockets == 0
        && main_app_port->idle_link.next == NULL)
    {
        /*
         * The spare quota is exactly full, so this port goes to idle_ports;
         * queue adjust_idle_work unless it is queued already (its data
         * field is non-NULL while queued).
         */
        if (app->idle_processes == app->spare_processes
            && app->adjust_idle_work.data == NULL)
        {
            adjust_idle_timer = 1;
            app->adjust_idle_work.data = app;
            app->adjust_idle_work.next = NULL;
        }

        if (app->idle_processes < app->spare_processes) {
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);

            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        } else {
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);

            main_app_port->idle_start = task->thread->engine->timers.now;

            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        }

        app->idle_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (adjust_idle_timer) {
        /* This use is dropped by nxt_router_adjust_idle_timer(). */
        nxt_router_app_use(task, app, 1);
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
    }

    /* ? */
    if (main_app_port->pair[1] == -1) {
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
                  &app->name, app, main_app_port, main_app_port->pid);

        goto adjust_use;
    }

    if (send_quit) {
        nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);

        nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
                              NULL);

        /* Drop the reference that was held for the app->ports linkage. */
        if (port_unchained) {
            nxt_port_use(task, main_app_port, -1);
        }

        goto adjust_use;
    }

    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
              &app->name, app);

adjust_use:

    nxt_port_use(task, port, inc_use);
}


/*
 * Detaches a closed port from its application.  The port is removed from
 * app->port_hash; for a port with id != 0 nothing else is done.  A port
 * with id 0 is also unlinked from app->ports and from the spare/idle lists
 * (refilling spare_ports from idle_ports when a spare port was taken),
 * app->processes is decremented and, unless the engine is shutting down, a
 * replacement process is started when nxt_router_app_can_start() and
 * nxt_router_app_need_start() allow it.
 */
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
    nxt_app_t         *app;
    nxt_bool_t        unchain, start_process;
    nxt_port_t        *idle_port;
    nxt_queue_link_t  *idle_lnk;

    app = port->app;

    nxt_assert(app != NULL);

    nxt_thread_mutex_lock(&app->mutex);

    nxt_port_hash_remove(&app->port_hash, port);
    app->port_hash_count--;

    if (port->id != 0) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
                  port->pid, port->id);

        return;
    }

    unchain = nxt_queue_chk_remove(&port->app_link);

    if (nxt_queue_chk_remove(&port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
                  &app->name, port->pid, port->id,
                  (port->idle_start ? "idle_ports" : "spare_ports"));

        /*
         * idle_start == 0 marks a port that was in spare_ports: refill the
         * vacated spare slot with the last port of idle_ports.
         */
        if (port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }
    }

    app->processes--;

    start_process = !task->thread->engine->shutdown
                    && nxt_router_app_can_start(app)
                    && nxt_router_app_need_start(app);

    /* Reserve the start under the mutex; it is issued after the unlock. */
    if (start_process) {
        app->pending_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

    /* Drop the reference that was held for the app->ports linkage. */
    if (unchain) {
        nxt_port_use(task, port, -1);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}


/*
 * Stops idle application processes whose idle time has expired and re-arms
 * the idle timer.  "obj" is the application; data == app tells that the
 * call comes from the queued app->adjust_idle_work, in which case the work
 * is marked as not queued and the application use taken when it was posted
 * is dropped at the end.  Must run on app->engine.
 *
 * While there are more idle processes than app->spare_processes, the first
 * port of idle_ports is examined: if idle_start + idle_timeout does not
 * exceed the threshold (now + idle timer bias), the port is detached from
 * the application and sent QUIT; otherwise the timer is armed for the
 * remaining time.  With nothing left to wait for the timer is disabled.
 */
static void
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t           *app;
    nxt_bool_t          queued;
    nxt_port_t          *port;
    nxt_msec_t          timeout, threshold;
    nxt_queue_link_t    *lnk;
    nxt_event_engine_t  *engine;

    app = obj;
    queued = (data == app);

    nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
              &app->name, queued);

    engine = task->thread->engine;

    nxt_assert(app->engine == engine);

    threshold = engine->timers.now + app->joint->idle_timer.bias;
    timeout = 0;

    nxt_thread_mutex_lock(&app->mutex);

    /* Allow nxt_router_app_port_release() to queue the work again. */
    if (queued) {
        app->adjust_idle_work.data = NULL;
    }

    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
              &app->name,
              (int) app->idle_processes, (int) app->spare_processes);

    while (app->idle_processes > app->spare_processes) {

        nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

        lnk = nxt_queue_first(&app->idle_ports);
        port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);

        timeout = port->idle_start + app->idle_timeout;

        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
                  &app->name, port->pid,
                  port->idle_start, timeout, threshold);

        /* The first idle port has not expired yet: stop here. */
        if (timeout > threshold) {
            break;
        }

        nxt_queue_remove(lnk);
        lnk->next = NULL;

        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
                  &app->name, port->pid, port->id);

        nxt_queue_chk_remove(&port->app_link);

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        app->idle_processes--;
        app->processes--;
        port->app = NULL;

        /* The message is sent and the port released without the mutex. */
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' send QUIT to idle port %PI",
                  &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);

        nxt_thread_mutex_lock(&app->mutex);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (timeout > threshold) {
        nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);

    } else {
        nxt_timer_disable(engine, &app->joint->idle_timer);
    }

    if (queued) {
        nxt_router_app_use(task, app, -1);
    }
}


/*
 * Handler of the app joint idle timer ("obj" is the timer embedded in
 * nxt_app_joint_t): re-evaluates the idle processes of the application if
 * the joint still refers to one.
 */
static void
nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t      *timer;
    nxt_app_joint_t  *app_joint;

    timer = obj;
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);

    if (nxt_fast_path(app_joint->app != NULL)) {
        nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
    }
}


/*
 * Timer handler that nxt_router_free_app() installs on the app joint idle
 * timer: drops one reference to the joint that embeds the timer ("obj").
 */
static void
nxt_router_app_joint_release_handler(nxt_task_t
*task, void *obj, void *data)
{
    nxt_timer_t      *timer;
    nxt_app_joint_t  *app_joint;

    timer = obj;
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);

    nxt_router_app_joint_use(task, app_joint, -1);
}


/*
 * Destroys an application; "obj" is its app joint.  It is called directly or
 * as the joint's free_app_work from nxt_router_app_use() when the use count
 * is exhausted.
 *
 * Every port in app->ports is detached and sent QUIT, the ports remaining
 * in app->port_hash are closed, the outgoing mmaps, the mutexes, the shared
 * port and the memory pool of the application are destroyed and
 * app_joint->app is cleared.  Finally one joint reference is dropped: via a
 * zero-delay timer when nxt_timer_delete() returns non-zero, directly
 * otherwise.
 */
static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t        *app;
    nxt_port_t       *port;
    nxt_app_joint_t  *app_joint;

    app_joint = obj;
    app = app_joint->app;

    /* Ask every process linked in app->ports to quit. */
    for ( ;; ) {
        port = nxt_router_app_get_port_for_quit(task, app);
        if (port == NULL) {
            break;
        }

        nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* Close whatever ports are still registered in the hash. */
    for ( ;; ) {
        port = nxt_port_hash_retrieve(&app->port_hash);
        if (port == NULL) {
            break;
        }

        app->port_hash_count--;

        port->app = NULL;

        nxt_port_close(task, port);

        nxt_port_use(task, port, -1);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    /* At this point the application must not own anything any more. */
    nxt_assert(app->processes == 0);
    nxt_assert(app->active_requests == 0);
    nxt_assert(app->port_hash_count == 0);
    nxt_assert(app->idle_processes == 0);
    nxt_assert(nxt_queue_is_empty(&app->ports));
    nxt_assert(nxt_queue_is_empty(&app->spare_ports));
    nxt_assert(nxt_queue_is_empty(&app->idle_ports));

    nxt_port_mmaps_destroy(&app->outgoing, 1);

    nxt_thread_mutex_destroy(&app->outgoing.mutex);

    if (app->shared_port != NULL) {
        app->shared_port->app = NULL;
        nxt_port_close(task, app->shared_port);
        nxt_port_use(task, app->shared_port, -1);
    }

    nxt_thread_mutex_destroy(&app->mutex);
    nxt_mp_destroy(app->mem_pool);

    app_joint->app = NULL;

    /*
     * NOTE(review): a non-zero nxt_timer_delete() result presumably means
     * that the idle timer is still being processed; the joint reference is
     * then released from the timer handler instead of right here -- confirm
     * against nxt_timer_delete().
     */
    if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
        app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
        nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);

    } else {
        nxt_router_app_joint_use(task, app_joint, -1);
    }
}


/*
 * Prepares a request for sending to the application: takes a reference to
 * the application's shared port and stores it in req_rpc_data->app_port,
 * counts the request in app->active_requests, queues the request on
 * app->ack_waiting_req (retaining its memory pool) and starts a new
 * application process if nxt_router_app_can_start() and
 * nxt_router_app_need_start() say so.  The release action is preset to
 * NXT_APR_REQUEST_FAILED.
 */
static void
nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data)
{
    nxt_bool_t          start_process;
    nxt_port_t          *port;
    nxt_http_request_t  *r;

    start_process = 0;

    nxt_thread_mutex_lock(&app->mutex);

    port = app->shared_port;
    nxt_port_inc_use(port);

    app->active_requests++;

    /* Reserve the start under the mutex; it is issued after the unlock. */
    if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
        app->pending_processes++;
        start_process = 1;
    }

    r = req_rpc_data->request;

    /*
     * Put request into application-wide list to be able to cancel request
     * if something goes wrong with application processes.
     */
    nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);

    nxt_thread_mutex_unlock(&app->mutex);

    /*
     * Retain request memory pool while request is linked in ack_waiting_req
     * to guarantee request structure memory is accessible.
     */
    nxt_mp_retain(r->mem_pool);

    req_rpc_data->app_port = port;
    req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}


/*
 * Entry point for passing an HTTP request to an application.  Registers the
 * RPC response/error handlers on the port of the current engine, initializes
 * the request timer and error work, binds the request and the RPC data to
 * each other, takes a use of the application, then obtains a port and sends
 * the request.  If the RPC data cannot be allocated, the request fails with
 * NXT_HTTP_INTERNAL_SERVER_ERROR.
 */
void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
    nxt_app_t *app)
{
    nxt_event_engine_t      *engine;
    nxt_request_rpc_data_t  *req_rpc_data;

    engine = task->thread->engine;

    req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
                                          nxt_router_response_ready_handler,
                                          nxt_router_response_error_handler,
                                          sizeof(nxt_request_rpc_data_t));
    if (nxt_slow_path(req_rpc_data == NULL)) {
        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /*
     * At this point we have request req_rpc_data allocated and registered
     * in port handlers.  Need to fixup request memory pool.  Counterpart
     * release will be called via following call chain:
     *    nxt_request_rpc_data_unlink() ->
     *        nxt_router_http_request_release_post() ->
     *            nxt_router_http_request_release()
     */
    nxt_mp_retain(r->mem_pool);

    /* The application timeout timer runs on the current engine. */
    r->timer.task = &engine->task;
    r->timer.work_queue = &engine->fast_work_queue;
    r->timer.log = engine->task.log;
    r->timer.bias = NXT_TIMER_DEFAULT_BIAS;

    /* err_work lets other threads fail the request on its own engine. */
    r->engine = engine;
    r->err_work.handler = nxt_router_http_request_error;
    r->err_work.task = task;
    r->err_work.obj = r;

    req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
    req_rpc_data->app = app;
    req_rpc_data->msg_info.body_fd = -1;
    req_rpc_data->rpc_cancel = 1;

    nxt_router_app_use(task, app, 1);

    req_rpc_data->request = r;
    r->req_rpc_data = req_rpc_data;

    if (r->last != NULL) {
        r->last->completion_handler = nxt_router_http_request_done;
    }

    nxt_router_app_port_get(task, app, req_rpc_data);
nxt_router_app_prepare_request(task, req_rpc_data); 4944 } 4945 4946 4947 static void 4948 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 4949 { 4950 nxt_http_request_t *r; 4951 4952 r = obj; 4953 4954 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 4955 4956 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4957 4958 if (r->req_rpc_data != NULL) { 4959 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4960 } 4961 4962 nxt_mp_release(r->mem_pool); 4963 } 4964 4965 4966 static void 4967 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 4968 { 4969 nxt_http_request_t *r; 4970 4971 r = data; 4972 4973 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 4974 4975 if (r->req_rpc_data != NULL) { 4976 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4977 } 4978 4979 nxt_http_request_close_handler(task, r, r->proto.any); 4980 } 4981 4982 4983 static void 4984 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 4985 { 4986 } 4987 4988 4989 static void 4990 nxt_router_app_prepare_request(nxt_task_t *task, 4991 nxt_request_rpc_data_t *req_rpc_data) 4992 { 4993 nxt_app_t *app; 4994 nxt_buf_t *buf, *body; 4995 nxt_int_t res; 4996 nxt_port_t *port, *reply_port; 4997 4998 int notify; 4999 struct { 5000 nxt_port_msg_t pm; 5001 nxt_port_mmap_msg_t mm; 5002 } msg; 5003 5004 5005 app = req_rpc_data->app; 5006 5007 nxt_assert(app != NULL); 5008 5009 port = req_rpc_data->app_port; 5010 5011 nxt_assert(port != NULL); 5012 nxt_assert(port->queue != NULL); 5013 5014 reply_port = task->thread->engine->port; 5015 5016 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5017 nxt_app_msg_prefix[app->type]); 5018 if (nxt_slow_path(buf == NULL)) { 5019 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5020 req_rpc_data->stream, &app->name); 5021 5022 nxt_http_request_error(task, req_rpc_data->request, 5023 
NXT_HTTP_INTERNAL_SERVER_ERROR); 5024 5025 return; 5026 } 5027 5028 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5029 nxt_buf_used_size(buf), 5030 port->socket.fd); 5031 5032 req_rpc_data->msg_info.buf = buf; 5033 5034 body = req_rpc_data->request->body; 5035 5036 if (body != NULL && nxt_buf_is_file(body)) { 5037 req_rpc_data->msg_info.body_fd = body->file->fd; 5038 5039 body->file->fd = -1; 5040 5041 } else { 5042 req_rpc_data->msg_info.body_fd = -1; 5043 } 5044 5045 msg.pm.stream = req_rpc_data->stream; 5046 msg.pm.pid = reply_port->pid; 5047 msg.pm.reply_port = reply_port->id; 5048 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5049 msg.pm.last = 0; 5050 msg.pm.mmap = 1; 5051 msg.pm.nf = 0; 5052 msg.pm.mf = 0; 5053 msg.pm.tracking = 0; 5054 5055 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5056 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5057 5058 msg.mm.mmap_id = hdr->id; 5059 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5060 msg.mm.size = nxt_buf_used_size(buf); 5061 5062 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5063 req_rpc_data->stream, ¬ify, 5064 &req_rpc_data->msg_info.tracking_cookie); 5065 if (nxt_fast_path(res == NXT_OK)) { 5066 if (notify != 0) { 5067 (void) nxt_port_socket_write(task, port, 5068 NXT_PORT_MSG_READ_QUEUE, 5069 -1, req_rpc_data->stream, 5070 reply_port->id, NULL); 5071 5072 } else { 5073 nxt_debug(task, "queue is not empty"); 5074 } 5075 5076 buf->is_port_mmap_sent = 1; 5077 buf->mem.pos = buf->mem.free; 5078 5079 } else { 5080 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5081 req_rpc_data->stream, &app->name); 5082 5083 nxt_http_request_error(task, req_rpc_data->request, 5084 NXT_HTTP_INTERNAL_SERVER_ERROR); 5085 } 5086 } 5087 5088 5089 struct nxt_fields_iter_s { 5090 nxt_list_part_t *part; 5091 nxt_http_field_t *field; 5092 }; 5093 5094 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5095 5096 5097 static nxt_http_field_t * 5098 
nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5099 { 5100 if (part == NULL) { 5101 return NULL; 5102 } 5103 5104 while (part->nelts == 0) { 5105 part = part->next; 5106 if (part == NULL) { 5107 return NULL; 5108 } 5109 } 5110 5111 i->part = part; 5112 i->field = nxt_list_data(i->part); 5113 5114 return i->field; 5115 } 5116 5117 5118 static nxt_http_field_t * 5119 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5120 { 5121 return nxt_fields_part_first(nxt_list_part(fields), i); 5122 } 5123 5124 5125 static nxt_http_field_t * 5126 nxt_fields_next(nxt_fields_iter_t *i) 5127 { 5128 nxt_http_field_t *end = nxt_list_data(i->part); 5129 5130 end += i->part->nelts; 5131 i->field++; 5132 5133 if (i->field < end) { 5134 return i->field; 5135 } 5136 5137 return nxt_fields_part_first(i->part->next, i); 5138 } 5139 5140 5141 static nxt_buf_t * 5142 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5143 nxt_app_t *app, const nxt_str_t *prefix) 5144 { 5145 void *target_pos, *query_pos; 5146 u_char *pos, *end, *p, c; 5147 size_t fields_count, req_size, size, free_size; 5148 size_t copy_size; 5149 nxt_off_t content_length; 5150 nxt_buf_t *b, *buf, *out, **tail; 5151 nxt_http_field_t *field, *dup; 5152 nxt_unit_field_t *dst_field; 5153 nxt_fields_iter_t iter, dup_iter; 5154 nxt_unit_request_t *req; 5155 5156 req_size = sizeof(nxt_unit_request_t) 5157 + r->method->length + 1 5158 + r->version.length + 1 5159 + r->remote->length + 1 5160 + r->local->length + 1 5161 + r->server_name.length + 1 5162 + r->target.length + 1 5163 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5164 5165 content_length = r->content_length_n < 0 ? 
0 : r->content_length_n; 5166 fields_count = 0; 5167 5168 nxt_list_each(field, r->fields) { 5169 fields_count++; 5170 5171 req_size += field->name_length + prefix->length + 1 5172 + field->value_length + 1; 5173 } nxt_list_loop; 5174 5175 req_size += fields_count * sizeof(nxt_unit_field_t); 5176 5177 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5178 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5179 (int) req_size); 5180 5181 return NULL; 5182 } 5183 5184 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5185 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5186 if (nxt_slow_path(out == NULL)) { 5187 return NULL; 5188 } 5189 5190 req = (nxt_unit_request_t *) out->mem.free; 5191 out->mem.free += req_size; 5192 5193 req->app_target = r->app_target; 5194 5195 req->content_length = content_length; 5196 5197 p = (u_char *) (req->fields + fields_count); 5198 5199 nxt_debug(task, "fields_count=%d", (int) fields_count); 5200 5201 req->method_length = r->method->length; 5202 nxt_unit_sptr_set(&req->method, p); 5203 p = nxt_cpymem(p, r->method->start, r->method->length); 5204 *p++ = '\0'; 5205 5206 req->version_length = r->version.length; 5207 nxt_unit_sptr_set(&req->version, p); 5208 p = nxt_cpymem(p, r->version.start, r->version.length); 5209 *p++ = '\0'; 5210 5211 req->remote_length = r->remote->address_length; 5212 nxt_unit_sptr_set(&req->remote, p); 5213 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5214 r->remote->address_length); 5215 *p++ = '\0'; 5216 5217 req->local_length = r->local->address_length; 5218 nxt_unit_sptr_set(&req->local, p); 5219 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5220 *p++ = '\0'; 5221 5222 req->tls = (r->tls != NULL); 5223 req->websocket_handshake = r->websocket_handshake; 5224 5225 req->server_name_length = r->server_name.length; 5226 nxt_unit_sptr_set(&req->server_name, p); 5227 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5228 *p++ = '\0'; 5229 
5230 target_pos = p; 5231 req->target_length = (uint32_t) r->target.length; 5232 nxt_unit_sptr_set(&req->target, p); 5233 p = nxt_cpymem(p, r->target.start, r->target.length); 5234 *p++ = '\0'; 5235 5236 req->path_length = (uint32_t) r->path->length; 5237 if (r->path->start == r->target.start) { 5238 nxt_unit_sptr_set(&req->path, target_pos); 5239 5240 } else { 5241 nxt_unit_sptr_set(&req->path, p); 5242 p = nxt_cpymem(p, r->path->start, r->path->length); 5243 *p++ = '\0'; 5244 } 5245 5246 req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; 5247 if (r->args != NULL && r->args->start != NULL) { 5248 query_pos = nxt_pointer_to(target_pos, 5249 r->args->start - r->target.start); 5250 5251 nxt_unit_sptr_set(&req->query, query_pos); 5252 5253 } else { 5254 req->query.offset = 0; 5255 } 5256 5257 req->content_length_field = NXT_UNIT_NONE_FIELD; 5258 req->content_type_field = NXT_UNIT_NONE_FIELD; 5259 req->cookie_field = NXT_UNIT_NONE_FIELD; 5260 req->authorization_field = NXT_UNIT_NONE_FIELD; 5261 5262 dst_field = req->fields; 5263 5264 for (field = nxt_fields_first(r->fields, &iter); 5265 field != NULL; 5266 field = nxt_fields_next(&iter)) 5267 { 5268 if (field->skip) { 5269 continue; 5270 } 5271 5272 dst_field->hash = field->hash; 5273 dst_field->skip = 0; 5274 dst_field->name_length = field->name_length + prefix->length; 5275 dst_field->value_length = field->value_length; 5276 5277 if (field == r->content_length) { 5278 req->content_length_field = dst_field - req->fields; 5279 5280 } else if (field == r->content_type) { 5281 req->content_type_field = dst_field - req->fields; 5282 5283 } else if (field == r->cookie) { 5284 req->cookie_field = dst_field - req->fields; 5285 5286 } else if (field == r->authorization) { 5287 req->authorization_field = dst_field - req->fields; 5288 } 5289 5290 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5291 (int) field->hash, (int) field->skip, 5292 (int) field->name_length, field->name, 5293 (int) 
field->value_length, field->value); 5294 5295 if (prefix->length != 0) { 5296 nxt_unit_sptr_set(&dst_field->name, p); 5297 p = nxt_cpymem(p, prefix->start, prefix->length); 5298 5299 end = field->name + field->name_length; 5300 for (pos = field->name; pos < end; pos++) { 5301 c = *pos; 5302 5303 if (c >= 'a' && c <= 'z') { 5304 *p++ = (c & ~0x20); 5305 continue; 5306 } 5307 5308 if (c == '-') { 5309 *p++ = '_'; 5310 continue; 5311 } 5312 5313 *p++ = c; 5314 } 5315 5316 } else { 5317 nxt_unit_sptr_set(&dst_field->name, p); 5318 p = nxt_cpymem(p, field->name, field->name_length); 5319 } 5320 5321 *p++ = '\0'; 5322 5323 nxt_unit_sptr_set(&dst_field->value, p); 5324 p = nxt_cpymem(p, field->value, field->value_length); 5325 5326 if (prefix->length != 0) { 5327 dup_iter = iter; 5328 5329 for (dup = nxt_fields_next(&dup_iter); 5330 dup != NULL; 5331 dup = nxt_fields_next(&dup_iter)) 5332 { 5333 if (dup->name_length != field->name_length 5334 || dup->skip 5335 || dup->hash != field->hash 5336 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5337 { 5338 continue; 5339 } 5340 5341 p = nxt_cpymem(p, ", ", 2); 5342 p = nxt_cpymem(p, dup->value, dup->value_length); 5343 5344 dst_field->value_length += 2 + dup->value_length; 5345 5346 dup->skip = 1; 5347 } 5348 } 5349 5350 *p++ = '\0'; 5351 5352 dst_field++; 5353 } 5354 5355 req->fields_count = (uint32_t) (dst_field - req->fields); 5356 5357 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5358 5359 buf = out; 5360 tail = &buf->next; 5361 5362 for (b = r->body; b != NULL; b = b->next) { 5363 size = nxt_buf_mem_used_size(&b->mem); 5364 pos = b->mem.pos; 5365 5366 while (size > 0) { 5367 if (buf == NULL) { 5368 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5369 5370 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5371 if (nxt_slow_path(buf == NULL)) { 5372 while (out != NULL) { 5373 buf = out->next; 5374 out->next = NULL; 5375 out->completion_handler(task, out, out->parent); 5376 out = buf; 
5377 } 5378 return NULL; 5379 } 5380 5381 *tail = buf; 5382 tail = &buf->next; 5383 5384 } else { 5385 free_size = nxt_buf_mem_free_size(&buf->mem); 5386 if (free_size < size 5387 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5388 == NXT_OK) 5389 { 5390 free_size = nxt_buf_mem_free_size(&buf->mem); 5391 } 5392 } 5393 5394 if (free_size > 0) { 5395 copy_size = nxt_min(free_size, size); 5396 5397 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5398 5399 size -= copy_size; 5400 pos += copy_size; 5401 5402 if (size == 0) { 5403 break; 5404 } 5405 } 5406 5407 buf = NULL; 5408 } 5409 } 5410 5411 return out; 5412 } 5413 5414 5415 static void 5416 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5417 { 5418 nxt_timer_t *timer; 5419 nxt_http_request_t *r; 5420 nxt_request_rpc_data_t *req_rpc_data; 5421 5422 timer = obj; 5423 5424 nxt_debug(task, "router app timeout"); 5425 5426 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5427 req_rpc_data = r->timer_data; 5428 5429 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5430 5431 nxt_request_rpc_data_unlink(task, req_rpc_data); 5432 } 5433 5434 5435 static void 5436 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5437 { 5438 r->timer.handler = nxt_router_http_request_release; 5439 nxt_timer_add(task->thread->engine, &r->timer, 0); 5440 } 5441 5442 5443 static void 5444 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5445 { 5446 nxt_http_request_t *r; 5447 5448 nxt_debug(task, "http request pool release"); 5449 5450 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5451 5452 nxt_mp_release(r->mem_pool); 5453 } 5454 5455 5456 static void 5457 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5458 { 5459 size_t mi; 5460 uint32_t i; 5461 nxt_bool_t ack; 5462 nxt_process_t *process; 5463 nxt_free_map_t *m; 5464 nxt_port_mmap_handler_t *mmap_handler; 5465 5466 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 
5467 5468 process = nxt_runtime_process_find(task->thread->runtime, 5469 msg->port_msg.pid); 5470 if (nxt_slow_path(process == NULL)) { 5471 return; 5472 } 5473 5474 ack = 0; 5475 5476 /* 5477 * To mitigate possible racing condition (when OOSM message received 5478 * after some of the memory was already freed), need to try to find 5479 * first free segment in shared memory and send ACK if found. 5480 */ 5481 5482 nxt_thread_mutex_lock(&process->incoming.mutex); 5483 5484 for (i = 0; i < process->incoming.size; i++) { 5485 mmap_handler = process->incoming.elts[i].mmap_handler; 5486 5487 if (nxt_slow_path(mmap_handler == NULL)) { 5488 continue; 5489 } 5490 5491 m = mmap_handler->hdr->free_map; 5492 5493 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5494 if (m[mi] != 0) { 5495 ack = 1; 5496 5497 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5498 i, mi, m[mi]); 5499 5500 break; 5501 } 5502 } 5503 } 5504 5505 nxt_thread_mutex_unlock(&process->incoming.mutex); 5506 5507 if (ack) { 5508 nxt_process_broadcast_shm_ack(task, process); 5509 } 5510 } 5511 5512 5513 static void 5514 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5515 { 5516 nxt_fd_t fd; 5517 nxt_port_t *port; 5518 nxt_runtime_t *rt; 5519 nxt_port_mmaps_t *mmaps; 5520 nxt_port_msg_get_mmap_t *get_mmap_msg; 5521 nxt_port_mmap_handler_t *mmap_handler; 5522 5523 rt = task->thread->runtime; 5524 5525 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5526 msg->port_msg.reply_port); 5527 if (nxt_slow_path(port == NULL)) { 5528 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5529 msg->port_msg.pid, msg->port_msg.reply_port); 5530 5531 return; 5532 } 5533 5534 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5535 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5536 { 5537 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5538 (int) nxt_buf_used_size(msg->buf)); 5539 5540 return; 5541 } 5542 5543 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5544 
5545 nxt_assert(port->type == NXT_PROCESS_APP); 5546 5547 if (nxt_slow_path(port->app == NULL)) { 5548 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5549 port->pid, port->id); 5550 5551 // FIXME 5552 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5553 -1, msg->port_msg.stream, 0, NULL); 5554 5555 return; 5556 } 5557 5558 mmaps = &port->app->outgoing; 5559 nxt_thread_mutex_lock(&mmaps->mutex); 5560 5561 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5562 nxt_thread_mutex_unlock(&mmaps->mutex); 5563 5564 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5565 (int) get_mmap_msg->id); 5566 5567 // FIXME 5568 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5569 -1, msg->port_msg.stream, 0, NULL); 5570 return; 5571 } 5572 5573 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5574 5575 fd = mmap_handler->fd; 5576 5577 nxt_thread_mutex_unlock(&mmaps->mutex); 5578 5579 nxt_debug(task, "get mmap %PI:%d found", 5580 msg->port_msg.pid, (int) get_mmap_msg->id); 5581 5582 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5583 } 5584 5585 5586 static void 5587 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5588 { 5589 nxt_port_t *port, *reply_port; 5590 nxt_runtime_t *rt; 5591 nxt_port_msg_get_port_t *get_port_msg; 5592 5593 rt = task->thread->runtime; 5594 5595 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5596 msg->port_msg.reply_port); 5597 if (nxt_slow_path(reply_port == NULL)) { 5598 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5599 msg->port_msg.pid, msg->port_msg.reply_port); 5600 5601 return; 5602 } 5603 5604 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5605 < (int) sizeof(nxt_port_msg_get_port_t))) 5606 { 5607 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5608 (int) nxt_buf_used_size(msg->buf)); 5609 5610 return; 5611 } 5612 5613 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 
5614 5615 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5616 if (nxt_slow_path(port == NULL)) { 5617 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5618 get_port_msg->pid, get_port_msg->id); 5619 5620 return; 5621 } 5622 5623 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5624 get_port_msg->id); 5625 5626 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5627 } 5628