1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 typedef struct { 22 nxt_str_t type; 23 uint32_t processes; 24 uint32_t max_processes; 25 uint32_t spare_processes; 26 nxt_msec_t timeout; 27 nxt_msec_t idle_timeout; 28 uint32_t requests; 29 nxt_conf_value_t *limits_value; 30 nxt_conf_value_t *processes_value; 31 nxt_conf_value_t *targets_value; 32 } nxt_router_app_conf_t; 33 34 35 typedef struct { 36 nxt_str_t pass; 37 nxt_str_t application; 38 } nxt_router_listener_conf_t; 39 40 41 #if (NXT_TLS) 42 43 typedef struct { 44 nxt_str_t name; 45 nxt_socket_conf_t *conf; 46 47 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 48 } nxt_router_tlssock_t; 49 50 #endif 51 52 53 typedef struct { 54 nxt_socket_conf_t *socket_conf; 55 nxt_router_temp_conf_t *temp_conf; 56 } nxt_socket_rpc_t; 57 58 59 typedef struct { 60 nxt_app_t *app; 61 nxt_router_temp_conf_t *temp_conf; 62 } nxt_app_rpc_t; 63 64 65 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 66 nxt_mp_t *mp); 67 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 68 static void nxt_router_greet_controller(nxt_task_t *task, 69 nxt_port_t *controller_port); 70 71 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 72 73 static void nxt_router_new_port_handler(nxt_task_t *task, 74 nxt_port_recv_msg_t *msg); 75 static void nxt_router_conf_data_handler(nxt_task_t *task, 76 nxt_port_recv_msg_t *msg); 77 static void nxt_router_remove_pid_handler(nxt_task_t *task, 78 nxt_port_recv_msg_t *msg); 79 static void 
nxt_router_access_log_reopen_handler(nxt_task_t *task, 80 nxt_port_recv_msg_t *msg); 81 82 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 83 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 84 static void nxt_router_conf_ready(nxt_task_t *task, 85 nxt_router_temp_conf_t *tmcf); 86 static void nxt_router_conf_error(nxt_task_t *task, 87 nxt_router_temp_conf_t *tmcf); 88 static void nxt_router_conf_send(nxt_task_t *task, 89 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 90 91 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 92 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 93 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 94 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 95 96 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 97 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 98 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 99 nxt_app_t *app); 100 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 101 nxt_str_t *name); 102 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 103 int i); 104 105 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 106 nxt_port_t *port); 107 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 108 nxt_port_t *port); 109 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 110 nxt_port_t *port, nxt_fd_t fd); 111 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 112 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 113 static void nxt_router_listen_socket_ready(nxt_task_t *task, 114 nxt_port_recv_msg_t *msg, void *data); 115 static void nxt_router_listen_socket_error(nxt_task_t *task, 116 nxt_port_recv_msg_t *msg, void *data); 117 #if (NXT_TLS) 118 static void nxt_router_tls_rpc_create(nxt_task_t *task, 119 nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls); 120 
static void nxt_router_tls_rpc_handler(nxt_task_t *task, 121 nxt_port_recv_msg_t *msg, void *data); 122 #endif 123 static void nxt_router_app_rpc_create(nxt_task_t *task, 124 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 125 static void nxt_router_app_prefork_ready(nxt_task_t *task, 126 nxt_port_recv_msg_t *msg, void *data); 127 static void nxt_router_app_prefork_error(nxt_task_t *task, 128 nxt_port_recv_msg_t *msg, void *data); 129 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 130 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 131 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 132 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 133 134 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 135 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 136 const nxt_event_interface_t *interface); 137 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 138 nxt_router_engine_conf_t *recf); 139 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 140 nxt_router_engine_conf_t *recf); 141 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 142 nxt_router_engine_conf_t *recf); 143 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 144 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 145 nxt_work_handler_t handler); 146 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 147 nxt_router_engine_conf_t *recf); 148 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 149 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 150 151 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 152 nxt_router_temp_conf_t *tmcf); 153 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 154 nxt_event_engine_t *engine); 155 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 156 nxt_router_temp_conf_t *tmcf); 157 158 
static void nxt_router_engines_post(nxt_router_t *router, 159 nxt_router_temp_conf_t *tmcf); 160 static void nxt_router_engine_post(nxt_event_engine_t *engine, 161 nxt_work_t *jobs); 162 163 static void nxt_router_thread_start(void *data); 164 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 165 void *data); 166 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 167 void *data); 168 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 169 void *data); 170 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 171 void *data); 172 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 173 void *data); 174 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 175 void *data); 176 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 177 void *data); 178 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 179 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 180 static void nxt_router_listen_socket_release(nxt_task_t *task, 181 nxt_socket_conf_t *skcf); 182 183 static void nxt_router_access_log_writer(nxt_task_t *task, 184 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 185 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 186 struct tm *tm, size_t size, const char *format); 187 static void nxt_router_access_log_open(nxt_task_t *task, 188 nxt_router_temp_conf_t *tmcf); 189 static void nxt_router_access_log_ready(nxt_task_t *task, 190 nxt_port_recv_msg_t *msg, void *data); 191 static void nxt_router_access_log_error(nxt_task_t *task, 192 nxt_port_recv_msg_t *msg, void *data); 193 static void nxt_router_access_log_release(nxt_task_t *task, 194 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 195 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 196 void *data); 197 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 
198 nxt_port_recv_msg_t *msg, void *data); 199 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 200 nxt_port_recv_msg_t *msg, void *data); 201 202 static void nxt_router_app_port_ready(nxt_task_t *task, 203 nxt_port_recv_msg_t *msg, void *data); 204 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, 205 nxt_port_t *app_port); 206 static void nxt_router_app_port_error(nxt_task_t *task, 207 nxt_port_recv_msg_t *msg, void *data); 208 209 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 210 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 211 212 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 213 nxt_apr_action_t action); 214 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 215 nxt_request_rpc_data_t *req_rpc_data); 216 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 217 void *data); 218 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 219 void *data); 220 221 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, 222 void *data); 223 static void nxt_router_app_prepare_request(nxt_task_t *task, 224 nxt_request_rpc_data_t *req_rpc_data); 225 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 226 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 227 228 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 229 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 230 void *data); 231 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 232 void *data); 233 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 234 void *data); 235 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 236 237 static const nxt_http_request_state_t nxt_http_request_send_state; 238 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 239 240 static void 
nxt_router_app_joint_use(nxt_task_t *task, 241 nxt_app_joint_t *app_joint, int i); 242 243 static void nxt_router_http_request_release_post(nxt_task_t *task, 244 nxt_http_request_t *r); 245 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 246 void *data); 247 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 248 static void nxt_router_get_port_handler(nxt_task_t *task, 249 nxt_port_recv_msg_t *msg); 250 static void nxt_router_get_mmap_handler(nxt_task_t *task, 251 nxt_port_recv_msg_t *msg); 252 253 extern const nxt_http_request_state_t nxt_http_websocket; 254 255 static nxt_router_t *nxt_router; 256 257 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 258 static const nxt_str_t empty_prefix = nxt_string(""); 259 260 static const nxt_str_t *nxt_app_msg_prefix[] = { 261 &empty_prefix, 262 &empty_prefix, 263 &http_prefix, 264 &http_prefix, 265 &http_prefix, 266 &empty_prefix, 267 }; 268 269 270 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 271 .quit = nxt_signal_quit_handler, 272 .new_port = nxt_router_new_port_handler, 273 .get_port = nxt_router_get_port_handler, 274 .change_file = nxt_port_change_log_file_handler, 275 .mmap = nxt_port_mmap_handler, 276 .get_mmap = nxt_router_get_mmap_handler, 277 .data = nxt_router_conf_data_handler, 278 .remove_pid = nxt_router_remove_pid_handler, 279 .access_log = nxt_router_access_log_reopen_handler, 280 .rpc_ready = nxt_port_rpc_handler, 281 .rpc_error = nxt_port_rpc_handler, 282 .oosm = nxt_router_oosm_handler, 283 }; 284 285 286 const nxt_process_init_t nxt_router_process = { 287 .name = "router", 288 .type = NXT_PROCESS_ROUTER, 289 .prefork = nxt_router_prefork, 290 .restart = 1, 291 .setup = nxt_process_core_setup, 292 .start = nxt_router_start, 293 .port_handlers = &nxt_router_process_port_handlers, 294 .signals = nxt_process_signals, 295 }; 296 297 298 /* Queues of nxt_socket_conf_t */ 299 nxt_queue_t creating_sockets; 300 nxt_queue_t 
pending_sockets; 301 nxt_queue_t updating_sockets; 302 nxt_queue_t keeping_sockets; 303 nxt_queue_t deleting_sockets; 304 305 306 static nxt_int_t 307 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 308 { 309 nxt_runtime_stop_app_processes(task, task->thread->runtime); 310 311 return NXT_OK; 312 } 313 314 315 static nxt_int_t 316 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 317 { 318 nxt_int_t ret; 319 nxt_port_t *controller_port; 320 nxt_router_t *router; 321 nxt_runtime_t *rt; 322 323 rt = task->thread->runtime; 324 325 nxt_log(task, NXT_LOG_INFO, "router started"); 326 327 #if (NXT_TLS) 328 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 329 if (nxt_slow_path(rt->tls == NULL)) { 330 return NXT_ERROR; 331 } 332 333 ret = rt->tls->library_init(task); 334 if (nxt_slow_path(ret != NXT_OK)) { 335 return ret; 336 } 337 #endif 338 339 ret = nxt_http_init(task); 340 if (nxt_slow_path(ret != NXT_OK)) { 341 return ret; 342 } 343 344 router = nxt_zalloc(sizeof(nxt_router_t)); 345 if (nxt_slow_path(router == NULL)) { 346 return NXT_ERROR; 347 } 348 349 nxt_queue_init(&router->engines); 350 nxt_queue_init(&router->sockets); 351 nxt_queue_init(&router->apps); 352 353 nxt_router = router; 354 355 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 356 if (controller_port != NULL) { 357 nxt_router_greet_controller(task, controller_port); 358 } 359 360 return NXT_OK; 361 } 362 363 364 static void 365 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 366 { 367 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 368 -1, 0, 0, NULL); 369 } 370 371 372 static void 373 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 374 void *data) 375 { 376 size_t size; 377 uint32_t stream; 378 nxt_mp_t *mp; 379 nxt_int_t ret; 380 nxt_app_t *app; 381 nxt_buf_t *b; 382 nxt_port_t *main_port; 383 nxt_runtime_t *rt; 384 385 app = data; 386 387 rt = 
task->thread->runtime; 388 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 389 390 nxt_debug(task, "app '%V' %p start process", &app->name, app); 391 392 size = app->name.length + 1 + app->conf.length; 393 394 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 395 396 if (nxt_slow_path(b == NULL)) { 397 goto failed; 398 } 399 400 nxt_buf_cpystr(b, &app->name); 401 *b->mem.free++ = '\0'; 402 nxt_buf_cpystr(b, &app->conf); 403 404 nxt_router_app_joint_use(task, app->joint, 1); 405 406 stream = nxt_port_rpc_register_handler(task, port, 407 nxt_router_app_port_ready, 408 nxt_router_app_port_error, 409 -1, app->joint); 410 411 if (nxt_slow_path(stream == 0)) { 412 nxt_router_app_joint_use(task, app->joint, -1); 413 414 goto failed; 415 } 416 417 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 418 -1, stream, port->id, b); 419 420 if (nxt_slow_path(ret != NXT_OK)) { 421 nxt_port_rpc_cancel(task, port, stream); 422 423 nxt_router_app_joint_use(task, app->joint, -1); 424 425 goto failed; 426 } 427 428 nxt_router_app_use(task, app, -1); 429 430 return; 431 432 failed: 433 434 if (b != NULL) { 435 mp = b->data; 436 nxt_mp_free(mp, b); 437 nxt_mp_release(mp); 438 } 439 440 nxt_thread_mutex_lock(&app->mutex); 441 442 app->pending_processes--; 443 444 nxt_thread_mutex_unlock(&app->mutex); 445 446 nxt_router_app_use(task, app, -1); 447 } 448 449 450 static void 451 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 452 { 453 app_joint->use_count += i; 454 455 if (app_joint->use_count == 0) { 456 nxt_assert(app_joint->app == NULL); 457 458 nxt_free(app_joint); 459 } 460 } 461 462 463 static nxt_int_t 464 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 465 { 466 nxt_int_t res; 467 nxt_port_t *router_port; 468 nxt_runtime_t *rt; 469 470 nxt_debug(task, "app '%V' start process", &app->name); 471 472 rt = task->thread->runtime; 473 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 474 475 
nxt_router_app_use(task, app, 1); 476 477 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 478 app); 479 480 if (res == NXT_OK) { 481 return res; 482 } 483 484 nxt_thread_mutex_lock(&app->mutex); 485 486 app->pending_processes--; 487 488 nxt_thread_mutex_unlock(&app->mutex); 489 490 nxt_router_app_use(task, app, -1); 491 492 return NXT_ERROR; 493 } 494 495 496 nxt_inline nxt_bool_t 497 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 498 { 499 nxt_buf_t *b, *next; 500 nxt_bool_t cancelled; 501 nxt_msg_info_t *msg_info; 502 503 msg_info = &req_rpc_data->msg_info; 504 505 if (msg_info->buf == NULL) { 506 return 0; 507 } 508 509 cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, 510 msg_info->tracking_cookie, 511 req_rpc_data->stream); 512 513 if (cancelled) { 514 nxt_debug(task, "stream #%uD: cancelled by router", 515 req_rpc_data->stream); 516 } 517 518 for (b = msg_info->buf; b != NULL; b = next) { 519 next = b->next; 520 b->next = NULL; 521 522 b->completion_handler = msg_info->completion_handler; 523 524 if (b->is_port_mmap_sent) { 525 b->is_port_mmap_sent = cancelled == 0; 526 b->completion_handler(task, b, b->parent); 527 } 528 } 529 530 msg_info->buf = NULL; 531 532 return cancelled; 533 } 534 535 536 nxt_inline nxt_bool_t 537 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 538 { 539 if (lnk->next != NULL) { 540 nxt_queue_remove(lnk); 541 542 lnk->next = NULL; 543 544 return 1; 545 } 546 547 return 0; 548 } 549 550 551 nxt_inline void 552 nxt_request_rpc_data_unlink(nxt_task_t *task, 553 nxt_request_rpc_data_t *req_rpc_data) 554 { 555 nxt_app_t *app; 556 nxt_bool_t unlinked; 557 nxt_http_request_t *r; 558 559 nxt_router_msg_cancel(task, req_rpc_data); 560 561 if (req_rpc_data->app_port != NULL) { 562 nxt_router_app_port_release(task, req_rpc_data->app_port, 563 req_rpc_data->apr_action); 564 565 req_rpc_data->app_port = NULL; 566 } 567 568 app = req_rpc_data->app; 569 r = 
req_rpc_data->request; 570 571 if (r != NULL) { 572 r->timer_data = NULL; 573 574 nxt_router_http_request_release_post(task, r); 575 576 r->req_rpc_data = NULL; 577 req_rpc_data->request = NULL; 578 579 if (app != NULL) { 580 unlinked = 0; 581 582 nxt_thread_mutex_lock(&app->mutex); 583 584 if (r->app_link.next != NULL) { 585 nxt_queue_remove(&r->app_link); 586 r->app_link.next = NULL; 587 588 unlinked = 1; 589 } 590 591 nxt_thread_mutex_unlock(&app->mutex); 592 593 if (unlinked) { 594 nxt_mp_release(r->mem_pool); 595 } 596 } 597 } 598 599 if (app != NULL) { 600 nxt_router_app_use(task, app, -1); 601 602 req_rpc_data->app = NULL; 603 } 604 605 if (req_rpc_data->msg_info.body_fd != -1) { 606 nxt_fd_close(req_rpc_data->msg_info.body_fd); 607 608 req_rpc_data->msg_info.body_fd = -1; 609 } 610 611 if (req_rpc_data->rpc_cancel) { 612 req_rpc_data->rpc_cancel = 0; 613 614 nxt_port_rpc_cancel(task, task->thread->engine->port, 615 req_rpc_data->stream); 616 } 617 } 618 619 620 static void 621 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 622 { 623 nxt_int_t res; 624 nxt_app_t *app; 625 nxt_port_t *port, *main_app_port; 626 nxt_runtime_t *rt; 627 628 nxt_port_new_port_handler(task, msg); 629 630 port = msg->u.new_port; 631 632 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 633 nxt_router_greet_controller(task, msg->u.new_port); 634 } 635 636 if (port == NULL || port->type != NXT_PROCESS_APP) { 637 638 if (msg->port_msg.stream == 0) { 639 return; 640 } 641 642 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 643 644 } else { 645 if (msg->fd[1] != -1) { 646 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 647 if (nxt_slow_path(res != NXT_OK)) { 648 return; 649 } 650 651 nxt_fd_close(msg->fd[1]); 652 msg->fd[1] = -1; 653 } 654 } 655 656 if (msg->port_msg.stream != 0) { 657 nxt_port_rpc_handler(task, msg); 658 return; 659 } 660 661 /* 662 * Port with "id == 0" is application 'main' port and it always 663 * should come with non-zero 
stream. 664 */ 665 nxt_assert(port->id != 0); 666 667 /* Find 'main' app port and get app reference. */ 668 rt = task->thread->runtime; 669 670 /* 671 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 672 * sent to main port (with id == 0) and processed in main thread. 673 */ 674 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 675 nxt_assert(main_app_port != NULL); 676 677 app = main_app_port->app; 678 nxt_assert(app != NULL); 679 680 nxt_thread_mutex_lock(&app->mutex); 681 682 /* TODO here should be find-and-add code because there can be 683 port waiters in port_hash */ 684 nxt_port_hash_add(&app->port_hash, port); 685 app->port_hash_count++; 686 687 nxt_thread_mutex_unlock(&app->mutex); 688 689 port->app = app; 690 port->main_app_port = main_app_port; 691 692 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 693 } 694 695 696 static void 697 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 698 { 699 void *p; 700 size_t size; 701 nxt_int_t ret; 702 nxt_port_t *port; 703 nxt_router_temp_conf_t *tmcf; 704 705 port = nxt_runtime_port_find(task->thread->runtime, 706 msg->port_msg.pid, 707 msg->port_msg.reply_port); 708 if (nxt_slow_path(port == NULL)) { 709 nxt_alert(task, "conf_data_handler: reply port not found"); 710 return; 711 } 712 713 p = MAP_FAILED; 714 715 /* 716 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 717 * initialized in 'cleanup' section. 
718 */ 719 size = 0; 720 721 tmcf = nxt_router_temp_conf(task); 722 if (nxt_slow_path(tmcf == NULL)) { 723 goto fail; 724 } 725 726 if (nxt_slow_path(msg->fd[0] == -1)) { 727 nxt_alert(task, "conf_data_handler: invalid shm fd"); 728 goto fail; 729 } 730 731 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 732 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 733 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 734 goto fail; 735 } 736 737 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 738 739 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 740 741 nxt_fd_close(msg->fd[0]); 742 msg->fd[0] = -1; 743 744 if (nxt_slow_path(p == MAP_FAILED)) { 745 goto fail; 746 } 747 748 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 749 750 tmcf->router_conf->router = nxt_router; 751 tmcf->stream = msg->port_msg.stream; 752 tmcf->port = port; 753 754 nxt_port_use(task, tmcf->port, 1); 755 756 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 757 758 if (nxt_fast_path(ret == NXT_OK)) { 759 nxt_router_conf_apply(task, tmcf, NULL); 760 761 } else { 762 nxt_router_conf_error(task, tmcf); 763 } 764 765 goto cleanup; 766 767 fail: 768 769 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 770 msg->port_msg.stream, 0, NULL); 771 772 if (tmcf != NULL) { 773 nxt_mp_destroy(tmcf->mem_pool); 774 } 775 776 cleanup: 777 778 if (p != MAP_FAILED) { 779 nxt_mem_munmap(p, size); 780 } 781 782 if (msg->fd[0] != -1) { 783 nxt_fd_close(msg->fd[0]); 784 msg->fd[0] = -1; 785 } 786 } 787 788 789 static void 790 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 791 void *data) 792 { 793 union { 794 nxt_pid_t removed_pid; 795 void *data; 796 } u; 797 798 u.data = data; 799 800 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 801 } 802 803 804 static void 805 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 806 { 807 nxt_event_engine_t *engine; 808 809 
nxt_port_remove_pid_handler(task, msg); 810 811 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 812 { 813 if (nxt_fast_path(engine->port != NULL)) { 814 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 815 msg->u.data); 816 } 817 } 818 nxt_queue_loop; 819 820 if (msg->port_msg.stream == 0) { 821 return; 822 } 823 824 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 825 826 nxt_port_rpc_handler(task, msg); 827 } 828 829 830 static nxt_router_temp_conf_t * 831 nxt_router_temp_conf(nxt_task_t *task) 832 { 833 nxt_mp_t *mp, *tmp; 834 nxt_router_conf_t *rtcf; 835 nxt_router_temp_conf_t *tmcf; 836 837 mp = nxt_mp_create(1024, 128, 256, 32); 838 if (nxt_slow_path(mp == NULL)) { 839 return NULL; 840 } 841 842 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 843 if (nxt_slow_path(rtcf == NULL)) { 844 goto fail; 845 } 846 847 rtcf->mem_pool = mp; 848 849 tmp = nxt_mp_create(1024, 128, 256, 32); 850 if (nxt_slow_path(tmp == NULL)) { 851 goto fail; 852 } 853 854 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 855 if (nxt_slow_path(tmcf == NULL)) { 856 goto temp_fail; 857 } 858 859 tmcf->mem_pool = tmp; 860 tmcf->router_conf = rtcf; 861 tmcf->count = 1; 862 tmcf->engine = task->thread->engine; 863 864 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 865 sizeof(nxt_router_engine_conf_t)); 866 if (nxt_slow_path(tmcf->engines == NULL)) { 867 goto temp_fail; 868 } 869 870 nxt_queue_init(&creating_sockets); 871 nxt_queue_init(&pending_sockets); 872 nxt_queue_init(&updating_sockets); 873 nxt_queue_init(&keeping_sockets); 874 nxt_queue_init(&deleting_sockets); 875 876 #if (NXT_TLS) 877 nxt_queue_init(&tmcf->tls); 878 #endif 879 880 nxt_queue_init(&tmcf->apps); 881 nxt_queue_init(&tmcf->previous); 882 883 return tmcf; 884 885 temp_fail: 886 887 nxt_mp_destroy(tmp); 888 889 fail: 890 891 nxt_mp_destroy(mp); 892 893 return NULL; 894 } 895 896 897 nxt_inline nxt_bool_t 898 nxt_router_app_can_start(nxt_app_t *app) 899 { 900 return 
app->processes + app->pending_processes < app->max_processes 901 && app->pending_processes < app->max_pending_processes; 902 } 903 904 905 nxt_inline nxt_bool_t 906 nxt_router_app_need_start(nxt_app_t *app) 907 { 908 return (app->active_requests 909 > app->port_hash_count + app->pending_processes) 910 || (app->spare_processes 911 > app->idle_processes + app->pending_processes); 912 } 913 914 915 static void 916 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 917 { 918 nxt_int_t ret; 919 nxt_app_t *app; 920 nxt_router_t *router; 921 nxt_runtime_t *rt; 922 nxt_queue_link_t *qlk; 923 nxt_socket_conf_t *skcf; 924 nxt_router_conf_t *rtcf; 925 nxt_router_temp_conf_t *tmcf; 926 const nxt_event_interface_t *interface; 927 #if (NXT_TLS) 928 nxt_router_tlssock_t *tls; 929 #endif 930 931 tmcf = obj; 932 933 qlk = nxt_queue_first(&pending_sockets); 934 935 if (qlk != nxt_queue_tail(&pending_sockets)) { 936 nxt_queue_remove(qlk); 937 nxt_queue_insert_tail(&creating_sockets, qlk); 938 939 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 940 941 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 942 943 return; 944 } 945 946 #if (NXT_TLS) 947 qlk = nxt_queue_first(&tmcf->tls); 948 949 if (qlk != nxt_queue_tail(&tmcf->tls)) { 950 nxt_queue_remove(qlk); 951 952 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 953 954 nxt_router_tls_rpc_create(task, tmcf, tls); 955 return; 956 } 957 #endif 958 959 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 960 961 if (nxt_router_app_need_start(app)) { 962 nxt_router_app_rpc_create(task, tmcf, app); 963 return; 964 } 965 966 } nxt_queue_loop; 967 968 rtcf = tmcf->router_conf; 969 970 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 971 nxt_router_access_log_open(task, tmcf); 972 return; 973 } 974 975 rt = task->thread->runtime; 976 977 interface = nxt_service_get(rt->services, "engine", NULL); 978 979 router = rtcf->router; 980 981 ret = nxt_router_engines_create(task, router, tmcf, 
interface); 982 if (nxt_slow_path(ret != NXT_OK)) { 983 goto fail; 984 } 985 986 ret = nxt_router_threads_create(task, rt, tmcf); 987 if (nxt_slow_path(ret != NXT_OK)) { 988 goto fail; 989 } 990 991 nxt_router_apps_sort(task, router, tmcf); 992 993 nxt_router_apps_hash_use(task, rtcf, 1); 994 995 nxt_router_engines_post(router, tmcf); 996 997 nxt_queue_add(&router->sockets, &updating_sockets); 998 nxt_queue_add(&router->sockets, &creating_sockets); 999 1000 router->access_log = rtcf->access_log; 1001 1002 nxt_router_conf_ready(task, tmcf); 1003 1004 return; 1005 1006 fail: 1007 1008 nxt_router_conf_error(task, tmcf); 1009 1010 return; 1011 } 1012 1013 1014 static void 1015 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1016 { 1017 nxt_joint_job_t *job; 1018 1019 job = obj; 1020 1021 nxt_router_conf_ready(task, job->tmcf); 1022 } 1023 1024 1025 static void 1026 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1027 { 1028 uint32_t count; 1029 nxt_router_conf_t *rtcf; 1030 nxt_thread_spinlock_t *lock; 1031 1032 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1033 1034 if (--tmcf->count > 0) { 1035 return; 1036 } 1037 1038 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1039 1040 rtcf = tmcf->router_conf; 1041 1042 lock = &rtcf->router->lock; 1043 1044 nxt_thread_spin_lock(lock); 1045 1046 count = rtcf->count; 1047 1048 nxt_thread_spin_unlock(lock); 1049 1050 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1051 1052 if (count == 0) { 1053 nxt_router_apps_hash_use(task, rtcf, -1); 1054 1055 nxt_router_access_log_release(task, lock, rtcf->access_log); 1056 1057 nxt_mp_destroy(rtcf->mem_pool); 1058 } 1059 1060 nxt_mp_destroy(tmcf->mem_pool); 1061 } 1062 1063 1064 static void 1065 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1066 { 1067 nxt_app_t *app; 1068 nxt_queue_t new_socket_confs; 1069 nxt_socket_t s; 1070 nxt_router_t *router; 1071 nxt_queue_link_t *qlk; 1072 nxt_socket_conf_t 
*skcf; 1073 nxt_router_conf_t *rtcf; 1074 1075 nxt_alert(task, "failed to apply new conf"); 1076 1077 for (qlk = nxt_queue_first(&creating_sockets); 1078 qlk != nxt_queue_tail(&creating_sockets); 1079 qlk = nxt_queue_next(qlk)) 1080 { 1081 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1082 s = skcf->listen->socket; 1083 1084 if (s != -1) { 1085 nxt_socket_close(task, s); 1086 } 1087 1088 nxt_free(skcf->listen); 1089 } 1090 1091 nxt_queue_init(&new_socket_confs); 1092 nxt_queue_add(&new_socket_confs, &updating_sockets); 1093 nxt_queue_add(&new_socket_confs, &pending_sockets); 1094 nxt_queue_add(&new_socket_confs, &creating_sockets); 1095 1096 rtcf = tmcf->router_conf; 1097 1098 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1099 1100 nxt_router_app_unlink(task, app); 1101 1102 } nxt_queue_loop; 1103 1104 router = rtcf->router; 1105 1106 nxt_queue_add(&router->sockets, &keeping_sockets); 1107 nxt_queue_add(&router->sockets, &deleting_sockets); 1108 1109 nxt_queue_add(&router->apps, &tmcf->previous); 1110 1111 // TODO: new engines and threads 1112 1113 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1114 1115 nxt_mp_destroy(rtcf->mem_pool); 1116 1117 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1118 1119 nxt_mp_destroy(tmcf->mem_pool); 1120 } 1121 1122 1123 static void 1124 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1125 nxt_port_msg_type_t type) 1126 { 1127 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1128 1129 nxt_port_use(task, tmcf->port, -1); 1130 1131 tmcf->port = NULL; 1132 } 1133 1134 1135 static nxt_conf_map_t nxt_router_conf[] = { 1136 { 1137 nxt_string("listeners_threads"), 1138 NXT_CONF_MAP_INT32, 1139 offsetof(nxt_router_conf_t, threads), 1140 }, 1141 }; 1142 1143 1144 static nxt_conf_map_t nxt_router_app_conf[] = { 1145 { 1146 nxt_string("type"), 1147 NXT_CONF_MAP_STR, 1148 offsetof(nxt_router_app_conf_t, type), 1149 }, 1150 1151 { 1152 
nxt_string("limits"), 1153 NXT_CONF_MAP_PTR, 1154 offsetof(nxt_router_app_conf_t, limits_value), 1155 }, 1156 1157 { 1158 nxt_string("processes"), 1159 NXT_CONF_MAP_INT32, 1160 offsetof(nxt_router_app_conf_t, processes), 1161 }, 1162 1163 { 1164 nxt_string("processes"), 1165 NXT_CONF_MAP_PTR, 1166 offsetof(nxt_router_app_conf_t, processes_value), 1167 }, 1168 1169 { 1170 nxt_string("targets"), 1171 NXT_CONF_MAP_PTR, 1172 offsetof(nxt_router_app_conf_t, targets_value), 1173 }, 1174 }; 1175 1176 1177 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1178 { 1179 nxt_string("timeout"), 1180 NXT_CONF_MAP_MSEC, 1181 offsetof(nxt_router_app_conf_t, timeout), 1182 }, 1183 1184 { 1185 nxt_string("requests"), 1186 NXT_CONF_MAP_INT32, 1187 offsetof(nxt_router_app_conf_t, requests), 1188 }, 1189 }; 1190 1191 1192 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1193 { 1194 nxt_string("spare"), 1195 NXT_CONF_MAP_INT32, 1196 offsetof(nxt_router_app_conf_t, spare_processes), 1197 }, 1198 1199 { 1200 nxt_string("max"), 1201 NXT_CONF_MAP_INT32, 1202 offsetof(nxt_router_app_conf_t, max_processes), 1203 }, 1204 1205 { 1206 nxt_string("idle_timeout"), 1207 NXT_CONF_MAP_MSEC, 1208 offsetof(nxt_router_app_conf_t, idle_timeout), 1209 }, 1210 }; 1211 1212 1213 static nxt_conf_map_t nxt_router_listener_conf[] = { 1214 { 1215 nxt_string("pass"), 1216 NXT_CONF_MAP_STR_COPY, 1217 offsetof(nxt_router_listener_conf_t, pass), 1218 }, 1219 1220 { 1221 nxt_string("application"), 1222 NXT_CONF_MAP_STR_COPY, 1223 offsetof(nxt_router_listener_conf_t, application), 1224 }, 1225 }; 1226 1227 1228 static nxt_conf_map_t nxt_router_http_conf[] = { 1229 { 1230 nxt_string("header_buffer_size"), 1231 NXT_CONF_MAP_SIZE, 1232 offsetof(nxt_socket_conf_t, header_buffer_size), 1233 }, 1234 1235 { 1236 nxt_string("large_header_buffer_size"), 1237 NXT_CONF_MAP_SIZE, 1238 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1239 }, 1240 1241 { 1242 nxt_string("large_header_buffers"), 1243 
NXT_CONF_MAP_SIZE, 1244 offsetof(nxt_socket_conf_t, large_header_buffers), 1245 }, 1246 1247 { 1248 nxt_string("body_buffer_size"), 1249 NXT_CONF_MAP_SIZE, 1250 offsetof(nxt_socket_conf_t, body_buffer_size), 1251 }, 1252 1253 { 1254 nxt_string("max_body_size"), 1255 NXT_CONF_MAP_SIZE, 1256 offsetof(nxt_socket_conf_t, max_body_size), 1257 }, 1258 1259 { 1260 nxt_string("idle_timeout"), 1261 NXT_CONF_MAP_MSEC, 1262 offsetof(nxt_socket_conf_t, idle_timeout), 1263 }, 1264 1265 { 1266 nxt_string("header_read_timeout"), 1267 NXT_CONF_MAP_MSEC, 1268 offsetof(nxt_socket_conf_t, header_read_timeout), 1269 }, 1270 1271 { 1272 nxt_string("body_read_timeout"), 1273 NXT_CONF_MAP_MSEC, 1274 offsetof(nxt_socket_conf_t, body_read_timeout), 1275 }, 1276 1277 { 1278 nxt_string("send_timeout"), 1279 NXT_CONF_MAP_MSEC, 1280 offsetof(nxt_socket_conf_t, send_timeout), 1281 }, 1282 1283 { 1284 nxt_string("body_temp_path"), 1285 NXT_CONF_MAP_STR, 1286 offsetof(nxt_socket_conf_t, body_temp_path), 1287 }, 1288 1289 { 1290 nxt_string("discard_unsafe_fields"), 1291 NXT_CONF_MAP_INT8, 1292 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1293 }, 1294 }; 1295 1296 1297 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1298 { 1299 nxt_string("max_frame_size"), 1300 NXT_CONF_MAP_SIZE, 1301 offsetof(nxt_websocket_conf_t, max_frame_size), 1302 }, 1303 1304 { 1305 nxt_string("read_timeout"), 1306 NXT_CONF_MAP_MSEC, 1307 offsetof(nxt_websocket_conf_t, read_timeout), 1308 }, 1309 1310 { 1311 nxt_string("keepalive_interval"), 1312 NXT_CONF_MAP_MSEC, 1313 offsetof(nxt_websocket_conf_t, keepalive_interval), 1314 }, 1315 1316 }; 1317 1318 1319 static nxt_int_t 1320 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1321 u_char *start, u_char *end) 1322 { 1323 u_char *p; 1324 size_t size; 1325 nxt_mp_t *mp, *app_mp; 1326 uint32_t next, next_target; 1327 nxt_int_t ret; 1328 nxt_str_t name, path, target; 1329 nxt_app_t *app, *prev; 1330 nxt_str_t *t, *s, *targets; 1331 nxt_uint_t 
n, i; 1332 nxt_port_t *port; 1333 nxt_router_t *router; 1334 nxt_app_joint_t *app_joint; 1335 nxt_conf_value_t *conf, *http, *value, *websocket; 1336 nxt_conf_value_t *applications, *application; 1337 nxt_conf_value_t *listeners, *listener; 1338 nxt_conf_value_t *routes_conf, *static_conf; 1339 nxt_socket_conf_t *skcf; 1340 nxt_http_routes_t *routes; 1341 nxt_event_engine_t *engine; 1342 nxt_app_lang_module_t *lang; 1343 nxt_router_app_conf_t apcf; 1344 nxt_router_access_log_t *access_log; 1345 nxt_router_listener_conf_t lscf; 1346 #if (NXT_TLS) 1347 nxt_router_tlssock_t *tls; 1348 #endif 1349 1350 static nxt_str_t http_path = nxt_string("/settings/http"); 1351 static nxt_str_t applications_path = nxt_string("/applications"); 1352 static nxt_str_t listeners_path = nxt_string("/listeners"); 1353 static nxt_str_t routes_path = nxt_string("/routes"); 1354 static nxt_str_t access_log_path = nxt_string("/access_log"); 1355 #if (NXT_TLS) 1356 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1357 #endif 1358 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1359 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1360 1361 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1362 if (conf == NULL) { 1363 nxt_alert(task, "configuration parsing error"); 1364 return NXT_ERROR; 1365 } 1366 1367 mp = tmcf->router_conf->mem_pool; 1368 1369 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1370 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1371 if (ret != NXT_OK) { 1372 nxt_alert(task, "root map error"); 1373 return NXT_ERROR; 1374 } 1375 1376 if (tmcf->router_conf->threads == 0) { 1377 tmcf->router_conf->threads = nxt_ncpu; 1378 } 1379 1380 static_conf = nxt_conf_get_path(conf, &static_path); 1381 1382 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1383 if (nxt_slow_path(ret != NXT_OK)) { 1384 return NXT_ERROR; 1385 } 1386 1387 router = tmcf->router_conf->router; 1388 1389 
applications = nxt_conf_get_path(conf, &applications_path); 1390 1391 if (applications != NULL) { 1392 next = 0; 1393 1394 for ( ;; ) { 1395 application = nxt_conf_next_object_member(applications, 1396 &name, &next); 1397 if (application == NULL) { 1398 break; 1399 } 1400 1401 nxt_debug(task, "application \"%V\"", &name); 1402 1403 size = nxt_conf_json_length(application, NULL); 1404 1405 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1406 if (nxt_slow_path(app_mp == NULL)) { 1407 goto fail; 1408 } 1409 1410 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1411 if (app == NULL) { 1412 goto app_fail; 1413 } 1414 1415 nxt_memzero(app, sizeof(nxt_app_t)); 1416 1417 app->mem_pool = app_mp; 1418 1419 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1420 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1421 + name.length); 1422 1423 p = nxt_conf_json_print(app->conf.start, application, NULL); 1424 app->conf.length = p - app->conf.start; 1425 1426 nxt_assert(app->conf.length <= size); 1427 1428 nxt_debug(task, "application conf \"%V\"", &app->conf); 1429 1430 prev = nxt_router_app_find(&router->apps, &name); 1431 1432 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1433 nxt_mp_destroy(app_mp); 1434 1435 nxt_queue_remove(&prev->link); 1436 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1437 1438 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1439 if (nxt_slow_path(ret != NXT_OK)) { 1440 goto fail; 1441 } 1442 1443 continue; 1444 } 1445 1446 apcf.processes = 1; 1447 apcf.max_processes = 1; 1448 apcf.spare_processes = 0; 1449 apcf.timeout = 0; 1450 apcf.idle_timeout = 15000; 1451 apcf.requests = 0; 1452 apcf.limits_value = NULL; 1453 apcf.processes_value = NULL; 1454 apcf.targets_value = NULL; 1455 1456 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1457 if (nxt_slow_path(app_joint == NULL)) { 1458 goto app_fail; 1459 } 1460 1461 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1462 1463 ret = 
nxt_conf_map_object(mp, application, nxt_router_app_conf, 1464 nxt_nitems(nxt_router_app_conf), &apcf); 1465 if (ret != NXT_OK) { 1466 nxt_alert(task, "application map error"); 1467 goto app_fail; 1468 } 1469 1470 if (apcf.limits_value != NULL) { 1471 1472 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1473 nxt_alert(task, "application limits is not object"); 1474 goto app_fail; 1475 } 1476 1477 ret = nxt_conf_map_object(mp, apcf.limits_value, 1478 nxt_router_app_limits_conf, 1479 nxt_nitems(nxt_router_app_limits_conf), 1480 &apcf); 1481 if (ret != NXT_OK) { 1482 nxt_alert(task, "application limits map error"); 1483 goto app_fail; 1484 } 1485 } 1486 1487 if (apcf.processes_value != NULL 1488 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1489 { 1490 ret = nxt_conf_map_object(mp, apcf.processes_value, 1491 nxt_router_app_processes_conf, 1492 nxt_nitems(nxt_router_app_processes_conf), 1493 &apcf); 1494 if (ret != NXT_OK) { 1495 nxt_alert(task, "application processes map error"); 1496 goto app_fail; 1497 } 1498 1499 } else { 1500 apcf.max_processes = apcf.processes; 1501 apcf.spare_processes = apcf.processes; 1502 } 1503 1504 if (apcf.targets_value != NULL) { 1505 n = nxt_conf_object_members_count(apcf.targets_value); 1506 1507 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1508 if (nxt_slow_path(targets == NULL)) { 1509 goto app_fail; 1510 } 1511 1512 next_target = 0; 1513 1514 for (i = 0; i < n; i++) { 1515 (void) nxt_conf_next_object_member(apcf.targets_value, 1516 &target, &next_target); 1517 1518 s = nxt_str_dup(app_mp, &targets[i], &target); 1519 if (nxt_slow_path(s == NULL)) { 1520 goto app_fail; 1521 } 1522 } 1523 1524 } else { 1525 targets = NULL; 1526 } 1527 1528 nxt_debug(task, "application type: %V", &apcf.type); 1529 nxt_debug(task, "application processes: %D", apcf.processes); 1530 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1531 nxt_debug(task, "application requests: %D", apcf.requests); 1532 1533 lang 
= nxt_app_lang_module(task->thread->runtime, &apcf.type); 1534 1535 if (lang == NULL) { 1536 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1537 goto app_fail; 1538 } 1539 1540 nxt_debug(task, "application language module: \"%s\"", lang->file); 1541 1542 ret = nxt_thread_mutex_create(&app->mutex); 1543 if (ret != NXT_OK) { 1544 goto app_fail; 1545 } 1546 1547 nxt_queue_init(&app->ports); 1548 nxt_queue_init(&app->spare_ports); 1549 nxt_queue_init(&app->idle_ports); 1550 nxt_queue_init(&app->ack_waiting_req); 1551 1552 app->name.length = name.length; 1553 nxt_memcpy(app->name.start, name.start, name.length); 1554 1555 app->type = lang->type; 1556 app->max_processes = apcf.max_processes; 1557 app->spare_processes = apcf.spare_processes; 1558 app->max_pending_processes = apcf.spare_processes 1559 ? apcf.spare_processes : 1; 1560 app->timeout = apcf.timeout; 1561 app->idle_timeout = apcf.idle_timeout; 1562 app->max_requests = apcf.requests; 1563 1564 app->targets = targets; 1565 1566 engine = task->thread->engine; 1567 1568 app->engine = engine; 1569 1570 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1571 app->adjust_idle_work.task = &engine->task; 1572 app->adjust_idle_work.obj = app; 1573 1574 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1575 1576 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1577 if (nxt_slow_path(ret != NXT_OK)) { 1578 goto app_fail; 1579 } 1580 1581 nxt_router_app_use(task, app, 1); 1582 1583 app->joint = app_joint; 1584 1585 app_joint->use_count = 1; 1586 app_joint->app = app; 1587 1588 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1589 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1590 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1591 app_joint->idle_timer.task = &engine->task; 1592 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1593 1594 app_joint->free_app_work.handler = nxt_router_free_app; 1595 app_joint->free_app_work.task = &engine->task; 
1596 app_joint->free_app_work.obj = app_joint; 1597 1598 port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, 1599 NXT_PROCESS_APP); 1600 if (nxt_slow_path(port == NULL)) { 1601 return NXT_ERROR; 1602 } 1603 1604 ret = nxt_port_socket_init(task, port, 0); 1605 if (nxt_slow_path(ret != NXT_OK)) { 1606 nxt_port_use(task, port, -1); 1607 return NXT_ERROR; 1608 } 1609 1610 ret = nxt_router_app_queue_init(task, port); 1611 if (nxt_slow_path(ret != NXT_OK)) { 1612 nxt_port_write_close(port); 1613 nxt_port_read_close(port); 1614 nxt_port_use(task, port, -1); 1615 return NXT_ERROR; 1616 } 1617 1618 nxt_port_write_enable(task, port); 1619 port->app = app; 1620 1621 app->shared_port = port; 1622 1623 nxt_thread_mutex_create(&app->outgoing.mutex); 1624 } 1625 } 1626 1627 routes_conf = nxt_conf_get_path(conf, &routes_path); 1628 if (nxt_fast_path(routes_conf != NULL)) { 1629 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1630 if (nxt_slow_path(routes == NULL)) { 1631 return NXT_ERROR; 1632 } 1633 tmcf->router_conf->routes = routes; 1634 } 1635 1636 ret = nxt_upstreams_create(task, tmcf, conf); 1637 if (nxt_slow_path(ret != NXT_OK)) { 1638 return ret; 1639 } 1640 1641 http = nxt_conf_get_path(conf, &http_path); 1642 #if 0 1643 if (http == NULL) { 1644 nxt_alert(task, "no \"http\" block"); 1645 return NXT_ERROR; 1646 } 1647 #endif 1648 1649 websocket = nxt_conf_get_path(conf, &websocket_path); 1650 1651 listeners = nxt_conf_get_path(conf, &listeners_path); 1652 1653 if (listeners != NULL) { 1654 next = 0; 1655 1656 for ( ;; ) { 1657 listener = nxt_conf_next_object_member(listeners, &name, &next); 1658 if (listener == NULL) { 1659 break; 1660 } 1661 1662 skcf = nxt_router_socket_conf(task, tmcf, &name); 1663 if (skcf == NULL) { 1664 goto fail; 1665 } 1666 1667 nxt_memzero(&lscf, sizeof(lscf)); 1668 1669 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1670 nxt_nitems(nxt_router_listener_conf), 1671 &lscf); 1672 if (ret != NXT_OK) { 1673 
nxt_alert(task, "listener map error"); 1674 goto fail; 1675 } 1676 1677 nxt_debug(task, "application: %V", &lscf.application); 1678 1679 // STUB, default values if http block is not defined. 1680 skcf->header_buffer_size = 2048; 1681 skcf->large_header_buffer_size = 8192; 1682 skcf->large_header_buffers = 4; 1683 skcf->discard_unsafe_fields = 1; 1684 skcf->body_buffer_size = 16 * 1024; 1685 skcf->max_body_size = 8 * 1024 * 1024; 1686 skcf->proxy_header_buffer_size = 64 * 1024; 1687 skcf->proxy_buffer_size = 4096; 1688 skcf->proxy_buffers = 256; 1689 skcf->idle_timeout = 180 * 1000; 1690 skcf->header_read_timeout = 30 * 1000; 1691 skcf->body_read_timeout = 30 * 1000; 1692 skcf->send_timeout = 30 * 1000; 1693 skcf->proxy_timeout = 60 * 1000; 1694 skcf->proxy_send_timeout = 30 * 1000; 1695 skcf->proxy_read_timeout = 30 * 1000; 1696 1697 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1698 skcf->websocket_conf.read_timeout = 60 * 1000; 1699 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1700 1701 nxt_str_null(&skcf->body_temp_path); 1702 1703 if (http != NULL) { 1704 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1705 nxt_nitems(nxt_router_http_conf), 1706 skcf); 1707 if (ret != NXT_OK) { 1708 nxt_alert(task, "http map error"); 1709 goto fail; 1710 } 1711 } 1712 1713 if (websocket != NULL) { 1714 ret = nxt_conf_map_object(mp, websocket, 1715 nxt_router_websocket_conf, 1716 nxt_nitems(nxt_router_websocket_conf), 1717 &skcf->websocket_conf); 1718 if (ret != NXT_OK) { 1719 nxt_alert(task, "websocket map error"); 1720 goto fail; 1721 } 1722 } 1723 1724 t = &skcf->body_temp_path; 1725 1726 if (t->length == 0) { 1727 t->start = (u_char *) task->thread->runtime->tmp; 1728 t->length = nxt_strlen(t->start); 1729 } 1730 1731 #if (NXT_TLS) 1732 value = nxt_conf_get_path(listener, &certificate_path); 1733 1734 if (value != NULL) { 1735 nxt_conf_get_string(value, &name); 1736 1737 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); 1738 if (nxt_slow_path(tls == 
NULL)) { 1739 goto fail; 1740 } 1741 1742 tls->name = name; 1743 tls->conf = skcf; 1744 1745 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 1746 } 1747 #endif 1748 1749 skcf->listen->handler = nxt_http_conn_init; 1750 skcf->router_conf = tmcf->router_conf; 1751 skcf->router_conf->count++; 1752 1753 if (lscf.pass.length != 0) { 1754 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1755 1756 /* COMPATIBILITY: listener application. */ 1757 } else if (lscf.application.length > 0) { 1758 skcf->action = nxt_http_pass_application(task, 1759 tmcf->router_conf, 1760 &lscf.application); 1761 } 1762 1763 if (nxt_slow_path(skcf->action == NULL)) { 1764 goto fail; 1765 } 1766 } 1767 } 1768 1769 ret = nxt_http_routes_resolve(task, tmcf); 1770 if (nxt_slow_path(ret != NXT_OK)) { 1771 goto fail; 1772 } 1773 1774 value = nxt_conf_get_path(conf, &access_log_path); 1775 1776 if (value != NULL) { 1777 nxt_conf_get_string(value, &path); 1778 1779 access_log = router->access_log; 1780 1781 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1782 nxt_thread_spin_lock(&router->lock); 1783 access_log->count++; 1784 nxt_thread_spin_unlock(&router->lock); 1785 1786 } else { 1787 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1788 + path.length); 1789 if (access_log == NULL) { 1790 nxt_alert(task, "failed to allocate access log structure"); 1791 goto fail; 1792 } 1793 1794 access_log->fd = -1; 1795 access_log->handler = &nxt_router_access_log_writer; 1796 access_log->count = 1; 1797 1798 access_log->path.length = path.length; 1799 access_log->path.start = (u_char *) access_log 1800 + sizeof(nxt_router_access_log_t); 1801 1802 nxt_memcpy(access_log->path.start, path.start, path.length); 1803 } 1804 1805 tmcf->router_conf->access_log = access_log; 1806 } 1807 1808 nxt_queue_add(&deleting_sockets, &router->sockets); 1809 nxt_queue_init(&router->sockets); 1810 1811 return NXT_OK; 1812 1813 app_fail: 1814 1815 nxt_mp_destroy(app_mp); 1816 1817 fail: 1818 
1819 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1820 1821 nxt_queue_remove(&app->link); 1822 nxt_thread_mutex_destroy(&app->mutex); 1823 nxt_mp_destroy(app->mem_pool); 1824 1825 } nxt_queue_loop; 1826 1827 return NXT_ERROR; 1828 } 1829 1830 1831 static nxt_int_t 1832 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 1833 nxt_conf_value_t *conf) 1834 { 1835 uint32_t next, i; 1836 nxt_mp_t *mp; 1837 nxt_str_t *type, extension, str; 1838 nxt_int_t ret; 1839 nxt_uint_t exts; 1840 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 1841 1842 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 1843 1844 mp = rtcf->mem_pool; 1845 1846 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 1847 if (nxt_slow_path(ret != NXT_OK)) { 1848 return NXT_ERROR; 1849 } 1850 1851 if (conf == NULL) { 1852 return NXT_OK; 1853 } 1854 1855 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 1856 1857 if (mtypes_conf != NULL) { 1858 next = 0; 1859 1860 for ( ;; ) { 1861 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 1862 1863 if (ext_conf == NULL) { 1864 break; 1865 } 1866 1867 type = nxt_str_dup(mp, NULL, &str); 1868 if (nxt_slow_path(type == NULL)) { 1869 return NXT_ERROR; 1870 } 1871 1872 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 1873 nxt_conf_get_string(ext_conf, &str); 1874 1875 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1876 return NXT_ERROR; 1877 } 1878 1879 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 1880 &extension, type); 1881 if (nxt_slow_path(ret != NXT_OK)) { 1882 return NXT_ERROR; 1883 } 1884 1885 continue; 1886 } 1887 1888 exts = nxt_conf_array_elements_count(ext_conf); 1889 1890 for (i = 0; i < exts; i++) { 1891 value = nxt_conf_get_array_element(ext_conf, i); 1892 1893 nxt_conf_get_string(value, &str); 1894 1895 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { 1896 return NXT_ERROR; 1897 } 1898 1899 ret = nxt_http_static_mtypes_hash_add(mp, 
&rtcf->mtypes_hash, 1900 &extension, type); 1901 if (nxt_slow_path(ret != NXT_OK)) { 1902 return NXT_ERROR; 1903 } 1904 } 1905 } 1906 } 1907 1908 return NXT_OK; 1909 } 1910 1911 1912 static nxt_app_t * 1913 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 1914 { 1915 nxt_app_t *app; 1916 1917 nxt_queue_each(app, queue, nxt_app_t, link) { 1918 1919 if (nxt_strstr_eq(name, &app->name)) { 1920 return app; 1921 } 1922 1923 } nxt_queue_loop; 1924 1925 return NULL; 1926 } 1927 1928 1929 static nxt_int_t 1930 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 1931 { 1932 void *mem; 1933 nxt_int_t fd; 1934 1935 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 1936 if (nxt_slow_path(fd == -1)) { 1937 return NXT_ERROR; 1938 } 1939 1940 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 1941 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 1942 if (nxt_slow_path(mem == MAP_FAILED)) { 1943 nxt_fd_close(fd); 1944 1945 return NXT_ERROR; 1946 } 1947 1948 nxt_app_queue_init(mem); 1949 1950 port->queue_fd = fd; 1951 port->queue = mem; 1952 1953 return NXT_OK; 1954 } 1955 1956 1957 static nxt_int_t 1958 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 1959 { 1960 void *mem; 1961 nxt_int_t fd; 1962 1963 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 1964 if (nxt_slow_path(fd == -1)) { 1965 return NXT_ERROR; 1966 } 1967 1968 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 1969 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 1970 if (nxt_slow_path(mem == MAP_FAILED)) { 1971 nxt_fd_close(fd); 1972 1973 return NXT_ERROR; 1974 } 1975 1976 nxt_port_queue_init(mem); 1977 1978 port->queue_fd = fd; 1979 port->queue = mem; 1980 1981 return NXT_OK; 1982 } 1983 1984 1985 static nxt_int_t 1986 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 1987 { 1988 void *mem; 1989 1990 nxt_assert(fd != -1); 1991 1992 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 1993 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 1994 if (nxt_slow_path(mem == 
MAP_FAILED)) { 1995 1996 return NXT_ERROR; 1997 } 1998 1999 port->queue = mem; 2000 2001 return NXT_OK; 2002 } 2003 2004 2005 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2006 NXT_LVLHSH_DEFAULT, 2007 nxt_router_apps_hash_test, 2008 nxt_mp_lvlhsh_alloc, 2009 nxt_mp_lvlhsh_free, 2010 }; 2011 2012 2013 static nxt_int_t 2014 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2015 { 2016 nxt_app_t *app; 2017 2018 app = data; 2019 2020 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2021 } 2022 2023 2024 static nxt_int_t 2025 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2026 { 2027 nxt_lvlhsh_query_t lhq; 2028 2029 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2030 lhq.replace = 0; 2031 lhq.key = app->name; 2032 lhq.value = app; 2033 lhq.proto = &nxt_router_apps_hash_proto; 2034 lhq.pool = rtcf->mem_pool; 2035 2036 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2037 2038 case NXT_OK: 2039 return NXT_OK; 2040 2041 case NXT_DECLINED: 2042 nxt_thread_log_alert("router app hash adding failed: " 2043 "\"%V\" is already in hash", &lhq.key); 2044 /* Fall through. 
*/ 2045 default: 2046 return NXT_ERROR; 2047 } 2048 } 2049 2050 2051 static nxt_app_t * 2052 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2053 { 2054 nxt_lvlhsh_query_t lhq; 2055 2056 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2057 lhq.key = *name; 2058 lhq.proto = &nxt_router_apps_hash_proto; 2059 2060 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2061 return NULL; 2062 } 2063 2064 return lhq.value; 2065 } 2066 2067 2068 static void 2069 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2070 { 2071 nxt_app_t *app; 2072 nxt_lvlhsh_each_t lhe; 2073 2074 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2075 2076 for ( ;; ) { 2077 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2078 2079 if (app == NULL) { 2080 break; 2081 } 2082 2083 nxt_router_app_use(task, app, i); 2084 } 2085 } 2086 2087 2088 2089 nxt_int_t 2090 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name, 2091 nxt_http_action_t *action) 2092 { 2093 nxt_app_t *app; 2094 2095 app = nxt_router_apps_hash_get(rtcf, name); 2096 2097 if (app == NULL) { 2098 return NXT_DECLINED; 2099 } 2100 2101 action->u.application = app; 2102 action->handler = nxt_http_application_handler; 2103 2104 return NXT_OK; 2105 } 2106 2107 2108 static nxt_socket_conf_t * 2109 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2110 nxt_str_t *name) 2111 { 2112 size_t size; 2113 nxt_int_t ret; 2114 nxt_bool_t wildcard; 2115 nxt_sockaddr_t *sa; 2116 nxt_socket_conf_t *skcf; 2117 nxt_listen_socket_t *ls; 2118 2119 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2120 if (nxt_slow_path(sa == NULL)) { 2121 nxt_alert(task, "invalid listener \"%V\"", name); 2122 return NULL; 2123 } 2124 2125 sa->type = SOCK_STREAM; 2126 2127 nxt_debug(task, "router listener: \"%*s\"", 2128 (size_t) sa->length, nxt_sockaddr_start(sa)); 2129 2130 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2131 if 
(nxt_slow_path(skcf == NULL)) { 2132 return NULL; 2133 } 2134 2135 size = nxt_sockaddr_size(sa); 2136 2137 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2138 2139 if (ret != NXT_OK) { 2140 2141 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2142 if (nxt_slow_path(ls == NULL)) { 2143 return NULL; 2144 } 2145 2146 skcf->listen = ls; 2147 2148 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2149 nxt_memcpy(ls->sockaddr, sa, size); 2150 2151 nxt_listen_socket_remote_size(ls); 2152 2153 ls->socket = -1; 2154 ls->backlog = NXT_LISTEN_BACKLOG; 2155 ls->flags = NXT_NONBLOCK; 2156 ls->read_after_accept = 1; 2157 } 2158 2159 switch (sa->u.sockaddr.sa_family) { 2160 #if (NXT_HAVE_UNIX_DOMAIN) 2161 case AF_UNIX: 2162 wildcard = 0; 2163 break; 2164 #endif 2165 #if (NXT_INET6) 2166 case AF_INET6: 2167 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2168 break; 2169 #endif 2170 case AF_INET: 2171 default: 2172 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2173 break; 2174 } 2175 2176 if (!wildcard) { 2177 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2178 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2179 return NULL; 2180 } 2181 2182 nxt_memcpy(skcf->sockaddr, sa, size); 2183 } 2184 2185 return skcf; 2186 } 2187 2188 2189 static nxt_int_t 2190 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2191 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2192 { 2193 nxt_router_t *router; 2194 nxt_queue_link_t *qlk; 2195 nxt_socket_conf_t *skcf; 2196 2197 router = tmcf->router_conf->router; 2198 2199 for (qlk = nxt_queue_first(&router->sockets); 2200 qlk != nxt_queue_tail(&router->sockets); 2201 qlk = nxt_queue_next(qlk)) 2202 { 2203 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2204 2205 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2206 nskcf->listen = skcf->listen; 2207 2208 nxt_queue_remove(qlk); 2209 nxt_queue_insert_tail(&keeping_sockets, qlk); 2210 2211 
nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2212 2213 return NXT_OK; 2214 } 2215 } 2216 2217 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2218 2219 return NXT_DECLINED; 2220 } 2221 2222 2223 static void 2224 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2225 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2226 { 2227 size_t size; 2228 uint32_t stream; 2229 nxt_int_t ret; 2230 nxt_buf_t *b; 2231 nxt_port_t *main_port, *router_port; 2232 nxt_runtime_t *rt; 2233 nxt_socket_rpc_t *rpc; 2234 2235 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2236 if (rpc == NULL) { 2237 goto fail; 2238 } 2239 2240 rpc->socket_conf = skcf; 2241 rpc->temp_conf = tmcf; 2242 2243 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2244 2245 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2246 if (b == NULL) { 2247 goto fail; 2248 } 2249 2250 b->completion_handler = nxt_router_dummy_buf_completion; 2251 2252 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2253 2254 rt = task->thread->runtime; 2255 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2256 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2257 2258 stream = nxt_port_rpc_register_handler(task, router_port, 2259 nxt_router_listen_socket_ready, 2260 nxt_router_listen_socket_error, 2261 main_port->pid, rpc); 2262 if (nxt_slow_path(stream == 0)) { 2263 goto fail; 2264 } 2265 2266 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2267 stream, router_port->id, b); 2268 2269 if (nxt_slow_path(ret != NXT_OK)) { 2270 nxt_port_rpc_cancel(task, router_port, stream); 2271 goto fail; 2272 } 2273 2274 return; 2275 2276 fail: 2277 2278 nxt_router_conf_error(task, tmcf); 2279 } 2280 2281 2282 static void 2283 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2284 void *data) 2285 { 2286 nxt_int_t ret; 2287 nxt_socket_t s; 2288 nxt_socket_rpc_t *rpc; 2289 2290 rpc = data; 2291 2292 s = msg->fd[0]; 2293 2294 ret = 
nxt_socket_nonblocking(task, s); 2295 if (nxt_slow_path(ret != NXT_OK)) { 2296 goto fail; 2297 } 2298 2299 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2300 2301 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2302 if (nxt_slow_path(ret != NXT_OK)) { 2303 goto fail; 2304 } 2305 2306 rpc->socket_conf->listen->socket = s; 2307 2308 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2309 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2310 2311 return; 2312 2313 fail: 2314 2315 nxt_socket_close(task, s); 2316 2317 nxt_router_conf_error(task, rpc->temp_conf); 2318 } 2319 2320 2321 static void 2322 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2323 void *data) 2324 { 2325 nxt_socket_rpc_t *rpc; 2326 nxt_router_temp_conf_t *tmcf; 2327 2328 rpc = data; 2329 tmcf = rpc->temp_conf; 2330 2331 #if 0 2332 u_char *p; 2333 size_t size; 2334 uint8_t error; 2335 nxt_buf_t *in, *out; 2336 nxt_sockaddr_t *sa; 2337 2338 static nxt_str_t socket_errors[] = { 2339 nxt_string("ListenerSystem"), 2340 nxt_string("ListenerNoIPv6"), 2341 nxt_string("ListenerPort"), 2342 nxt_string("ListenerInUse"), 2343 nxt_string("ListenerNoAddress"), 2344 nxt_string("ListenerNoAccess"), 2345 nxt_string("ListenerPath"), 2346 }; 2347 2348 sa = rpc->socket_conf->listen->sockaddr; 2349 2350 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2351 2352 if (nxt_slow_path(in == NULL)) { 2353 return; 2354 } 2355 2356 p = in->mem.pos; 2357 2358 error = *p++; 2359 2360 size = nxt_length("listen socket error: ") 2361 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2362 + sa->length + socket_errors[error].length + (in->mem.free - p); 2363 2364 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2365 if (nxt_slow_path(out == NULL)) { 2366 return; 2367 } 2368 2369 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2370 "listen socket error: " 2371 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2372 (size_t) 
sa->length, nxt_sockaddr_start(sa), 2373 &socket_errors[error], in->mem.free - p, p); 2374 2375 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2376 #endif 2377 2378 nxt_router_conf_error(task, tmcf); 2379 } 2380 2381 2382 #if (NXT_TLS) 2383 2384 static void 2385 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2386 nxt_router_tlssock_t *tls) 2387 { 2388 nxt_socket_rpc_t *rpc; 2389 2390 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2391 if (rpc == NULL) { 2392 nxt_router_conf_error(task, tmcf); 2393 return; 2394 } 2395 2396 rpc->socket_conf = tls->conf; 2397 rpc->temp_conf = tmcf; 2398 2399 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 2400 nxt_router_tls_rpc_handler, rpc); 2401 } 2402 2403 2404 static void 2405 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2406 void *data) 2407 { 2408 nxt_mp_t *mp; 2409 nxt_int_t ret; 2410 nxt_tls_conf_t *tlscf; 2411 nxt_socket_rpc_t *rpc; 2412 nxt_router_temp_conf_t *tmcf; 2413 2414 nxt_debug(task, "tls rpc handler"); 2415 2416 rpc = data; 2417 tmcf = rpc->temp_conf; 2418 2419 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2420 goto fail; 2421 } 2422 2423 mp = tmcf->router_conf->mem_pool; 2424 2425 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2426 if (nxt_slow_path(tlscf == NULL)) { 2427 goto fail; 2428 } 2429 2430 tlscf->chain_file = msg->fd[0]; 2431 2432 ret = task->thread->runtime->tls->server_init(task, tlscf); 2433 if (nxt_slow_path(ret != NXT_OK)) { 2434 goto fail; 2435 } 2436 2437 rpc->socket_conf->tls = tlscf; 2438 2439 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2440 nxt_router_conf_apply, task, tmcf, NULL); 2441 return; 2442 2443 fail: 2444 2445 nxt_router_conf_error(task, tmcf); 2446 } 2447 2448 #endif 2449 2450 2451 static void 2452 nxt_router_app_rpc_create(nxt_task_t *task, 2453 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2454 { 2455 size_t size; 2456 uint32_t stream; 2457 nxt_int_t 
ret; 2458 nxt_buf_t *b; 2459 nxt_port_t *main_port, *router_port; 2460 nxt_runtime_t *rt; 2461 nxt_app_rpc_t *rpc; 2462 2463 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); 2464 if (rpc == NULL) { 2465 goto fail; 2466 } 2467 2468 rpc->app = app; 2469 rpc->temp_conf = tmcf; 2470 2471 nxt_debug(task, "app '%V' prefork", &app->name); 2472 2473 size = app->name.length + 1 + app->conf.length; 2474 2475 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2476 if (nxt_slow_path(b == NULL)) { 2477 goto fail; 2478 } 2479 2480 b->completion_handler = nxt_router_dummy_buf_completion; 2481 2482 nxt_buf_cpystr(b, &app->name); 2483 *b->mem.free++ = '\0'; 2484 nxt_buf_cpystr(b, &app->conf); 2485 2486 rt = task->thread->runtime; 2487 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2488 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2489 2490 stream = nxt_port_rpc_register_handler(task, router_port, 2491 nxt_router_app_prefork_ready, 2492 nxt_router_app_prefork_error, 2493 -1, rpc); 2494 if (nxt_slow_path(stream == 0)) { 2495 goto fail; 2496 } 2497 2498 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, 2499 -1, stream, router_port->id, b); 2500 2501 if (nxt_slow_path(ret != NXT_OK)) { 2502 nxt_port_rpc_cancel(task, router_port, stream); 2503 goto fail; 2504 } 2505 2506 app->pending_processes++; 2507 2508 return; 2509 2510 fail: 2511 2512 nxt_router_conf_error(task, tmcf); 2513 } 2514 2515 2516 static void 2517 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2518 void *data) 2519 { 2520 nxt_app_t *app; 2521 nxt_port_t *port; 2522 nxt_app_rpc_t *rpc; 2523 nxt_event_engine_t *engine; 2524 2525 rpc = data; 2526 app = rpc->app; 2527 2528 port = msg->u.new_port; 2529 2530 nxt_assert(port != NULL); 2531 nxt_assert(port->type == NXT_PROCESS_APP); 2532 nxt_assert(port->id == 0); 2533 2534 port->app = app; 2535 port->main_app_port = port; 2536 2537 app->pending_processes--; 2538 app->processes++; 2539 app->idle_processes++; 2540 
2541 engine = task->thread->engine; 2542 2543 nxt_queue_insert_tail(&app->ports, &port->app_link); 2544 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2545 2546 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2547 &app->name, port->pid, port->id); 2548 2549 nxt_port_hash_add(&app->port_hash, port); 2550 app->port_hash_count++; 2551 2552 port->idle_start = 0; 2553 2554 nxt_port_inc_use(port); 2555 2556 nxt_router_app_shared_port_send(task, port); 2557 2558 nxt_work_queue_add(&engine->fast_work_queue, 2559 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2560 } 2561 2562 2563 static void 2564 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2565 void *data) 2566 { 2567 nxt_app_t *app; 2568 nxt_app_rpc_t *rpc; 2569 nxt_router_temp_conf_t *tmcf; 2570 2571 rpc = data; 2572 app = rpc->app; 2573 tmcf = rpc->temp_conf; 2574 2575 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2576 &app->name); 2577 2578 app->pending_processes--; 2579 2580 nxt_router_conf_error(task, tmcf); 2581 } 2582 2583 2584 static nxt_int_t 2585 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2586 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2587 { 2588 nxt_int_t ret; 2589 nxt_uint_t n, threads; 2590 nxt_queue_link_t *qlk; 2591 nxt_router_engine_conf_t *recf; 2592 2593 threads = tmcf->router_conf->threads; 2594 2595 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2596 sizeof(nxt_router_engine_conf_t)); 2597 if (nxt_slow_path(tmcf->engines == NULL)) { 2598 return NXT_ERROR; 2599 } 2600 2601 n = 0; 2602 2603 for (qlk = nxt_queue_first(&router->engines); 2604 qlk != nxt_queue_tail(&router->engines); 2605 qlk = nxt_queue_next(qlk)) 2606 { 2607 recf = nxt_array_zero_add(tmcf->engines); 2608 if (nxt_slow_path(recf == NULL)) { 2609 return NXT_ERROR; 2610 } 2611 2612 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2613 2614 if (n < threads) { 2615 recf->action = 
NXT_ROUTER_ENGINE_KEEP; 2616 ret = nxt_router_engine_conf_update(tmcf, recf); 2617 2618 } else { 2619 recf->action = NXT_ROUTER_ENGINE_DELETE; 2620 ret = nxt_router_engine_conf_delete(tmcf, recf); 2621 } 2622 2623 if (nxt_slow_path(ret != NXT_OK)) { 2624 return ret; 2625 } 2626 2627 n++; 2628 } 2629 2630 tmcf->new_threads = n; 2631 2632 while (n < threads) { 2633 recf = nxt_array_zero_add(tmcf->engines); 2634 if (nxt_slow_path(recf == NULL)) { 2635 return NXT_ERROR; 2636 } 2637 2638 recf->action = NXT_ROUTER_ENGINE_ADD; 2639 2640 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2641 if (nxt_slow_path(recf->engine == NULL)) { 2642 return NXT_ERROR; 2643 } 2644 2645 ret = nxt_router_engine_conf_create(tmcf, recf); 2646 if (nxt_slow_path(ret != NXT_OK)) { 2647 return ret; 2648 } 2649 2650 n++; 2651 } 2652 2653 return NXT_OK; 2654 } 2655 2656 2657 static nxt_int_t 2658 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 2659 nxt_router_engine_conf_t *recf) 2660 { 2661 nxt_int_t ret; 2662 2663 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2664 nxt_router_listen_socket_create); 2665 if (nxt_slow_path(ret != NXT_OK)) { 2666 return ret; 2667 } 2668 2669 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2670 nxt_router_listen_socket_create); 2671 if (nxt_slow_path(ret != NXT_OK)) { 2672 return ret; 2673 } 2674 2675 return ret; 2676 } 2677 2678 2679 static nxt_int_t 2680 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 2681 nxt_router_engine_conf_t *recf) 2682 { 2683 nxt_int_t ret; 2684 2685 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 2686 nxt_router_listen_socket_create); 2687 if (nxt_slow_path(ret != NXT_OK)) { 2688 return ret; 2689 } 2690 2691 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 2692 nxt_router_listen_socket_update); 2693 if (nxt_slow_path(ret != NXT_OK)) { 2694 return ret; 2695 } 2696 2697 ret = nxt_router_engine_joints_delete(tmcf, 
recf, &deleting_sockets); 2698 if (nxt_slow_path(ret != NXT_OK)) { 2699 return ret; 2700 } 2701 2702 return ret; 2703 } 2704 2705 2706 static nxt_int_t 2707 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 2708 nxt_router_engine_conf_t *recf) 2709 { 2710 nxt_int_t ret; 2711 2712 ret = nxt_router_engine_quit(tmcf, recf); 2713 if (nxt_slow_path(ret != NXT_OK)) { 2714 return ret; 2715 } 2716 2717 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 2718 if (nxt_slow_path(ret != NXT_OK)) { 2719 return ret; 2720 } 2721 2722 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 2723 } 2724 2725 2726 static nxt_int_t 2727 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 2728 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 2729 nxt_work_handler_t handler) 2730 { 2731 nxt_int_t ret; 2732 nxt_joint_job_t *job; 2733 nxt_queue_link_t *qlk; 2734 nxt_socket_conf_t *skcf; 2735 nxt_socket_conf_joint_t *joint; 2736 2737 for (qlk = nxt_queue_first(sockets); 2738 qlk != nxt_queue_tail(sockets); 2739 qlk = nxt_queue_next(qlk)) 2740 { 2741 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2742 if (nxt_slow_path(job == NULL)) { 2743 return NXT_ERROR; 2744 } 2745 2746 job->work.next = recf->jobs; 2747 recf->jobs = &job->work; 2748 2749 job->task = tmcf->engine->task; 2750 job->work.handler = handler; 2751 job->work.task = &job->task; 2752 job->work.obj = job; 2753 job->tmcf = tmcf; 2754 2755 tmcf->count++; 2756 2757 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 2758 sizeof(nxt_socket_conf_joint_t)); 2759 if (nxt_slow_path(joint == NULL)) { 2760 return NXT_ERROR; 2761 } 2762 2763 job->work.data = joint; 2764 2765 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 2766 if (nxt_slow_path(ret != NXT_OK)) { 2767 return ret; 2768 } 2769 2770 joint->count = 1; 2771 2772 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2773 skcf->count++; 2774 joint->socket_conf = skcf; 2775 2776 joint->engine = 
recf->engine; 2777 } 2778 2779 return NXT_OK; 2780 } 2781 2782 2783 static nxt_int_t 2784 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 2785 nxt_router_engine_conf_t *recf) 2786 { 2787 nxt_joint_job_t *job; 2788 2789 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2790 if (nxt_slow_path(job == NULL)) { 2791 return NXT_ERROR; 2792 } 2793 2794 job->work.next = recf->jobs; 2795 recf->jobs = &job->work; 2796 2797 job->task = tmcf->engine->task; 2798 job->work.handler = nxt_router_worker_thread_quit; 2799 job->work.task = &job->task; 2800 job->work.obj = NULL; 2801 job->work.data = NULL; 2802 job->tmcf = NULL; 2803 2804 return NXT_OK; 2805 } 2806 2807 2808 static nxt_int_t 2809 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 2810 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 2811 { 2812 nxt_joint_job_t *job; 2813 nxt_queue_link_t *qlk; 2814 2815 for (qlk = nxt_queue_first(sockets); 2816 qlk != nxt_queue_tail(sockets); 2817 qlk = nxt_queue_next(qlk)) 2818 { 2819 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 2820 if (nxt_slow_path(job == NULL)) { 2821 return NXT_ERROR; 2822 } 2823 2824 job->work.next = recf->jobs; 2825 recf->jobs = &job->work; 2826 2827 job->task = tmcf->engine->task; 2828 job->work.handler = nxt_router_listen_socket_delete; 2829 job->work.task = &job->task; 2830 job->work.obj = job; 2831 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2832 job->tmcf = tmcf; 2833 2834 tmcf->count++; 2835 } 2836 2837 return NXT_OK; 2838 } 2839 2840 2841 static nxt_int_t 2842 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 2843 nxt_router_temp_conf_t *tmcf) 2844 { 2845 nxt_int_t ret; 2846 nxt_uint_t i, threads; 2847 nxt_router_engine_conf_t *recf; 2848 2849 recf = tmcf->engines->elts; 2850 threads = tmcf->router_conf->threads; 2851 2852 for (i = tmcf->new_threads; i < threads; i++) { 2853 ret = nxt_router_thread_create(task, rt, recf[i].engine); 2854 if (nxt_slow_path(ret != NXT_OK)) { 
2855 return ret; 2856 } 2857 } 2858 2859 return NXT_OK; 2860 } 2861 2862 2863 static nxt_int_t 2864 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 2865 nxt_event_engine_t *engine) 2866 { 2867 nxt_int_t ret; 2868 nxt_thread_link_t *link; 2869 nxt_thread_handle_t handle; 2870 2871 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2872 2873 if (nxt_slow_path(link == NULL)) { 2874 return NXT_ERROR; 2875 } 2876 2877 link->start = nxt_router_thread_start; 2878 link->engine = engine; 2879 link->work.handler = nxt_router_thread_exit_handler; 2880 link->work.task = task; 2881 link->work.data = link; 2882 2883 nxt_queue_insert_tail(&rt->engines, &engine->link); 2884 2885 ret = nxt_thread_create(&handle, link); 2886 2887 if (nxt_slow_path(ret != NXT_OK)) { 2888 nxt_queue_remove(&engine->link); 2889 } 2890 2891 return ret; 2892 } 2893 2894 2895 static void 2896 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 2897 nxt_router_temp_conf_t *tmcf) 2898 { 2899 nxt_app_t *app; 2900 2901 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 2902 2903 nxt_router_app_unlink(task, app); 2904 2905 } nxt_queue_loop; 2906 2907 nxt_queue_add(&router->apps, &tmcf->previous); 2908 nxt_queue_add(&router->apps, &tmcf->apps); 2909 } 2910 2911 2912 static void 2913 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 2914 { 2915 nxt_uint_t n; 2916 nxt_event_engine_t *engine; 2917 nxt_router_engine_conf_t *recf; 2918 2919 recf = tmcf->engines->elts; 2920 2921 for (n = tmcf->engines->nelts; n != 0; n--) { 2922 engine = recf->engine; 2923 2924 switch (recf->action) { 2925 2926 case NXT_ROUTER_ENGINE_KEEP: 2927 break; 2928 2929 case NXT_ROUTER_ENGINE_ADD: 2930 nxt_queue_insert_tail(&router->engines, &engine->link0); 2931 break; 2932 2933 case NXT_ROUTER_ENGINE_DELETE: 2934 nxt_queue_remove(&engine->link0); 2935 break; 2936 } 2937 2938 nxt_router_engine_post(engine, recf->jobs); 2939 2940 recf++; 2941 } 2942 } 2943 2944 2945 static void 2946 
nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 2947 { 2948 nxt_work_t *work, *next; 2949 2950 for (work = jobs; work != NULL; work = next) { 2951 next = work->next; 2952 work->next = NULL; 2953 2954 nxt_event_engine_post(engine, work); 2955 } 2956 } 2957 2958 2959 static nxt_port_handlers_t nxt_router_app_port_handlers = { 2960 .rpc_error = nxt_port_rpc_handler, 2961 .mmap = nxt_port_mmap_handler, 2962 .data = nxt_port_rpc_handler, 2963 .oosm = nxt_router_oosm_handler, 2964 .req_headers_ack = nxt_port_rpc_handler, 2965 }; 2966 2967 2968 static void 2969 nxt_router_thread_start(void *data) 2970 { 2971 nxt_int_t ret; 2972 nxt_port_t *port; 2973 nxt_task_t *task; 2974 nxt_work_t *work; 2975 nxt_thread_t *thread; 2976 nxt_thread_link_t *link; 2977 nxt_event_engine_t *engine; 2978 2979 link = data; 2980 engine = link->engine; 2981 task = &engine->task; 2982 2983 thread = nxt_thread(); 2984 2985 nxt_event_engine_thread_adopt(engine); 2986 2987 /* STUB */ 2988 thread->runtime = engine->task.thread->runtime; 2989 2990 engine->task.thread = thread; 2991 engine->task.log = thread->log; 2992 thread->engine = engine; 2993 thread->task = &engine->task; 2994 #if 0 2995 thread->fiber = &engine->fibers->fiber; 2996 #endif 2997 2998 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 2999 if (nxt_slow_path(engine->mem_pool == NULL)) { 3000 return; 3001 } 3002 3003 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3004 NXT_PROCESS_ROUTER); 3005 if (nxt_slow_path(port == NULL)) { 3006 return; 3007 } 3008 3009 ret = nxt_port_socket_init(task, port, 0); 3010 if (nxt_slow_path(ret != NXT_OK)) { 3011 nxt_port_use(task, port, -1); 3012 return; 3013 } 3014 3015 ret = nxt_router_port_queue_init(task, port); 3016 if (nxt_slow_path(ret != NXT_OK)) { 3017 nxt_port_use(task, port, -1); 3018 return; 3019 } 3020 3021 engine->port = port; 3022 3023 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3024 3025 work = nxt_zalloc(sizeof(nxt_work_t)); 3026 if 
(nxt_slow_path(work == NULL)) { 3027 return; 3028 } 3029 3030 work->handler = nxt_router_rt_add_port; 3031 work->task = link->work.task; 3032 work->obj = work; 3033 work->data = port; 3034 3035 nxt_event_engine_post(link->work.task->thread->engine, work); 3036 3037 nxt_event_engine_start(engine); 3038 } 3039 3040 3041 static void 3042 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3043 { 3044 nxt_int_t res; 3045 nxt_port_t *port; 3046 nxt_runtime_t *rt; 3047 3048 rt = task->thread->runtime; 3049 port = data; 3050 3051 nxt_free(obj); 3052 3053 res = nxt_port_hash_add(&rt->ports, port); 3054 3055 if (nxt_fast_path(res == NXT_OK)) { 3056 nxt_port_use(task, port, 1); 3057 } 3058 } 3059 3060 3061 static void 3062 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3063 { 3064 nxt_joint_job_t *job; 3065 nxt_socket_conf_t *skcf; 3066 nxt_listen_event_t *lev; 3067 nxt_listen_socket_t *ls; 3068 nxt_thread_spinlock_t *lock; 3069 nxt_socket_conf_joint_t *joint; 3070 3071 job = obj; 3072 joint = data; 3073 3074 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3075 3076 skcf = joint->socket_conf; 3077 ls = skcf->listen; 3078 3079 lev = nxt_listen_event(task, ls); 3080 if (nxt_slow_path(lev == NULL)) { 3081 nxt_router_listen_socket_release(task, skcf); 3082 return; 3083 } 3084 3085 lev->socket.data = joint; 3086 3087 lock = &skcf->router_conf->router->lock; 3088 3089 nxt_thread_spin_lock(lock); 3090 ls->count++; 3091 nxt_thread_spin_unlock(lock); 3092 3093 job->work.next = NULL; 3094 job->work.handler = nxt_router_conf_wait; 3095 3096 nxt_event_engine_post(job->tmcf->engine, &job->work); 3097 } 3098 3099 3100 nxt_inline nxt_listen_event_t * 3101 nxt_router_listen_event(nxt_queue_t *listen_connections, 3102 nxt_socket_conf_t *skcf) 3103 { 3104 nxt_socket_t fd; 3105 nxt_queue_link_t *qlk; 3106 nxt_listen_event_t *lev; 3107 3108 fd = skcf->listen->socket; 3109 3110 for (qlk = nxt_queue_first(listen_connections); 3111 qlk != 
nxt_queue_tail(listen_connections); 3112 qlk = nxt_queue_next(qlk)) 3113 { 3114 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3115 3116 if (fd == lev->socket.fd) { 3117 return lev; 3118 } 3119 } 3120 3121 return NULL; 3122 } 3123 3124 3125 static void 3126 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3127 { 3128 nxt_joint_job_t *job; 3129 nxt_event_engine_t *engine; 3130 nxt_listen_event_t *lev; 3131 nxt_socket_conf_joint_t *joint, *old; 3132 3133 job = obj; 3134 joint = data; 3135 3136 engine = task->thread->engine; 3137 3138 nxt_queue_insert_tail(&engine->joints, &joint->link); 3139 3140 lev = nxt_router_listen_event(&engine->listen_connections, 3141 joint->socket_conf); 3142 3143 old = lev->socket.data; 3144 lev->socket.data = joint; 3145 lev->listen = joint->socket_conf->listen; 3146 3147 job->work.next = NULL; 3148 job->work.handler = nxt_router_conf_wait; 3149 3150 nxt_event_engine_post(job->tmcf->engine, &job->work); 3151 3152 /* 3153 * The task is allocated from configuration temporary 3154 * memory pool so it can be freed after engine post operation. 
3155 */ 3156 3157 nxt_router_conf_release(&engine->task, old); 3158 } 3159 3160 3161 static void 3162 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3163 { 3164 nxt_joint_job_t *job; 3165 nxt_socket_conf_t *skcf; 3166 nxt_listen_event_t *lev; 3167 nxt_event_engine_t *engine; 3168 3169 job = obj; 3170 skcf = data; 3171 3172 engine = task->thread->engine; 3173 3174 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3175 3176 nxt_fd_event_delete(engine, &lev->socket); 3177 3178 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3179 lev->socket.fd); 3180 3181 lev->timer.handler = nxt_router_listen_socket_close; 3182 lev->timer.work_queue = &engine->fast_work_queue; 3183 3184 nxt_timer_add(engine, &lev->timer, 0); 3185 3186 job->work.next = NULL; 3187 job->work.handler = nxt_router_conf_wait; 3188 3189 nxt_event_engine_post(job->tmcf->engine, &job->work); 3190 } 3191 3192 3193 static void 3194 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3195 { 3196 nxt_event_engine_t *engine; 3197 3198 nxt_debug(task, "router worker thread quit"); 3199 3200 engine = task->thread->engine; 3201 3202 engine->shutdown = 1; 3203 3204 if (nxt_queue_is_empty(&engine->joints)) { 3205 nxt_thread_exit(task->thread); 3206 } 3207 } 3208 3209 3210 static void 3211 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3212 { 3213 nxt_timer_t *timer; 3214 nxt_listen_event_t *lev; 3215 nxt_socket_conf_joint_t *joint; 3216 3217 timer = obj; 3218 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3219 3220 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3221 lev->socket.fd); 3222 3223 nxt_queue_remove(&lev->link); 3224 3225 joint = lev->socket.data; 3226 lev->socket.data = NULL; 3227 3228 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3229 task = &task->thread->engine->task; 3230 3231 nxt_router_listen_socket_release(task, joint->socket_conf); 3232 3233 
nxt_router_listen_event_release(task, lev, joint); 3234 } 3235 3236 3237 static void 3238 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3239 { 3240 nxt_listen_socket_t *ls; 3241 nxt_thread_spinlock_t *lock; 3242 3243 ls = skcf->listen; 3244 lock = &skcf->router_conf->router->lock; 3245 3246 nxt_thread_spin_lock(lock); 3247 3248 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3249 task->thread->engine, ls->count); 3250 3251 if (--ls->count != 0) { 3252 ls = NULL; 3253 } 3254 3255 nxt_thread_spin_unlock(lock); 3256 3257 if (ls != NULL) { 3258 nxt_socket_close(task, ls->socket); 3259 nxt_free(ls); 3260 } 3261 } 3262 3263 3264 void 3265 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3266 nxt_socket_conf_joint_t *joint) 3267 { 3268 nxt_event_engine_t *engine; 3269 3270 nxt_debug(task, "listen event count: %D", lev->count); 3271 3272 engine = task->thread->engine; 3273 3274 if (--lev->count == 0) { 3275 if (lev->next != NULL) { 3276 nxt_sockaddr_cache_free(engine, lev->next); 3277 3278 nxt_conn_free(task, lev->next); 3279 } 3280 3281 nxt_free(lev); 3282 } 3283 3284 if (joint != NULL) { 3285 nxt_router_conf_release(task, joint); 3286 } 3287 3288 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3289 nxt_thread_exit(task->thread); 3290 } 3291 } 3292 3293 3294 void 3295 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3296 { 3297 nxt_socket_conf_t *skcf; 3298 nxt_router_conf_t *rtcf; 3299 nxt_thread_spinlock_t *lock; 3300 3301 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3302 3303 if (--joint->count != 0) { 3304 return; 3305 } 3306 3307 nxt_queue_remove(&joint->link); 3308 3309 /* 3310 * The joint content can not be safely used after the critical 3311 * section protected by the spinlock because its memory pool may 3312 * be already destroyed by another thread. 
3313 */ 3314 skcf = joint->socket_conf; 3315 rtcf = skcf->router_conf; 3316 lock = &rtcf->router->lock; 3317 3318 nxt_thread_spin_lock(lock); 3319 3320 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3321 rtcf, rtcf->count); 3322 3323 if (--skcf->count != 0) { 3324 skcf = NULL; 3325 rtcf = NULL; 3326 3327 } else { 3328 nxt_queue_remove(&skcf->link); 3329 3330 if (--rtcf->count != 0) { 3331 rtcf = NULL; 3332 } 3333 } 3334 3335 nxt_thread_spin_unlock(lock); 3336 3337 #if (NXT_TLS) 3338 if (skcf != NULL && skcf->tls != NULL) { 3339 task->thread->runtime->tls->server_free(task, skcf->tls); 3340 } 3341 #endif 3342 3343 /* TODO remove engine->port */ 3344 3345 if (rtcf != NULL) { 3346 nxt_debug(task, "old router conf is destroyed"); 3347 3348 nxt_router_apps_hash_use(task, rtcf, -1); 3349 3350 nxt_router_access_log_release(task, lock, rtcf->access_log); 3351 3352 nxt_mp_thread_adopt(rtcf->mem_pool); 3353 3354 nxt_mp_destroy(rtcf->mem_pool); 3355 } 3356 } 3357 3358 3359 static void 3360 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3361 nxt_router_access_log_t *access_log) 3362 { 3363 size_t size; 3364 u_char *buf, *p; 3365 nxt_off_t bytes; 3366 3367 static nxt_time_string_t date_cache = { 3368 (nxt_atomic_uint_t) -1, 3369 nxt_router_access_log_date, 3370 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3371 nxt_length("31/Dec/1986:19:40:00 +0300"), 3372 NXT_THREAD_TIME_LOCAL, 3373 NXT_THREAD_TIME_SEC, 3374 }; 3375 3376 size = r->remote->address_length 3377 + 6 /* ' - - [' */ 3378 + date_cache.size 3379 + 3 /* '] "' */ 3380 + r->method->length 3381 + 1 /* space */ 3382 + r->target.length 3383 + 1 /* space */ 3384 + r->version.length 3385 + 2 /* '" ' */ 3386 + 3 /* status */ 3387 + 1 /* space */ 3388 + NXT_OFF_T_LEN 3389 + 2 /* ' "' */ 3390 + (r->referer != NULL ? r->referer->value_length : 1) 3391 + 3 /* '" "' */ 3392 + (r->user_agent != NULL ? 
r->user_agent->value_length : 1) 3393 + 2 /* '"\n' */ 3394 ; 3395 3396 buf = nxt_mp_nget(r->mem_pool, size); 3397 if (nxt_slow_path(buf == NULL)) { 3398 return; 3399 } 3400 3401 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3402 r->remote->address_length); 3403 3404 p = nxt_cpymem(p, " - - [", 6); 3405 3406 p = nxt_thread_time_string(task->thread, &date_cache, p); 3407 3408 p = nxt_cpymem(p, "] \"", 3); 3409 3410 if (r->method->length != 0) { 3411 p = nxt_cpymem(p, r->method->start, r->method->length); 3412 3413 if (r->target.length != 0) { 3414 *p++ = ' '; 3415 p = nxt_cpymem(p, r->target.start, r->target.length); 3416 3417 if (r->version.length != 0) { 3418 *p++ = ' '; 3419 p = nxt_cpymem(p, r->version.start, r->version.length); 3420 } 3421 } 3422 3423 } else { 3424 *p++ = '-'; 3425 } 3426 3427 p = nxt_cpymem(p, "\" ", 2); 3428 3429 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3430 3431 *p++ = ' '; 3432 3433 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3434 3435 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3436 3437 p = nxt_cpymem(p, " \"", 2); 3438 3439 if (r->referer != NULL) { 3440 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3441 3442 } else { 3443 *p++ = '-'; 3444 } 3445 3446 p = nxt_cpymem(p, "\" \"", 3); 3447 3448 if (r->user_agent != NULL) { 3449 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3450 3451 } else { 3452 *p++ = '-'; 3453 } 3454 3455 p = nxt_cpymem(p, "\"\n", 2); 3456 3457 nxt_fd_write(access_log->fd, buf, p - buf); 3458 } 3459 3460 3461 static u_char * 3462 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3463 size_t size, const char *format) 3464 { 3465 u_char sign; 3466 time_t gmtoff; 3467 3468 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3469 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3470 3471 gmtoff = nxt_timezone(tm) / 60; 3472 3473 if (gmtoff < 0) { 3474 gmtoff = -gmtoff; 3475 sign = '-'; 3476 3477 } 
else { 3478 sign = '+'; 3479 } 3480 3481 return nxt_sprintf(buf, buf + size, format, 3482 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3483 tm->tm_hour, tm->tm_min, tm->tm_sec, 3484 sign, gmtoff / 60, gmtoff % 60); 3485 } 3486 3487 3488 static void 3489 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3490 { 3491 uint32_t stream; 3492 nxt_int_t ret; 3493 nxt_buf_t *b; 3494 nxt_port_t *main_port, *router_port; 3495 nxt_runtime_t *rt; 3496 nxt_router_access_log_t *access_log; 3497 3498 access_log = tmcf->router_conf->access_log; 3499 3500 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3501 if (nxt_slow_path(b == NULL)) { 3502 goto fail; 3503 } 3504 3505 b->completion_handler = nxt_router_dummy_buf_completion; 3506 3507 nxt_buf_cpystr(b, &access_log->path); 3508 *b->mem.free++ = '\0'; 3509 3510 rt = task->thread->runtime; 3511 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3512 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3513 3514 stream = nxt_port_rpc_register_handler(task, router_port, 3515 nxt_router_access_log_ready, 3516 nxt_router_access_log_error, 3517 -1, tmcf); 3518 if (nxt_slow_path(stream == 0)) { 3519 goto fail; 3520 } 3521 3522 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3523 stream, router_port->id, b); 3524 3525 if (nxt_slow_path(ret != NXT_OK)) { 3526 nxt_port_rpc_cancel(task, router_port, stream); 3527 goto fail; 3528 } 3529 3530 return; 3531 3532 fail: 3533 3534 nxt_router_conf_error(task, tmcf); 3535 } 3536 3537 3538 static void 3539 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3540 void *data) 3541 { 3542 nxt_router_temp_conf_t *tmcf; 3543 nxt_router_access_log_t *access_log; 3544 3545 tmcf = data; 3546 3547 access_log = tmcf->router_conf->access_log; 3548 3549 access_log->fd = msg->fd[0]; 3550 3551 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3552 nxt_router_conf_apply, task, tmcf, NULL); 3553 } 3554 3555 3556 
static void 3557 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3558 void *data) 3559 { 3560 nxt_router_temp_conf_t *tmcf; 3561 3562 tmcf = data; 3563 3564 nxt_router_conf_error(task, tmcf); 3565 } 3566 3567 3568 static void 3569 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3570 nxt_router_access_log_t *access_log) 3571 { 3572 if (access_log == NULL) { 3573 return; 3574 } 3575 3576 nxt_thread_spin_lock(lock); 3577 3578 if (--access_log->count != 0) { 3579 access_log = NULL; 3580 } 3581 3582 nxt_thread_spin_unlock(lock); 3583 3584 if (access_log != NULL) { 3585 3586 if (access_log->fd != -1) { 3587 nxt_fd_close(access_log->fd); 3588 } 3589 3590 nxt_free(access_log); 3591 } 3592 } 3593 3594 3595 typedef struct { 3596 nxt_mp_t *mem_pool; 3597 nxt_router_access_log_t *access_log; 3598 } nxt_router_access_log_reopen_t; 3599 3600 3601 static void 3602 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3603 { 3604 nxt_mp_t *mp; 3605 uint32_t stream; 3606 nxt_int_t ret; 3607 nxt_buf_t *b; 3608 nxt_port_t *main_port, *router_port; 3609 nxt_runtime_t *rt; 3610 nxt_router_access_log_t *access_log; 3611 nxt_router_access_log_reopen_t *reopen; 3612 3613 access_log = nxt_router->access_log; 3614 3615 if (access_log == NULL) { 3616 return; 3617 } 3618 3619 mp = nxt_mp_create(1024, 128, 256, 32); 3620 if (nxt_slow_path(mp == NULL)) { 3621 return; 3622 } 3623 3624 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3625 if (nxt_slow_path(reopen == NULL)) { 3626 goto fail; 3627 } 3628 3629 reopen->mem_pool = mp; 3630 reopen->access_log = access_log; 3631 3632 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 3633 if (nxt_slow_path(b == NULL)) { 3634 goto fail; 3635 } 3636 3637 b->completion_handler = nxt_router_access_log_reopen_completion; 3638 3639 nxt_buf_cpystr(b, &access_log->path); 3640 *b->mem.free++ = '\0'; 3641 3642 rt = task->thread->runtime; 3643 main_port = 
rt->port_by_type[NXT_PROCESS_MAIN]; 3644 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3645 3646 stream = nxt_port_rpc_register_handler(task, router_port, 3647 nxt_router_access_log_reopen_ready, 3648 nxt_router_access_log_reopen_error, 3649 -1, reopen); 3650 if (nxt_slow_path(stream == 0)) { 3651 goto fail; 3652 } 3653 3654 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3655 stream, router_port->id, b); 3656 3657 if (nxt_slow_path(ret != NXT_OK)) { 3658 nxt_port_rpc_cancel(task, router_port, stream); 3659 goto fail; 3660 } 3661 3662 nxt_mp_retain(mp); 3663 3664 return; 3665 3666 fail: 3667 3668 nxt_mp_destroy(mp); 3669 } 3670 3671 3672 static void 3673 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 3674 { 3675 nxt_mp_t *mp; 3676 nxt_buf_t *b; 3677 3678 b = obj; 3679 mp = b->data; 3680 3681 nxt_mp_release(mp); 3682 } 3683 3684 3685 static void 3686 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3687 void *data) 3688 { 3689 nxt_router_access_log_t *access_log; 3690 nxt_router_access_log_reopen_t *reopen; 3691 3692 reopen = data; 3693 3694 access_log = reopen->access_log; 3695 3696 if (access_log == nxt_router->access_log) { 3697 3698 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 3699 nxt_alert(task, "dup2(%FD, %FD) failed %E", 3700 msg->fd[0], access_log->fd, nxt_errno); 3701 } 3702 } 3703 3704 nxt_fd_close(msg->fd[0]); 3705 nxt_mp_release(reopen->mem_pool); 3706 } 3707 3708 3709 static void 3710 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3711 void *data) 3712 { 3713 nxt_router_access_log_reopen_t *reopen; 3714 3715 reopen = data; 3716 3717 nxt_mp_release(reopen->mem_pool); 3718 } 3719 3720 3721 static void 3722 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 3723 { 3724 nxt_port_t *port; 3725 nxt_thread_link_t *link; 3726 nxt_event_engine_t *engine; 3727 nxt_thread_handle_t handle; 3728 3729 
/*
 * RPC reply handler for a request stream.  "data" is the
 * nxt_request_rpc_data_t registered for the stream; "msg" is the message
 * received from the application.
 *
 * A _NXT_PORT_MSG_REQ_HEADERS_ACK message is delegated to
 * nxt_router_req_headers_ack_handler().  Any other message is treated as
 * response data:
 *
 *   - while r->header_sent is not set, the first buffer is parsed as
 *     nxt_unit_response_t followed by resp->fields_count header fields,
 *     the fields are copied into r->resp.fields and the header is sent;
 *   - once r->header_sent is set, the buffers are appended to r->out and
 *     sent as body.
 *
 * If the buffer is too small or a field cannot be added/processed, the
 * request is failed with NXT_HTTP_SERVICE_UNAVAILABLE and the rpc data is
 * unlinked.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        /* No request is attached to this rpc data: nothing to answer. */
        return;
    }

    if (r->error) {
        /* The request is already flagged as failed: only detach rpc data. */
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    /* A zero-size message contributes no payload buffer. */
    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Final message of the stream: terminate the chain. */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /*
         * apr_action is initialised to NXT_APR_REQUEST_FAILED in
         * nxt_router_app_port_get(); a completed response upgrades it.
         */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* Not the last message: schedule r->timer with app->timeout. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        /* Header already sent: everything received now is body data. */
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        /* A non-memory buffer is treated as empty and rejected below. */
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;

        /* Number of nxt_unit_field_t entries that fit after the header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /*
             * field->skip was cleared before processing, so a set flag
             * comes from nxt_http_field_process(); drop the element that
             * was just added to the list.
             */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        /*
         * Reposition the buffer onto the piggybacked content, or mark it
         * as fully consumed when there is none.
         */
        if (resp->piggyback_content_length != 0) {
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            b->mem.pos = b->mem.free;
        }

        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            /*
             * Nothing left in the first buffer: detach it, queue its
             * completion handler and continue with the rest of the chain.
             */
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            /* Account the upgraded connection on the main app port. */
            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            /* Release as an upgrade now; the pending action becomes CLOSE. */
            nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}
*msg, nxt_request_rpc_data_t *req_rpc_data) 3947 { 3948 int res; 3949 nxt_app_t *app; 3950 nxt_buf_t *b; 3951 nxt_bool_t start_process, unlinked; 3952 nxt_port_t *app_port, *main_app_port, *idle_port; 3953 nxt_queue_link_t *idle_lnk; 3954 nxt_http_request_t *r; 3955 3956 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 3957 req_rpc_data->stream, 3958 msg->port_msg.pid, msg->port_msg.reply_port); 3959 3960 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 3961 msg->port_msg.pid); 3962 3963 app = req_rpc_data->app; 3964 r = req_rpc_data->request; 3965 3966 start_process = 0; 3967 unlinked = 0; 3968 3969 nxt_thread_mutex_lock(&app->mutex); 3970 3971 if (r->app_link.next != NULL) { 3972 nxt_queue_remove(&r->app_link); 3973 r->app_link.next = NULL; 3974 3975 unlinked = 1; 3976 } 3977 3978 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 3979 msg->port_msg.reply_port); 3980 if (nxt_slow_path(app_port == NULL)) { 3981 nxt_thread_mutex_unlock(&app->mutex); 3982 3983 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3984 3985 if (unlinked) { 3986 nxt_mp_release(r->mem_pool); 3987 } 3988 3989 return; 3990 } 3991 3992 main_app_port = app_port->main_app_port; 3993 3994 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 3995 app->idle_processes--; 3996 3997 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 3998 &app->name, main_app_port->pid, main_app_port->id, 3999 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4000 4001 /* Check port was in 'spare_ports' using idle_start field. */ 4002 if (main_app_port->idle_start == 0 4003 && app->idle_processes >= app->spare_processes) 4004 { 4005 /* 4006 * If there is a vacant space in spare ports, 4007 * move the last idle to spare_ports. 
4008 */ 4009 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4010 4011 idle_lnk = nxt_queue_last(&app->idle_ports); 4012 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4013 nxt_queue_remove(idle_lnk); 4014 4015 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4016 4017 idle_port->idle_start = 0; 4018 4019 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4020 "to spare_ports", 4021 &app->name, idle_port->pid, idle_port->id); 4022 } 4023 4024 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4025 app->pending_processes++; 4026 start_process = 1; 4027 } 4028 } 4029 4030 main_app_port->active_requests++; 4031 4032 nxt_port_inc_use(app_port); 4033 4034 nxt_thread_mutex_unlock(&app->mutex); 4035 4036 if (unlinked) { 4037 nxt_mp_release(r->mem_pool); 4038 } 4039 4040 if (start_process) { 4041 nxt_router_start_app_process(task, app); 4042 } 4043 4044 nxt_port_use(task, req_rpc_data->app_port, -1); 4045 4046 req_rpc_data->app_port = app_port; 4047 4048 b = req_rpc_data->msg_info.buf; 4049 4050 if (b != NULL) { 4051 /* First buffer is already sent. Start from second. 
*/ 4052 b = b->next; 4053 } 4054 4055 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4056 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4057 req_rpc_data->msg_info.body_fd); 4058 4059 if (req_rpc_data->msg_info.body_fd != -1) { 4060 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4061 } 4062 4063 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4064 req_rpc_data->msg_info.body_fd, 4065 req_rpc_data->stream, 4066 task->thread->engine->port->id, b); 4067 4068 if (nxt_slow_path(res != NXT_OK)) { 4069 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4070 } 4071 } 4072 4073 if (app->timeout != 0) { 4074 r->timer.handler = nxt_router_app_timeout; 4075 r->timer_data = req_rpc_data; 4076 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4077 } 4078 } 4079 4080 4081 static const nxt_http_request_state_t nxt_http_request_send_state 4082 nxt_aligned(64) = 4083 { 4084 .error_handler = nxt_http_request_error_handler, 4085 }; 4086 4087 4088 static void 4089 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4090 { 4091 nxt_buf_t *out; 4092 nxt_http_request_t *r; 4093 4094 r = obj; 4095 4096 out = r->out; 4097 4098 if (out != NULL) { 4099 r->out = NULL; 4100 nxt_http_request_send(task, r, out); 4101 } 4102 } 4103 4104 4105 static void 4106 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4107 void *data) 4108 { 4109 nxt_request_rpc_data_t *req_rpc_data; 4110 4111 req_rpc_data = data; 4112 4113 req_rpc_data->rpc_cancel = 0; 4114 4115 /* TODO cancel message and return if cancelled. 
/*
 * RPC handler called when the port of a newly started application process
 * is reported.  "data" is the nxt_app_joint_t the start was issued for; the
 * new port is taken from msg->u.new_port and is expected to be the main
 * port (id 0) of an NXT_PROCESS_APP process.
 *
 * One reference on the joint is dropped.  If the joint no longer points to
 * an application, the new process is told to quit.  Otherwise the port is
 * bound to the application, one pending process is turned into a running
 * one, the port is registered in app->port_hash, the application's shared
 * port is sent to the process and the port is released with
 * NXT_APR_NEW_PORT.
 */
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_app_t        *app;
    nxt_port_t       *port;
    nxt_app_joint_t  *app_joint;

    app_joint = data;
    port = msg->u.new_port;

    nxt_assert(app_joint != NULL);
    nxt_assert(port != NULL);
    nxt_assert(port->type == NXT_PROCESS_APP);
    nxt_assert(port->id == 0);

    /* Read the app pointer before the joint reference is dropped. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "new port ready for released app, send QUIT");

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        return;
    }

    /* The first port of a process is its own main port. */
    port->app = app;
    port->main_app_port = port;

    nxt_thread_mutex_lock(&app->mutex);

    nxt_assert(app->pending_processes != 0);

    /* pending -> running, and make the port discoverable by pid/id. */
    app->pending_processes--;
    app->processes++;
    nxt_port_hash_add(&app->port_hash, port);
    app->port_hash_count++;

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
              &app->name, port->pid, app->processes, app->pending_processes);

    /* NOTE(review): the nxt_int_t result of this call is not checked. */
    nxt_router_app_shared_port_send(task, port);

    nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}
4191 port = app_port->app->shared_port; 4192 4193 nxt_debug(task, "send port %FD to process %PI", 4194 port->pair[0], app_port->pid); 4195 4196 b->mem.free += sizeof(nxt_port_msg_new_port_t); 4197 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 4198 4199 msg->id = port->id; 4200 msg->pid = port->pid; 4201 msg->max_size = port->max_size; 4202 msg->max_share = port->max_share; 4203 msg->type = port->type; 4204 4205 return nxt_port_socket_write2(task, app_port, 4206 NXT_PORT_MSG_NEW_PORT, 4207 port->pair[0], port->queue_fd, 4208 0, 0, b); 4209 } 4210 4211 4212 static void 4213 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4214 void *data) 4215 { 4216 nxt_app_t *app; 4217 nxt_app_joint_t *app_joint; 4218 nxt_queue_link_t *link; 4219 nxt_http_request_t *r; 4220 4221 app_joint = data; 4222 4223 nxt_assert(app_joint != NULL); 4224 4225 app = app_joint->app; 4226 4227 nxt_router_app_joint_use(task, app_joint, -1); 4228 4229 if (nxt_slow_path(app == NULL)) { 4230 nxt_debug(task, "start error for released app"); 4231 4232 return; 4233 } 4234 4235 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4236 4237 link = NULL; 4238 4239 nxt_thread_mutex_lock(&app->mutex); 4240 4241 nxt_assert(app->pending_processes != 0); 4242 4243 app->pending_processes--; 4244 4245 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4246 link = nxt_queue_first(&app->ack_waiting_req); 4247 4248 nxt_queue_remove(link); 4249 link->next = NULL; 4250 } 4251 4252 nxt_thread_mutex_unlock(&app->mutex); 4253 4254 while (link != NULL) { 4255 r = nxt_container_of(link, nxt_http_request_t, app_link); 4256 4257 nxt_event_engine_post(r->engine, &r->err_work); 4258 4259 link = NULL; 4260 4261 nxt_thread_mutex_lock(&app->mutex); 4262 4263 if (app->processes == 0 && app->pending_processes == 0 4264 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4265 { 4266 link = nxt_queue_first(&app->ack_waiting_req); 4267 4268 nxt_queue_remove(link); 4269 link->next = NULL; 
4270 } 4271 4272 nxt_thread_mutex_unlock(&app->mutex); 4273 } 4274 } 4275 4276 4277 4278 nxt_inline nxt_port_t * 4279 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) 4280 { 4281 nxt_port_t *port; 4282 4283 port = NULL; 4284 4285 nxt_thread_mutex_lock(&app->mutex); 4286 4287 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 4288 4289 /* Caller is responsible to decrease port use count. */ 4290 nxt_queue_chk_remove(&port->app_link); 4291 4292 if (nxt_queue_chk_remove(&port->idle_link)) { 4293 app->idle_processes--; 4294 4295 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", 4296 &app->name, port->pid, port->id, 4297 (port->idle_start ? "idle_ports" : "spare_ports")); 4298 } 4299 4300 nxt_port_hash_remove(&app->port_hash, port); 4301 app->port_hash_count--; 4302 4303 port->app = NULL; 4304 app->processes--; 4305 4306 break; 4307 4308 } nxt_queue_loop; 4309 4310 nxt_thread_mutex_unlock(&app->mutex); 4311 4312 return port; 4313 } 4314 4315 4316 static void 4317 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4318 { 4319 int c; 4320 4321 c = nxt_atomic_fetch_add(&app->use_count, i); 4322 4323 if (i < 0 && c == -i) { 4324 4325 if (task->thread->engine != app->engine) { 4326 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 4327 4328 } else { 4329 nxt_router_free_app(task, app->joint, NULL); 4330 } 4331 } 4332 } 4333 4334 4335 static void 4336 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 4337 { 4338 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 4339 4340 nxt_queue_remove(&app->link); 4341 4342 nxt_router_app_use(task, app, -1); 4343 } 4344 4345 4346 static void 4347 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 4348 nxt_apr_action_t action) 4349 { 4350 int inc_use; 4351 uint32_t got_response, dec_requests; 4352 nxt_app_t *app; 4353 nxt_bool_t port_unchained, send_quit, adjust_idle_timer; 4354 nxt_port_t *main_app_port; 4355 4356 nxt_assert(port != NULL); 4357 
nxt_assert(port->app != NULL); 4358 4359 app = port->app; 4360 4361 inc_use = 0; 4362 got_response = 0; 4363 dec_requests = 0; 4364 4365 switch (action) { 4366 case NXT_APR_NEW_PORT: 4367 break; 4368 case NXT_APR_REQUEST_FAILED: 4369 dec_requests = 1; 4370 inc_use = -1; 4371 break; 4372 case NXT_APR_GOT_RESPONSE: 4373 got_response = 1; 4374 inc_use = -1; 4375 break; 4376 case NXT_APR_UPGRADE: 4377 got_response = 1; 4378 break; 4379 case NXT_APR_CLOSE: 4380 inc_use = -1; 4381 break; 4382 } 4383 4384 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4385 port->pid, port->id, 4386 (int) inc_use, (int) got_response); 4387 4388 if (port == app->shared_port) { 4389 nxt_thread_mutex_lock(&app->mutex); 4390 4391 app->active_requests -= got_response + dec_requests; 4392 4393 nxt_thread_mutex_unlock(&app->mutex); 4394 4395 goto adjust_use; 4396 } 4397 4398 main_app_port = port->main_app_port; 4399 4400 nxt_thread_mutex_lock(&app->mutex); 4401 4402 main_app_port->app_responses += got_response; 4403 main_app_port->active_requests -= got_response + dec_requests; 4404 app->active_requests -= got_response + dec_requests; 4405 4406 if (main_app_port->pair[1] != -1 4407 && (app->max_requests == 0 4408 || main_app_port->app_responses < app->max_requests)) 4409 { 4410 if (main_app_port->app_link.next == NULL) { 4411 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4412 4413 nxt_port_inc_use(main_app_port); 4414 } 4415 } 4416 4417 send_quit = (app->max_requests > 0 4418 && main_app_port->app_responses >= app->max_requests); 4419 4420 if (send_quit) { 4421 port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); 4422 4423 nxt_port_hash_remove(&app->port_hash, main_app_port); 4424 app->port_hash_count--; 4425 4426 main_app_port->app = NULL; 4427 app->processes--; 4428 4429 } else { 4430 port_unchained = 0; 4431 } 4432 4433 adjust_idle_timer = 0; 4434 4435 if (main_app_port->pair[1] != -1 && !send_quit 4436 && main_app_port->active_requests == 0 4437 
&& main_app_port->active_websockets == 0 4438 && main_app_port->idle_link.next == NULL) 4439 { 4440 if (app->idle_processes == app->spare_processes 4441 && app->adjust_idle_work.data == NULL) 4442 { 4443 adjust_idle_timer = 1; 4444 app->adjust_idle_work.data = app; 4445 app->adjust_idle_work.next = NULL; 4446 } 4447 4448 if (app->idle_processes < app->spare_processes) { 4449 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4450 4451 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4452 &app->name, main_app_port->pid, main_app_port->id); 4453 } else { 4454 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4455 4456 main_app_port->idle_start = task->thread->engine->timers.now; 4457 4458 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4459 &app->name, main_app_port->pid, main_app_port->id); 4460 } 4461 4462 app->idle_processes++; 4463 } 4464 4465 nxt_thread_mutex_unlock(&app->mutex); 4466 4467 if (adjust_idle_timer) { 4468 nxt_router_app_use(task, app, 1); 4469 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4470 } 4471 4472 /* ? 
*/ 4473 if (main_app_port->pair[1] == -1) { 4474 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4475 &app->name, app, main_app_port, main_app_port->pid); 4476 4477 goto adjust_use; 4478 } 4479 4480 if (send_quit) { 4481 nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); 4482 4483 nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, 4484 NULL); 4485 4486 if (port_unchained) { 4487 nxt_port_use(task, main_app_port, -1); 4488 } 4489 4490 goto adjust_use; 4491 } 4492 4493 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4494 &app->name, app); 4495 4496 adjust_use: 4497 4498 nxt_port_use(task, port, inc_use); 4499 } 4500 4501 4502 void 4503 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4504 { 4505 nxt_app_t *app; 4506 nxt_bool_t unchain, start_process; 4507 nxt_port_t *idle_port; 4508 nxt_queue_link_t *idle_lnk; 4509 4510 app = port->app; 4511 4512 nxt_assert(app != NULL); 4513 4514 nxt_thread_mutex_lock(&app->mutex); 4515 4516 nxt_port_hash_remove(&app->port_hash, port); 4517 app->port_hash_count--; 4518 4519 if (port->id != 0) { 4520 nxt_thread_mutex_unlock(&app->mutex); 4521 4522 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, 4523 port->pid, port->id); 4524 4525 return; 4526 } 4527 4528 unchain = nxt_queue_chk_remove(&port->app_link); 4529 4530 if (nxt_queue_chk_remove(&port->idle_link)) { 4531 app->idle_processes--; 4532 4533 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", 4534 &app->name, port->pid, port->id, 4535 (port->idle_start ? 
"idle_ports" : "spare_ports")); 4536 4537 if (port->idle_start == 0 4538 && app->idle_processes >= app->spare_processes) 4539 { 4540 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4541 4542 idle_lnk = nxt_queue_last(&app->idle_ports); 4543 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4544 nxt_queue_remove(idle_lnk); 4545 4546 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4547 4548 idle_port->idle_start = 0; 4549 4550 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4551 "to spare_ports", 4552 &app->name, idle_port->pid, idle_port->id); 4553 } 4554 } 4555 4556 app->processes--; 4557 4558 start_process = !task->thread->engine->shutdown 4559 && nxt_router_app_can_start(app) 4560 && nxt_router_app_need_start(app); 4561 4562 if (start_process) { 4563 app->pending_processes++; 4564 } 4565 4566 nxt_thread_mutex_unlock(&app->mutex); 4567 4568 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4569 4570 if (unchain) { 4571 nxt_port_use(task, port, -1); 4572 } 4573 4574 if (start_process) { 4575 nxt_router_start_app_process(task, app); 4576 } 4577 } 4578 4579 4580 static void 4581 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4582 { 4583 nxt_app_t *app; 4584 nxt_bool_t queued; 4585 nxt_port_t *port; 4586 nxt_msec_t timeout, threshold; 4587 nxt_queue_link_t *lnk; 4588 nxt_event_engine_t *engine; 4589 4590 app = obj; 4591 queued = (data == app); 4592 4593 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4594 &app->name, queued); 4595 4596 engine = task->thread->engine; 4597 4598 nxt_assert(app->engine == engine); 4599 4600 threshold = engine->timers.now + app->joint->idle_timer.bias; 4601 timeout = 0; 4602 4603 nxt_thread_mutex_lock(&app->mutex); 4604 4605 if (queued) { 4606 app->adjust_idle_work.data = NULL; 4607 } 4608 4609 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 4610 &app->name, 4611 (int) app->idle_processes, (int) app->spare_processes); 4612 
4613 while (app->idle_processes > app->spare_processes) { 4614 4615 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4616 4617 lnk = nxt_queue_first(&app->idle_ports); 4618 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 4619 4620 timeout = port->idle_start + app->idle_timeout; 4621 4622 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 4623 &app->name, port->pid, 4624 port->idle_start, timeout, threshold); 4625 4626 if (timeout > threshold) { 4627 break; 4628 } 4629 4630 nxt_queue_remove(lnk); 4631 lnk->next = NULL; 4632 4633 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 4634 &app->name, port->pid, port->id); 4635 4636 nxt_queue_chk_remove(&port->app_link); 4637 4638 nxt_port_hash_remove(&app->port_hash, port); 4639 app->port_hash_count--; 4640 4641 app->idle_processes--; 4642 app->processes--; 4643 port->app = NULL; 4644 4645 nxt_thread_mutex_unlock(&app->mutex); 4646 4647 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 4648 &app->name, port->pid); 4649 4650 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4651 4652 nxt_port_use(task, port, -1); 4653 4654 nxt_thread_mutex_lock(&app->mutex); 4655 } 4656 4657 nxt_thread_mutex_unlock(&app->mutex); 4658 4659 if (timeout > threshold) { 4660 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 4661 4662 } else { 4663 nxt_timer_disable(engine, &app->joint->idle_timer); 4664 } 4665 4666 if (queued) { 4667 nxt_router_app_use(task, app, -1); 4668 } 4669 } 4670 4671 4672 static void 4673 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 4674 { 4675 nxt_timer_t *timer; 4676 nxt_app_joint_t *app_joint; 4677 4678 timer = obj; 4679 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4680 4681 if (nxt_fast_path(app_joint->app != NULL)) { 4682 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 4683 } 4684 } 4685 4686 4687 static void 4688 nxt_router_app_joint_release_handler(nxt_task_t 
*task, void *obj, void *data) 4689 { 4690 nxt_timer_t *timer; 4691 nxt_app_joint_t *app_joint; 4692 4693 timer = obj; 4694 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 4695 4696 nxt_router_app_joint_use(task, app_joint, -1); 4697 } 4698 4699 4700 static void 4701 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 4702 { 4703 nxt_app_t *app; 4704 nxt_port_t *port; 4705 nxt_app_joint_t *app_joint; 4706 4707 app_joint = obj; 4708 app = app_joint->app; 4709 4710 for ( ;; ) { 4711 port = nxt_router_app_get_port_for_quit(task, app); 4712 if (port == NULL) { 4713 break; 4714 } 4715 4716 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); 4717 4718 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4719 4720 nxt_port_use(task, port, -1); 4721 } 4722 4723 nxt_thread_mutex_lock(&app->mutex); 4724 4725 for ( ;; ) { 4726 port = nxt_port_hash_retrieve(&app->port_hash); 4727 if (port == NULL) { 4728 break; 4729 } 4730 4731 app->port_hash_count--; 4732 4733 port->app = NULL; 4734 4735 nxt_port_close(task, port); 4736 4737 nxt_port_use(task, port, -1); 4738 } 4739 4740 nxt_thread_mutex_unlock(&app->mutex); 4741 4742 nxt_assert(app->processes == 0); 4743 nxt_assert(app->active_requests == 0); 4744 nxt_assert(app->port_hash_count == 0); 4745 nxt_assert(app->idle_processes == 0); 4746 nxt_assert(nxt_queue_is_empty(&app->ports)); 4747 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 4748 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 4749 4750 nxt_port_mmaps_destroy(&app->outgoing, 1); 4751 4752 nxt_thread_mutex_destroy(&app->outgoing.mutex); 4753 4754 if (app->shared_port != NULL) { 4755 app->shared_port->app = NULL; 4756 nxt_port_close(task, app->shared_port); 4757 nxt_port_use(task, app->shared_port, -1); 4758 } 4759 4760 nxt_thread_mutex_destroy(&app->mutex); 4761 nxt_mp_destroy(app->mem_pool); 4762 4763 app_joint->app = NULL; 4764 4765 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 
4766 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 4767 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 4768 4769 } else { 4770 nxt_router_app_joint_use(task, app_joint, -1); 4771 } 4772 } 4773 4774 4775 static void 4776 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 4777 nxt_request_rpc_data_t *req_rpc_data) 4778 { 4779 nxt_bool_t start_process; 4780 nxt_port_t *port; 4781 nxt_http_request_t *r; 4782 4783 start_process = 0; 4784 4785 nxt_thread_mutex_lock(&app->mutex); 4786 4787 port = app->shared_port; 4788 nxt_port_inc_use(port); 4789 4790 app->active_requests++; 4791 4792 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4793 app->pending_processes++; 4794 start_process = 1; 4795 } 4796 4797 r = req_rpc_data->request; 4798 4799 /* 4800 * Put request into application-wide list to be able to cancel request 4801 * if something goes wrong with application processes. 4802 */ 4803 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 4804 4805 nxt_thread_mutex_unlock(&app->mutex); 4806 4807 /* 4808 * Retain request memory pool while request is linked in ack_waiting_req 4809 * to guarantee request structure memory is accessble. 
4810 */ 4811 nxt_mp_retain(r->mem_pool); 4812 4813 req_rpc_data->app_port = port; 4814 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4815 4816 if (start_process) { 4817 nxt_router_start_app_process(task, app); 4818 } 4819 } 4820 4821 4822 void 4823 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 4824 nxt_app_t *app) 4825 { 4826 nxt_event_engine_t *engine; 4827 nxt_request_rpc_data_t *req_rpc_data; 4828 4829 engine = task->thread->engine; 4830 4831 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 4832 nxt_router_response_ready_handler, 4833 nxt_router_response_error_handler, 4834 sizeof(nxt_request_rpc_data_t)); 4835 if (nxt_slow_path(req_rpc_data == NULL)) { 4836 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4837 return; 4838 } 4839 4840 /* 4841 * At this point we have request req_rpc_data allocated and registered 4842 * in port handlers. Need to fixup request memory pool. Counterpart 4843 * release will be called via following call chain: 4844 * nxt_request_rpc_data_unlink() -> 4845 * nxt_router_http_request_release_post() -> 4846 * nxt_router_http_request_release() 4847 */ 4848 nxt_mp_retain(r->mem_pool); 4849 4850 r->timer.task = &engine->task; 4851 r->timer.work_queue = &engine->fast_work_queue; 4852 r->timer.log = engine->task.log; 4853 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 4854 4855 r->engine = engine; 4856 r->err_work.handler = nxt_router_http_request_error; 4857 r->err_work.task = task; 4858 r->err_work.obj = r; 4859 4860 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 4861 req_rpc_data->app = app; 4862 req_rpc_data->msg_info.body_fd = -1; 4863 req_rpc_data->rpc_cancel = 1; 4864 4865 nxt_router_app_use(task, app, 1); 4866 4867 req_rpc_data->request = r; 4868 r->req_rpc_data = req_rpc_data; 4869 4870 if (r->last != NULL) { 4871 r->last->completion_handler = nxt_router_http_request_done; 4872 } 4873 4874 nxt_router_app_port_get(task, app, req_rpc_data); 4875 
nxt_router_app_prepare_request(task, req_rpc_data); 4876 } 4877 4878 4879 static void 4880 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 4881 { 4882 nxt_http_request_t *r; 4883 4884 r = obj; 4885 4886 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 4887 4888 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4889 4890 if (r->req_rpc_data != NULL) { 4891 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4892 } 4893 4894 nxt_mp_release(r->mem_pool); 4895 } 4896 4897 4898 static void 4899 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 4900 { 4901 nxt_http_request_t *r; 4902 4903 r = data; 4904 4905 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 4906 4907 if (r->req_rpc_data != NULL) { 4908 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4909 } 4910 4911 nxt_http_request_close_handler(task, r, r->proto.any); 4912 } 4913 4914 4915 static void 4916 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) 4917 { 4918 } 4919 4920 4921 static void 4922 nxt_router_app_prepare_request(nxt_task_t *task, 4923 nxt_request_rpc_data_t *req_rpc_data) 4924 { 4925 nxt_app_t *app; 4926 nxt_buf_t *buf, *body; 4927 nxt_int_t res; 4928 nxt_port_t *port, *reply_port; 4929 4930 int notify; 4931 struct { 4932 nxt_port_msg_t pm; 4933 nxt_port_mmap_msg_t mm; 4934 } msg; 4935 4936 4937 app = req_rpc_data->app; 4938 4939 nxt_assert(app != NULL); 4940 4941 port = req_rpc_data->app_port; 4942 4943 nxt_assert(port != NULL); 4944 nxt_assert(port->queue != NULL); 4945 4946 reply_port = task->thread->engine->port; 4947 4948 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 4949 nxt_app_msg_prefix[app->type]); 4950 if (nxt_slow_path(buf == NULL)) { 4951 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 4952 req_rpc_data->stream, &app->name); 4953 4954 nxt_http_request_error(task, req_rpc_data->request, 4955 
NXT_HTTP_INTERNAL_SERVER_ERROR); 4956 4957 return; 4958 } 4959 4960 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 4961 nxt_buf_used_size(buf), 4962 port->socket.fd); 4963 4964 req_rpc_data->msg_info.buf = buf; 4965 req_rpc_data->msg_info.completion_handler = buf->completion_handler; 4966 4967 do { 4968 buf->completion_handler = nxt_router_dummy_buf_completion; 4969 buf = buf->next; 4970 } while (buf != NULL); 4971 4972 buf = req_rpc_data->msg_info.buf; 4973 4974 body = req_rpc_data->request->body; 4975 4976 if (body != NULL && nxt_buf_is_file(body)) { 4977 req_rpc_data->msg_info.body_fd = body->file->fd; 4978 4979 body->file->fd = -1; 4980 4981 } else { 4982 req_rpc_data->msg_info.body_fd = -1; 4983 } 4984 4985 msg.pm.stream = req_rpc_data->stream; 4986 msg.pm.pid = reply_port->pid; 4987 msg.pm.reply_port = reply_port->id; 4988 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 4989 msg.pm.last = 0; 4990 msg.pm.mmap = 1; 4991 msg.pm.nf = 0; 4992 msg.pm.mf = 0; 4993 msg.pm.tracking = 0; 4994 4995 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 4996 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 4997 4998 msg.mm.mmap_id = hdr->id; 4999 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5000 msg.mm.size = nxt_buf_used_size(buf); 5001 5002 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5003 req_rpc_data->stream, ¬ify, 5004 &req_rpc_data->msg_info.tracking_cookie); 5005 if (nxt_fast_path(res == NXT_OK)) { 5006 if (notify != 0) { 5007 (void) nxt_port_socket_write(task, port, 5008 NXT_PORT_MSG_READ_QUEUE, 5009 -1, req_rpc_data->stream, 5010 reply_port->id, NULL); 5011 5012 } else { 5013 nxt_debug(task, "queue is not empty"); 5014 } 5015 5016 buf->is_port_mmap_sent = 1; 5017 buf->mem.pos = buf->mem.free; 5018 5019 } else { 5020 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5021 req_rpc_data->stream, &app->name); 5022 5023 nxt_http_request_error(task, req_rpc_data->request, 5024 
NXT_HTTP_INTERNAL_SERVER_ERROR); 5025 } 5026 } 5027 5028 5029 struct nxt_fields_iter_s { 5030 nxt_list_part_t *part; 5031 nxt_http_field_t *field; 5032 }; 5033 5034 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5035 5036 5037 static nxt_http_field_t * 5038 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5039 { 5040 if (part == NULL) { 5041 return NULL; 5042 } 5043 5044 while (part->nelts == 0) { 5045 part = part->next; 5046 if (part == NULL) { 5047 return NULL; 5048 } 5049 } 5050 5051 i->part = part; 5052 i->field = nxt_list_data(i->part); 5053 5054 return i->field; 5055 } 5056 5057 5058 static nxt_http_field_t * 5059 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5060 { 5061 return nxt_fields_part_first(nxt_list_part(fields), i); 5062 } 5063 5064 5065 static nxt_http_field_t * 5066 nxt_fields_next(nxt_fields_iter_t *i) 5067 { 5068 nxt_http_field_t *end = nxt_list_data(i->part); 5069 5070 end += i->part->nelts; 5071 i->field++; 5072 5073 if (i->field < end) { 5074 return i->field; 5075 } 5076 5077 return nxt_fields_part_first(i->part->next, i); 5078 } 5079 5080 5081 static nxt_buf_t * 5082 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5083 nxt_app_t *app, const nxt_str_t *prefix) 5084 { 5085 void *target_pos, *query_pos; 5086 u_char *pos, *end, *p, c; 5087 size_t fields_count, req_size, size, free_size; 5088 size_t copy_size; 5089 nxt_off_t content_length; 5090 nxt_buf_t *b, *buf, *out, **tail; 5091 nxt_http_field_t *field, *dup; 5092 nxt_unit_field_t *dst_field; 5093 nxt_fields_iter_t iter, dup_iter; 5094 nxt_unit_request_t *req; 5095 5096 req_size = sizeof(nxt_unit_request_t) 5097 + r->method->length + 1 5098 + r->version.length + 1 5099 + r->remote->length + 1 5100 + r->local->length + 1 5101 + r->server_name.length + 1 5102 + r->target.length + 1 5103 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5104 5105 content_length = r->content_length_n < 0 ? 
0 : r->content_length_n; 5106 fields_count = 0; 5107 5108 nxt_list_each(field, r->fields) { 5109 fields_count++; 5110 5111 req_size += field->name_length + prefix->length + 1 5112 + field->value_length + 1; 5113 } nxt_list_loop; 5114 5115 req_size += fields_count * sizeof(nxt_unit_field_t); 5116 5117 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5118 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5119 (int) req_size); 5120 5121 return NULL; 5122 } 5123 5124 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5125 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5126 if (nxt_slow_path(out == NULL)) { 5127 return NULL; 5128 } 5129 5130 req = (nxt_unit_request_t *) out->mem.free; 5131 out->mem.free += req_size; 5132 5133 req->app_target = r->app_target; 5134 5135 req->content_length = content_length; 5136 5137 p = (u_char *) (req->fields + fields_count); 5138 5139 nxt_debug(task, "fields_count=%d", (int) fields_count); 5140 5141 req->method_length = r->method->length; 5142 nxt_unit_sptr_set(&req->method, p); 5143 p = nxt_cpymem(p, r->method->start, r->method->length); 5144 *p++ = '\0'; 5145 5146 req->version_length = r->version.length; 5147 nxt_unit_sptr_set(&req->version, p); 5148 p = nxt_cpymem(p, r->version.start, r->version.length); 5149 *p++ = '\0'; 5150 5151 req->remote_length = r->remote->address_length; 5152 nxt_unit_sptr_set(&req->remote, p); 5153 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5154 r->remote->address_length); 5155 *p++ = '\0'; 5156 5157 req->local_length = r->local->address_length; 5158 nxt_unit_sptr_set(&req->local, p); 5159 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5160 *p++ = '\0'; 5161 5162 req->tls = (r->tls != NULL); 5163 req->websocket_handshake = r->websocket_handshake; 5164 5165 req->server_name_length = r->server_name.length; 5166 nxt_unit_sptr_set(&req->server_name, p); 5167 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5168 *p++ = '\0'; 5169 
5170 target_pos = p; 5171 req->target_length = (uint32_t) r->target.length; 5172 nxt_unit_sptr_set(&req->target, p); 5173 p = nxt_cpymem(p, r->target.start, r->target.length); 5174 *p++ = '\0'; 5175 5176 req->path_length = (uint32_t) r->path->length; 5177 if (r->path->start == r->target.start) { 5178 nxt_unit_sptr_set(&req->path, target_pos); 5179 5180 } else { 5181 nxt_unit_sptr_set(&req->path, p); 5182 p = nxt_cpymem(p, r->path->start, r->path->length); 5183 *p++ = '\0'; 5184 } 5185 5186 req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; 5187 if (r->args != NULL && r->args->start != NULL) { 5188 query_pos = nxt_pointer_to(target_pos, 5189 r->args->start - r->target.start); 5190 5191 nxt_unit_sptr_set(&req->query, query_pos); 5192 5193 } else { 5194 req->query.offset = 0; 5195 } 5196 5197 req->content_length_field = NXT_UNIT_NONE_FIELD; 5198 req->content_type_field = NXT_UNIT_NONE_FIELD; 5199 req->cookie_field = NXT_UNIT_NONE_FIELD; 5200 req->authorization_field = NXT_UNIT_NONE_FIELD; 5201 5202 dst_field = req->fields; 5203 5204 for (field = nxt_fields_first(r->fields, &iter); 5205 field != NULL; 5206 field = nxt_fields_next(&iter)) 5207 { 5208 if (field->skip) { 5209 continue; 5210 } 5211 5212 dst_field->hash = field->hash; 5213 dst_field->skip = 0; 5214 dst_field->name_length = field->name_length + prefix->length; 5215 dst_field->value_length = field->value_length; 5216 5217 if (field == r->content_length) { 5218 req->content_length_field = dst_field - req->fields; 5219 5220 } else if (field == r->content_type) { 5221 req->content_type_field = dst_field - req->fields; 5222 5223 } else if (field == r->cookie) { 5224 req->cookie_field = dst_field - req->fields; 5225 5226 } else if (field == r->authorization) { 5227 req->authorization_field = dst_field - req->fields; 5228 } 5229 5230 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5231 (int) field->hash, (int) field->skip, 5232 (int) field->name_length, field->name, 5233 (int) 
field->value_length, field->value); 5234 5235 if (prefix->length != 0) { 5236 nxt_unit_sptr_set(&dst_field->name, p); 5237 p = nxt_cpymem(p, prefix->start, prefix->length); 5238 5239 end = field->name + field->name_length; 5240 for (pos = field->name; pos < end; pos++) { 5241 c = *pos; 5242 5243 if (c >= 'a' && c <= 'z') { 5244 *p++ = (c & ~0x20); 5245 continue; 5246 } 5247 5248 if (c == '-') { 5249 *p++ = '_'; 5250 continue; 5251 } 5252 5253 *p++ = c; 5254 } 5255 5256 } else { 5257 nxt_unit_sptr_set(&dst_field->name, p); 5258 p = nxt_cpymem(p, field->name, field->name_length); 5259 } 5260 5261 *p++ = '\0'; 5262 5263 nxt_unit_sptr_set(&dst_field->value, p); 5264 p = nxt_cpymem(p, field->value, field->value_length); 5265 5266 if (prefix->length != 0) { 5267 dup_iter = iter; 5268 5269 for (dup = nxt_fields_next(&dup_iter); 5270 dup != NULL; 5271 dup = nxt_fields_next(&dup_iter)) 5272 { 5273 if (dup->name_length != field->name_length 5274 || dup->skip 5275 || dup->hash != field->hash 5276 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5277 { 5278 continue; 5279 } 5280 5281 p = nxt_cpymem(p, ", ", 2); 5282 p = nxt_cpymem(p, dup->value, dup->value_length); 5283 5284 dst_field->value_length += 2 + dup->value_length; 5285 5286 dup->skip = 1; 5287 } 5288 } 5289 5290 *p++ = '\0'; 5291 5292 dst_field++; 5293 } 5294 5295 req->fields_count = (uint32_t) (dst_field - req->fields); 5296 5297 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5298 5299 buf = out; 5300 tail = &buf->next; 5301 5302 for (b = r->body; b != NULL; b = b->next) { 5303 size = nxt_buf_mem_used_size(&b->mem); 5304 pos = b->mem.pos; 5305 5306 while (size > 0) { 5307 if (buf == NULL) { 5308 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5309 5310 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5311 if (nxt_slow_path(buf == NULL)) { 5312 while (out != NULL) { 5313 buf = out->next; 5314 out->next = NULL; 5315 out->completion_handler(task, out, out->parent); 5316 out = buf; 
5317 } 5318 return NULL; 5319 } 5320 5321 *tail = buf; 5322 tail = &buf->next; 5323 5324 } else { 5325 free_size = nxt_buf_mem_free_size(&buf->mem); 5326 if (free_size < size 5327 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5328 == NXT_OK) 5329 { 5330 free_size = nxt_buf_mem_free_size(&buf->mem); 5331 } 5332 } 5333 5334 if (free_size > 0) { 5335 copy_size = nxt_min(free_size, size); 5336 5337 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5338 5339 size -= copy_size; 5340 pos += copy_size; 5341 5342 if (size == 0) { 5343 break; 5344 } 5345 } 5346 5347 buf = NULL; 5348 } 5349 } 5350 5351 return out; 5352 } 5353 5354 5355 static void 5356 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5357 { 5358 nxt_timer_t *timer; 5359 nxt_http_request_t *r; 5360 nxt_request_rpc_data_t *req_rpc_data; 5361 5362 timer = obj; 5363 5364 nxt_debug(task, "router app timeout"); 5365 5366 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5367 req_rpc_data = r->timer_data; 5368 5369 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5370 5371 nxt_request_rpc_data_unlink(task, req_rpc_data); 5372 } 5373 5374 5375 static void 5376 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5377 { 5378 r->timer.handler = nxt_router_http_request_release; 5379 nxt_timer_add(task->thread->engine, &r->timer, 0); 5380 } 5381 5382 5383 static void 5384 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5385 { 5386 nxt_http_request_t *r; 5387 5388 nxt_debug(task, "http request pool release"); 5389 5390 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5391 5392 nxt_mp_release(r->mem_pool); 5393 } 5394 5395 5396 static void 5397 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5398 { 5399 size_t mi; 5400 uint32_t i; 5401 nxt_bool_t ack; 5402 nxt_process_t *process; 5403 nxt_free_map_t *m; 5404 nxt_port_mmap_handler_t *mmap_handler; 5405 5406 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 
5407 5408 process = nxt_runtime_process_find(task->thread->runtime, 5409 msg->port_msg.pid); 5410 if (nxt_slow_path(process == NULL)) { 5411 return; 5412 } 5413 5414 ack = 0; 5415 5416 /* 5417 * To mitigate possible racing condition (when OOSM message received 5418 * after some of the memory was already freed), need to try to find 5419 * first free segment in shared memory and send ACK if found. 5420 */ 5421 5422 nxt_thread_mutex_lock(&process->incoming.mutex); 5423 5424 for (i = 0; i < process->incoming.size; i++) { 5425 mmap_handler = process->incoming.elts[i].mmap_handler; 5426 5427 if (nxt_slow_path(mmap_handler == NULL)) { 5428 continue; 5429 } 5430 5431 m = mmap_handler->hdr->free_map; 5432 5433 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5434 if (m[mi] != 0) { 5435 ack = 1; 5436 5437 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5438 i, mi, m[mi]); 5439 5440 break; 5441 } 5442 } 5443 } 5444 5445 nxt_thread_mutex_unlock(&process->incoming.mutex); 5446 5447 if (ack) { 5448 nxt_process_broadcast_shm_ack(task, process); 5449 } 5450 } 5451 5452 5453 static void 5454 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5455 { 5456 nxt_fd_t fd; 5457 nxt_port_t *port; 5458 nxt_runtime_t *rt; 5459 nxt_port_mmaps_t *mmaps; 5460 nxt_port_msg_get_mmap_t *get_mmap_msg; 5461 nxt_port_mmap_handler_t *mmap_handler; 5462 5463 rt = task->thread->runtime; 5464 5465 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5466 msg->port_msg.reply_port); 5467 if (nxt_slow_path(port == NULL)) { 5468 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5469 msg->port_msg.pid, msg->port_msg.reply_port); 5470 5471 return; 5472 } 5473 5474 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5475 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5476 { 5477 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5478 (int) nxt_buf_used_size(msg->buf)); 5479 5480 return; 5481 } 5482 5483 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5484 
5485 nxt_assert(port->type == NXT_PROCESS_APP); 5486 5487 if (nxt_slow_path(port->app == NULL)) { 5488 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5489 port->pid, port->id); 5490 5491 // FIXME 5492 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5493 -1, msg->port_msg.stream, 0, NULL); 5494 5495 return; 5496 } 5497 5498 mmaps = &port->app->outgoing; 5499 nxt_thread_mutex_lock(&mmaps->mutex); 5500 5501 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5502 nxt_thread_mutex_unlock(&mmaps->mutex); 5503 5504 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5505 (int) get_mmap_msg->id); 5506 5507 // FIXME 5508 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5509 -1, msg->port_msg.stream, 0, NULL); 5510 return; 5511 } 5512 5513 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5514 5515 fd = mmap_handler->fd; 5516 5517 nxt_thread_mutex_unlock(&mmaps->mutex); 5518 5519 nxt_debug(task, "get mmap %PI:%d found", 5520 msg->port_msg.pid, (int) get_mmap_msg->id); 5521 5522 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5523 } 5524 5525 5526 static void 5527 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5528 { 5529 nxt_port_t *port, *reply_port; 5530 nxt_runtime_t *rt; 5531 nxt_port_msg_get_port_t *get_port_msg; 5532 5533 rt = task->thread->runtime; 5534 5535 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5536 msg->port_msg.reply_port); 5537 if (nxt_slow_path(reply_port == NULL)) { 5538 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5539 msg->port_msg.pid, msg->port_msg.reply_port); 5540 5541 return; 5542 } 5543 5544 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5545 < (int) sizeof(nxt_port_msg_get_port_t))) 5546 { 5547 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5548 (int) nxt_buf_used_size(msg->buf)); 5549 5550 return; 5551 } 5552 5553 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 
5554 5555 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5556 if (nxt_slow_path(port == NULL)) { 5557 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5558 get_port_msg->pid, get_port_msg->id); 5559 5560 return; 5561 } 5562 5563 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5564 get_port_msg->id); 5565 5566 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5567 } 5568