1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 nxt_conf_value_t *limits_value; 31 nxt_conf_value_t *processes_value; 32 nxt_conf_value_t *targets_value; 33 } nxt_router_app_conf_t; 34 35 36 typedef struct { 37 nxt_str_t pass; 38 nxt_str_t application; 39 } nxt_router_listener_conf_t; 40 41 42 #if (NXT_TLS) 43 44 typedef struct { 45 nxt_str_t name; 46 nxt_socket_conf_t *socket_conf; 47 nxt_router_temp_conf_t *temp_conf; 48 nxt_tls_init_t *tls_init; 49 nxt_bool_t last; 50 51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 52 } nxt_router_tlssock_t; 53 54 #endif 55 56 57 typedef struct { 58 nxt_str_t *name; 59 nxt_socket_conf_t *socket_conf; 60 nxt_router_temp_conf_t *temp_conf; 61 nxt_bool_t last; 62 } nxt_socket_rpc_t; 63 64 65 typedef struct { 66 nxt_app_t *app; 67 nxt_router_temp_conf_t *temp_conf; 68 uint8_t proto; /* 1 bit */ 69 } nxt_app_rpc_t; 70 71 72 typedef struct { 73 nxt_app_joint_t *app_joint; 74 uint32_t generation; 75 uint8_t proto; /* 1 bit */ 76 } nxt_app_joint_rpc_t; 77 78 79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 80 nxt_mp_t *mp); 81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 82 static void nxt_router_greet_controller(nxt_task_t *task, 83 nxt_port_t *controller_port); 84 85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 86 87 static void nxt_router_new_port_handler(nxt_task_t *task, 88 nxt_port_recv_msg_t *msg); 89 static void nxt_router_conf_data_handler(nxt_task_t *task, 90 nxt_port_recv_msg_t *msg); 91 static void nxt_router_app_restart_handler(nxt_task_t *task, 92 nxt_port_recv_msg_t *msg); 93 static void nxt_router_remove_pid_handler(nxt_task_t *task, 94 nxt_port_recv_msg_t *msg); 95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 96 nxt_port_recv_msg_t *msg); 97 98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 100 static void nxt_router_conf_ready(nxt_task_t *task, 101 nxt_router_temp_conf_t *tmcf); 102 static void nxt_router_conf_error(nxt_task_t *task, 103 nxt_router_temp_conf_t *tmcf); 104 static void nxt_router_conf_send(nxt_task_t *task, 105 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 106 107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 108 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 110 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, 112 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, 113 nxt_conf_value_t *conf); 114 115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 118 nxt_app_t *app); 119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 120 nxt_str_t *name); 121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 122 int i); 123 124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 125 nxt_port_t *port); 126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 127 nxt_port_t *port); 128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 129 nxt_port_t *port, nxt_fd_t fd); 130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 131 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 132 static void nxt_router_listen_socket_ready(nxt_task_t *task, 133 nxt_port_recv_msg_t *msg, void *data); 134 static void nxt_router_listen_socket_error(nxt_task_t *task, 135 nxt_port_recv_msg_t *msg, void *data); 136 #if (NXT_TLS) 137 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 138 nxt_port_recv_msg_t *msg, void *data); 139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 140 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 141 nxt_bool_t last); 142 #endif 143 static void nxt_router_app_rpc_create(nxt_task_t *task, 144 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 145 static void nxt_router_app_prefork_ready(nxt_task_t *task, 146 nxt_port_recv_msg_t *msg, void *data); 147 static void nxt_router_app_prefork_error(nxt_task_t *task, 148 nxt_port_recv_msg_t *msg, void *data); 149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 150 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 152 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 153 154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 155 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 156 const nxt_event_interface_t *interface); 157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 158 nxt_router_engine_conf_t *recf); 159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 160 nxt_router_engine_conf_t *recf); 161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 162 nxt_router_engine_conf_t *recf); 163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 164 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 165 nxt_work_handler_t handler); 166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 167 nxt_router_engine_conf_t *recf); 168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 169 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 170 171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 172 nxt_router_temp_conf_t *tmcf); 173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 174 nxt_event_engine_t *engine); 175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 176 nxt_router_temp_conf_t *tmcf); 177 178 static void nxt_router_engines_post(nxt_router_t *router, 179 nxt_router_temp_conf_t *tmcf); 180 static void nxt_router_engine_post(nxt_event_engine_t *engine, 181 nxt_work_t *jobs); 182 183 static void nxt_router_thread_start(void *data); 184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 185 void *data); 186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 187 void *data); 188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 189 void *data); 190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 191 void *data); 192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 193 void *data); 194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 195 void *data); 196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 197 void *data); 198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 199 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 200 static void nxt_router_listen_socket_release(nxt_task_t *task, 201 nxt_socket_conf_t *skcf); 202 203 static void nxt_router_access_log_writer(nxt_task_t *task, 204 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 206 struct tm *tm, size_t size, const char *format); 207 static void nxt_router_access_log_open(nxt_task_t *task, 208 nxt_router_temp_conf_t *tmcf); 209 static void nxt_router_access_log_ready(nxt_task_t *task, 210 nxt_port_recv_msg_t *msg, void *data); 211 static void nxt_router_access_log_error(nxt_task_t *task, 212 nxt_port_recv_msg_t *msg, void *data); 213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 214 nxt_router_access_log_t *access_log); 215 static void nxt_router_access_log_release(nxt_task_t *task, 216 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 218 void *data); 219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 220 nxt_port_recv_msg_t *msg, void *data); 221 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 222 nxt_port_recv_msg_t *msg, void *data); 223 224 static void nxt_router_app_port_ready(nxt_task_t *task, 225 nxt_port_recv_msg_t *msg, void *data); 226 static void nxt_router_app_port_error(nxt_task_t *task, 227 nxt_port_recv_msg_t *msg, void *data); 228 229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 231 232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, 233 nxt_port_t *port, nxt_apr_action_t action); 234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 235 nxt_request_rpc_data_t *req_rpc_data); 236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 237 void *data); 238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 239 void *data); 240 241 static void nxt_router_app_prepare_request(nxt_task_t *task, 242 nxt_request_rpc_data_t *req_rpc_data); 243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 244 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 245 246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 248 void *data); 249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 250 void *data); 251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 252 void *data); 253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 254 255 static const nxt_http_request_state_t nxt_http_request_send_state; 256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 257 258 static void nxt_router_app_joint_use(nxt_task_t *task, 259 nxt_app_joint_t *app_joint, int i); 260 261 static void nxt_router_http_request_release_post(nxt_task_t *task, 262 nxt_http_request_t *r); 263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 264 void *data); 265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 266 static void nxt_router_get_port_handler(nxt_task_t *task, 267 nxt_port_recv_msg_t *msg); 268 static void nxt_router_get_mmap_handler(nxt_task_t *task, 269 nxt_port_recv_msg_t *msg); 270 271 extern const nxt_http_request_state_t nxt_http_websocket; 272 273 static nxt_router_t *nxt_router; 274 275 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 276 static const nxt_str_t empty_prefix = nxt_string(""); 277 278 static const nxt_str_t *nxt_app_msg_prefix[] = { 279 &empty_prefix, 280 &empty_prefix, 281 &http_prefix, 282 &http_prefix, 283 &http_prefix, 284 &empty_prefix, 285 }; 286 287 288 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 289 .quit = nxt_signal_quit_handler, 290 .new_port = nxt_router_new_port_handler, 291 .get_port = nxt_router_get_port_handler, 292 .change_file = nxt_port_change_log_file_handler, 293 .mmap = nxt_port_mmap_handler, 294 .get_mmap = nxt_router_get_mmap_handler, 295 .data = nxt_router_conf_data_handler, 296 .app_restart = nxt_router_app_restart_handler, 297 .remove_pid = nxt_router_remove_pid_handler, 298 .access_log = nxt_router_access_log_reopen_handler, 299 .rpc_ready = nxt_port_rpc_handler, 300 .rpc_error = nxt_port_rpc_handler, 301 .oosm = nxt_router_oosm_handler, 302 }; 303 304 305 const nxt_process_init_t nxt_router_process = { 306 .name = "router", 307 .type = NXT_PROCESS_ROUTER, 308 .prefork = nxt_router_prefork, 309 .restart = 1, 310 .setup = nxt_process_core_setup, 311 .start = nxt_router_start, 312 .port_handlers = &nxt_router_process_port_handlers, 313 .signals = nxt_process_signals, 314 }; 315 316 317 /* Queues of nxt_socket_conf_t */ 318 nxt_queue_t creating_sockets; 319 nxt_queue_t pending_sockets; 320 nxt_queue_t updating_sockets; 321 nxt_queue_t keeping_sockets; 322 nxt_queue_t deleting_sockets; 323 324 325 static nxt_int_t 326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 327 { 328 nxt_runtime_stop_app_processes(task, task->thread->runtime); 329 330 return NXT_OK; 331 } 332 333 334 static nxt_int_t 335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 336 { 337 nxt_int_t ret; 338 nxt_port_t *controller_port; 339 nxt_router_t *router; 340 nxt_runtime_t *rt; 341 342 rt = task->thread->runtime; 343 344 nxt_log(task, NXT_LOG_INFO, "router started"); 345 346 #if (NXT_TLS) 347 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 348 if (nxt_slow_path(rt->tls == NULL)) { 349 return NXT_ERROR; 350 } 351 352 ret = rt->tls->library_init(task); 353 if (nxt_slow_path(ret != NXT_OK)) { 354 return ret; 355 } 356 #endif 357 358 ret = nxt_http_init(task); 359 if (nxt_slow_path(ret != NXT_OK)) { 360 return ret; 361 } 362 363 router = nxt_zalloc(sizeof(nxt_router_t)); 364 if (nxt_slow_path(router == NULL)) { 365 return NXT_ERROR; 366 } 367 368 nxt_queue_init(&router->engines); 369 nxt_queue_init(&router->sockets); 370 nxt_queue_init(&router->apps); 371 372 nxt_router = router; 373 374 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 375 if (controller_port != NULL) { 376 nxt_router_greet_controller(task, controller_port); 377 } 378 379 return NXT_OK; 380 } 381 382 383 static void 384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 385 { 386 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 387 -1, 0, 0, NULL); 388 } 389 390 391 static void 392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 393 void *data) 394 { 395 size_t size; 396 uint32_t stream; 397 nxt_fd_t port_fd, queue_fd; 398 nxt_int_t ret; 399 nxt_app_t *app; 400 nxt_buf_t *b; 401 nxt_port_t *dport; 402 nxt_runtime_t *rt; 403 nxt_app_joint_rpc_t *app_joint_rpc; 404 405 app = data; 406 407 nxt_thread_mutex_lock(&app->mutex); 408 409 dport = app->proto_port; 410 411 nxt_thread_mutex_unlock(&app->mutex); 412 413 if (dport != NULL) { 414 nxt_debug(task, "app '%V' %p start process", &app->name, app); 415 416 b = NULL; 417 port_fd = -1; 418 queue_fd = -1; 419 420 } else { 421 if (app->proto_port_requests > 0) { 422 nxt_debug(task, "app '%V' %p wait for prototype process", 423 &app->name, app); 424 425 app->proto_port_requests++; 426 427 goto skip; 428 } 429 430 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); 431 432 rt = task->thread->runtime; 433 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 434 435 size = app->name.length + 1 + app->conf.length; 436 437 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); 438 if (nxt_slow_path(b == NULL)) { 439 goto failed; 440 } 441 442 nxt_buf_cpystr(b, &app->name); 443 *b->mem.free++ = '\0'; 444 nxt_buf_cpystr(b, &app->conf); 445 446 port_fd = app->shared_port->pair[0]; 447 queue_fd = app->shared_port->queue_fd; 448 } 449 450 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 451 nxt_router_app_port_ready, 452 nxt_router_app_port_error, 453 sizeof(nxt_app_joint_rpc_t)); 454 if (nxt_slow_path(app_joint_rpc == NULL)) { 455 goto failed; 456 } 457 458 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 459 460 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 461 port_fd, queue_fd, stream, port->id, b); 462 if (nxt_slow_path(ret != NXT_OK)) { 463 nxt_port_rpc_cancel(task, port, stream); 464 465 goto failed; 466 } 467 468 app_joint_rpc->app_joint = app->joint; 469 app_joint_rpc->generation = app->generation; 470 app_joint_rpc->proto = (b != NULL); 471 472 if (b != NULL) { 473 app->proto_port_requests++; 474 475 b = NULL; 476 } 477 478 nxt_router_app_joint_use(task, app->joint, 1); 479 480 failed: 481 482 if (b != NULL) { 483 nxt_mp_free(b->data, b); 484 } 485 486 skip: 487 488 nxt_router_app_use(task, app, -1); 489 } 490 491 492 static void 493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 494 { 495 app_joint->use_count += i; 496 497 if (app_joint->use_count == 0) { 498 nxt_assert(app_joint->app == NULL); 499 500 nxt_free(app_joint); 501 } 502 } 503 504 505 static nxt_int_t 506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 507 { 508 nxt_int_t res; 509 nxt_port_t *router_port; 510 nxt_runtime_t *rt; 511 512 nxt_debug(task, "app '%V' start process", &app->name); 513 514 rt = task->thread->runtime; 515 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 516 517 nxt_router_app_use(task, app, 1); 518 519 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 520 app); 521 522 if (res == NXT_OK) { 523 return res; 524 } 525 526 nxt_thread_mutex_lock(&app->mutex); 527 528 app->pending_processes--; 529 530 nxt_thread_mutex_unlock(&app->mutex); 531 532 nxt_router_app_use(task, app, -1); 533 534 return NXT_ERROR; 535 } 536 537 538 nxt_inline nxt_bool_t 539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 540 { 541 nxt_buf_t *b, *next; 542 nxt_bool_t cancelled; 543 nxt_port_t *app_port; 544 nxt_msg_info_t *msg_info; 545 546 msg_info = &req_rpc_data->msg_info; 547 548 if (msg_info->buf == NULL) { 549 return 0; 550 } 551 552 app_port = req_rpc_data->app_port; 553 554 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 555 cancelled = nxt_app_queue_cancel(app_port->queue, 556 msg_info->tracking_cookie, 557 req_rpc_data->stream); 558 559 if (cancelled) { 560 nxt_debug(task, "stream #%uD: cancelled by router", 561 req_rpc_data->stream); 562 } 563 564 } else { 565 cancelled = 0; 566 } 567 568 for (b = msg_info->buf; b != NULL; b = next) { 569 next = b->next; 570 b->next = NULL; 571 572 if (b->is_port_mmap_sent) { 573 b->is_port_mmap_sent = cancelled == 0; 574 } 575 576 b->completion_handler(task, b, b->parent); 577 } 578 579 msg_info->buf = NULL; 580 581 return cancelled; 582 } 583 584 585 nxt_inline nxt_bool_t 586 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 587 { 588 if (lnk->next != NULL) { 589 nxt_queue_remove(lnk); 590 591 lnk->next = NULL; 592 593 return 1; 594 } 595 596 return 0; 597 } 598 599 600 nxt_inline void 601 nxt_request_rpc_data_unlink(nxt_task_t *task, 602 nxt_request_rpc_data_t *req_rpc_data) 603 { 604 nxt_app_t *app; 605 nxt_bool_t unlinked; 606 nxt_http_request_t *r; 607 608 nxt_router_msg_cancel(task, req_rpc_data); 609 610 app = req_rpc_data->app; 611 612 if (req_rpc_data->app_port != NULL) { 613 nxt_router_app_port_release(task, app, req_rpc_data->app_port, 614 req_rpc_data->apr_action); 615 616 req_rpc_data->app_port = NULL; 617 } 618 619 r = req_rpc_data->request; 620 621 if (r != NULL) { 622 r->timer_data = NULL; 623 624 nxt_router_http_request_release_post(task, r); 625 626 r->req_rpc_data = NULL; 627 req_rpc_data->request = NULL; 628 629 if (app != NULL) { 630 unlinked = 0; 631 632 nxt_thread_mutex_lock(&app->mutex); 633 634 if (r->app_link.next != NULL) { 635 nxt_queue_remove(&r->app_link); 636 r->app_link.next = NULL; 637 638 unlinked = 1; 639 } 640 641 nxt_thread_mutex_unlock(&app->mutex); 642 643 if (unlinked) { 644 nxt_mp_release(r->mem_pool); 645 } 646 } 647 } 648 649 if (app != NULL) { 650 nxt_router_app_use(task, app, -1); 651 652 req_rpc_data->app = NULL; 653 } 654 655 if (req_rpc_data->msg_info.body_fd != -1) { 656 nxt_fd_close(req_rpc_data->msg_info.body_fd); 657 658 req_rpc_data->msg_info.body_fd = -1; 659 } 660 661 if (req_rpc_data->rpc_cancel) { 662 req_rpc_data->rpc_cancel = 0; 663 664 nxt_port_rpc_cancel(task, task->thread->engine->port, 665 req_rpc_data->stream); 666 } 667 } 668 669 670 static void 671 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 672 { 673 nxt_int_t res; 674 nxt_app_t *app; 675 nxt_port_t *port, *main_app_port; 676 nxt_runtime_t *rt; 677 678 nxt_port_new_port_handler(task, msg); 679 680 port = msg->u.new_port; 681 682 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 683 nxt_router_greet_controller(task, msg->u.new_port); 684 } 685 686 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { 687 nxt_port_rpc_handler(task, msg); 688 689 return; 690 } 691 692 if (port == NULL || port->type != NXT_PROCESS_APP) { 693 694 if (msg->port_msg.stream == 0) { 695 return; 696 } 697 698 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 699 700 } else { 701 if (msg->fd[1] != -1) { 702 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 703 if (nxt_slow_path(res != NXT_OK)) { 704 return; 705 } 706 707 nxt_fd_close(msg->fd[1]); 708 msg->fd[1] = -1; 709 } 710 } 711 712 if (msg->port_msg.stream != 0) { 713 nxt_port_rpc_handler(task, msg); 714 return; 715 } 716 717 nxt_debug(task, "new port id %d (%d)", port->id, port->type); 718 719 /* 720 * Port with "id == 0" is application 'main' port and it always 721 * should come with non-zero stream. 722 */ 723 nxt_assert(port->id != 0); 724 725 /* Find 'main' app port and get app reference. */ 726 rt = task->thread->runtime; 727 728 /* 729 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 730 * sent to main port (with id == 0) and processed in main thread. 731 */ 732 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 733 nxt_assert(main_app_port != NULL); 734 735 app = main_app_port->app; 736 737 if (nxt_fast_path(app != NULL)) { 738 nxt_thread_mutex_lock(&app->mutex); 739 740 /* TODO here should be find-and-add code because there can be 741 port waiters in port_hash */ 742 nxt_port_hash_add(&app->port_hash, port); 743 app->port_hash_count++; 744 745 nxt_thread_mutex_unlock(&app->mutex); 746 747 port->app = app; 748 } 749 750 port->main_app_port = main_app_port; 751 752 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 753 } 754 755 756 static void 757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 758 { 759 void *p; 760 size_t size; 761 nxt_int_t ret; 762 nxt_port_t *port; 763 nxt_router_temp_conf_t *tmcf; 764 765 port = nxt_runtime_port_find(task->thread->runtime, 766 msg->port_msg.pid, 767 msg->port_msg.reply_port); 768 if (nxt_slow_path(port == NULL)) { 769 nxt_alert(task, "conf_data_handler: reply port not found"); 770 return; 771 } 772 773 p = MAP_FAILED; 774 775 /* 776 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 777 * initialized in 'cleanup' section. 778 */ 779 size = 0; 780 781 tmcf = nxt_router_temp_conf(task); 782 if (nxt_slow_path(tmcf == NULL)) { 783 goto fail; 784 } 785 786 if (nxt_slow_path(msg->fd[0] == -1)) { 787 nxt_alert(task, "conf_data_handler: invalid shm fd"); 788 goto fail; 789 } 790 791 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 792 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 793 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 794 goto fail; 795 } 796 797 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 798 799 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 800 801 nxt_fd_close(msg->fd[0]); 802 msg->fd[0] = -1; 803 804 if (nxt_slow_path(p == MAP_FAILED)) { 805 goto fail; 806 } 807 808 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 809 810 tmcf->router_conf->router = nxt_router; 811 tmcf->stream = msg->port_msg.stream; 812 tmcf->port = port; 813 814 nxt_port_use(task, tmcf->port, 1); 815 816 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 817 818 if (nxt_fast_path(ret == NXT_OK)) { 819 nxt_router_conf_apply(task, tmcf, NULL); 820 821 } else { 822 nxt_router_conf_error(task, tmcf); 823 } 824 825 goto cleanup; 826 827 fail: 828 829 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 830 msg->port_msg.stream, 0, NULL); 831 832 if (tmcf != NULL) { 833 nxt_mp_release(tmcf->mem_pool); 834 } 835 836 cleanup: 837 838 if (p != MAP_FAILED) { 839 nxt_mem_munmap(p, size); 840 } 841 842 if (msg->fd[0] != -1) { 843 nxt_fd_close(msg->fd[0]); 844 msg->fd[0] = -1; 845 } 846 } 847 848 849 static void 850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 851 { 852 nxt_app_t *app; 853 nxt_int_t ret; 854 nxt_str_t app_name; 855 nxt_port_t *reply_port, *shared_port, *old_shared_port; 856 nxt_port_t *proto_port; 857 nxt_port_msg_type_t reply; 858 859 reply_port = nxt_runtime_port_find(task->thread->runtime, 860 msg->port_msg.pid, 861 msg->port_msg.reply_port); 862 if (nxt_slow_path(reply_port == NULL)) { 863 nxt_alert(task, "app_restart_handler: reply port not found"); 864 return; 865 } 866 867 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 868 app_name.start = msg->buf->mem.pos; 869 870 nxt_debug(task, "app_restart_handler: %V", &app_name); 871 872 app = nxt_router_app_find(&nxt_router->apps, &app_name); 873 874 if (nxt_fast_path(app != NULL)) { 875 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 876 NXT_PROCESS_APP); 877 if (nxt_slow_path(shared_port == NULL)) { 878 goto fail; 879 } 880 881 ret = nxt_port_socket_init(task, shared_port, 0); 882 if (nxt_slow_path(ret != NXT_OK)) { 883 nxt_port_use(task, shared_port, -1); 884 goto fail; 885 } 886 887 ret = nxt_router_app_queue_init(task, shared_port); 888 if (nxt_slow_path(ret != NXT_OK)) { 889 nxt_port_write_close(shared_port); 890 nxt_port_read_close(shared_port); 891 nxt_port_use(task, shared_port, -1); 892 goto fail; 893 } 894 895 nxt_port_write_enable(task, shared_port); 896 897 nxt_thread_mutex_lock(&app->mutex); 898 899 proto_port = app->proto_port; 900 901 if (proto_port != NULL) { 902 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 903 proto_port->pid); 904 905 app->proto_port = NULL; 906 proto_port->app = NULL; 907 } 908 909 app->generation++; 910 911 shared_port->app = app; 912 913 old_shared_port = app->shared_port; 914 old_shared_port->app = NULL; 915 916 app->shared_port = shared_port; 917 918 nxt_thread_mutex_unlock(&app->mutex); 919 920 nxt_port_close(task, old_shared_port); 921 nxt_port_use(task, old_shared_port, -1); 922 923 if (proto_port != NULL) { 924 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 925 -1, 0, 0, NULL); 926 927 nxt_port_close(task, proto_port); 928 929 nxt_port_use(task, proto_port, -1); 930 } 931 932 reply = NXT_PORT_MSG_RPC_READY_LAST; 933 934 } else { 935 936 fail: 937 938 reply = NXT_PORT_MSG_RPC_ERROR; 939 } 940 941 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 942 0, NULL); 943 } 944 945 946 static void 947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 948 void *data) 949 { 950 union { 951 nxt_pid_t removed_pid; 952 void *data; 953 } u; 954 955 u.data = data; 956 957 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 958 } 959 960 961 static void 962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 963 { 964 nxt_event_engine_t *engine; 965 966 nxt_port_remove_pid_handler(task, msg); 967 968 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 969 { 970 if (nxt_fast_path(engine->port != NULL)) { 971 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 972 msg->u.data); 973 } 974 } 975 nxt_queue_loop; 976 977 if (msg->port_msg.stream == 0) { 978 return; 979 } 980 981 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 982 983 nxt_port_rpc_handler(task, msg); 984 } 985 986 987 static nxt_router_temp_conf_t * 988 nxt_router_temp_conf(nxt_task_t *task) 989 { 990 nxt_mp_t *mp, *tmp; 991 nxt_router_conf_t *rtcf; 992 nxt_router_temp_conf_t *tmcf; 993 994 mp = nxt_mp_create(1024, 128, 256, 32); 995 if (nxt_slow_path(mp == NULL)) { 996 return NULL; 997 } 998 999 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 1000 if (nxt_slow_path(rtcf == NULL)) { 1001 goto fail; 1002 } 1003 1004 rtcf->mem_pool = mp; 1005 1006 tmp = nxt_mp_create(1024, 128, 256, 32); 1007 if (nxt_slow_path(tmp == NULL)) { 1008 goto fail; 1009 } 1010 1011 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 1012 if (nxt_slow_path(tmcf == NULL)) { 1013 goto temp_fail; 1014 } 1015 1016 tmcf->mem_pool = tmp; 1017 tmcf->router_conf = rtcf; 1018 tmcf->count = 1; 1019 tmcf->engine = task->thread->engine; 1020 1021 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 1022 sizeof(nxt_router_engine_conf_t)); 1023 if (nxt_slow_path(tmcf->engines == NULL)) { 1024 goto temp_fail; 1025 } 1026 1027 nxt_queue_init(&creating_sockets); 1028 nxt_queue_init(&pending_sockets); 1029 nxt_queue_init(&updating_sockets); 1030 nxt_queue_init(&keeping_sockets); 1031 nxt_queue_init(&deleting_sockets); 1032 1033 #if (NXT_TLS) 1034 nxt_queue_init(&tmcf->tls); 1035 #endif 1036 1037 nxt_queue_init(&tmcf->apps); 1038 nxt_queue_init(&tmcf->previous); 1039 1040 return tmcf; 1041 1042 temp_fail: 1043 1044 nxt_mp_destroy(tmp); 1045 1046 fail: 1047 1048 nxt_mp_destroy(mp); 1049 1050 return NULL; 1051 } 1052 1053 1054 nxt_inline nxt_bool_t 1055 nxt_router_app_can_start(nxt_app_t *app) 1056 { 1057 return app->processes + app->pending_processes < app->max_processes 1058 && app->pending_processes < app->max_pending_processes; 1059 } 1060 1061 1062 nxt_inline nxt_bool_t 1063 nxt_router_app_need_start(nxt_app_t *app) 1064 { 1065 return (app->active_requests 1066 > app->port_hash_count + app->pending_processes) 1067 || (app->spare_processes 1068 > app->idle_processes + app->pending_processes); 1069 } 1070 1071 1072 static void 1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1074 { 1075 nxt_int_t ret; 1076 nxt_app_t *app; 1077 nxt_router_t *router; 1078 nxt_runtime_t *rt; 1079 nxt_queue_link_t *qlk; 1080 nxt_socket_conf_t *skcf; 1081 nxt_router_conf_t *rtcf; 1082 nxt_router_temp_conf_t *tmcf; 1083 const nxt_event_interface_t *interface; 1084 #if (NXT_TLS) 1085 nxt_router_tlssock_t *tls; 1086 #endif 1087 1088 tmcf = obj; 1089 1090 qlk = nxt_queue_first(&pending_sockets); 1091 1092 if (qlk != nxt_queue_tail(&pending_sockets)) { 1093 nxt_queue_remove(qlk); 1094 nxt_queue_insert_tail(&creating_sockets, qlk); 1095 1096 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1097 1098 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1099 1100 return; 1101 } 1102 1103 #if (NXT_TLS) 1104 qlk = nxt_queue_last(&tmcf->tls); 1105 1106 if (qlk != nxt_queue_head(&tmcf->tls)) { 1107 nxt_queue_remove(qlk); 1108 1109 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1110 1111 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1112 nxt_router_tls_rpc_handler, tls); 1113 return; 1114 } 1115 #endif 1116 1117 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1118 1119 if (nxt_router_app_need_start(app)) { 1120 nxt_router_app_rpc_create(task, tmcf, app); 1121 return; 1122 } 1123 1124 } nxt_queue_loop; 1125 1126 rtcf = tmcf->router_conf; 1127 1128 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1129 nxt_router_access_log_open(task, tmcf); 1130 return; 1131 } 1132 1133 rt = task->thread->runtime; 1134 1135 interface = nxt_service_get(rt->services, "engine", NULL); 1136 1137 router = rtcf->router; 1138 1139 ret = nxt_router_engines_create(task, router, tmcf, interface); 1140 if (nxt_slow_path(ret != NXT_OK)) { 1141 goto fail; 1142 } 1143 1144 ret = nxt_router_threads_create(task, rt, tmcf); 1145 if (nxt_slow_path(ret != NXT_OK)) { 1146 goto fail; 1147 } 1148 1149 nxt_router_apps_sort(task, router, tmcf); 1150 1151 nxt_router_apps_hash_use(task, rtcf, 1); 1152 1153 nxt_router_engines_post(router, tmcf); 1154 1155 nxt_queue_add(&router->sockets, &updating_sockets); 1156 nxt_queue_add(&router->sockets, &creating_sockets); 1157 1158 if (router->access_log != rtcf->access_log) { 1159 nxt_router_access_log_use(&router->lock, rtcf->access_log); 1160 1161 nxt_router_access_log_release(task, &router->lock, router->access_log); 1162 1163 router->access_log = rtcf->access_log; 1164 } 1165 1166 nxt_router_conf_ready(task, tmcf); 1167 1168 return; 1169 1170 fail: 1171 1172 nxt_router_conf_error(task, tmcf); 1173 1174 return; 1175 } 1176 1177 1178 static void 1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1180 { 1181 nxt_joint_job_t *job; 1182 1183 job = obj; 1184 1185 nxt_router_conf_ready(task, job->tmcf); 1186 } 1187 1188 1189 static void 1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1191 { 1192 uint32_t count; 1193 nxt_router_conf_t *rtcf; 1194 nxt_thread_spinlock_t *lock; 1195 1196 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1197 1198 if (--tmcf->count > 0) { 1199 return; 1200 } 1201 1202 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1203 1204 rtcf = tmcf->router_conf; 1205 1206 lock = &rtcf->router->lock; 1207 1208 nxt_thread_spin_lock(lock); 1209 1210 count = rtcf->count; 1211 1212 nxt_thread_spin_unlock(lock); 1213 1214 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1215 1216 if (count == 0) { 1217 nxt_router_apps_hash_use(task, rtcf, -1); 1218 1219 nxt_router_access_log_release(task, lock, rtcf->access_log); 1220 1221 nxt_mp_destroy(rtcf->mem_pool); 1222 } 1223 1224 nxt_mp_release(tmcf->mem_pool); 1225 } 1226 1227 1228 static void 1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1230 { 1231 nxt_app_t *app; 1232 nxt_socket_t s; 1233 nxt_router_t *router; 1234 nxt_queue_link_t *qlk; 1235 nxt_socket_conf_t *skcf; 1236 nxt_router_conf_t *rtcf; 1237 1238 nxt_alert(task, "failed to apply new conf"); 1239 1240 for (qlk = nxt_queue_first(&creating_sockets); 1241 qlk != nxt_queue_tail(&creating_sockets); 1242 qlk = nxt_queue_next(qlk)) 1243 { 1244 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1245 s = skcf->listen->socket; 1246 1247 if (s != -1) { 1248 nxt_socket_close(task, s); 1249 } 1250 1251 nxt_free(skcf->listen); 1252 } 1253 1254 rtcf = tmcf->router_conf; 1255 1256 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1257 1258 nxt_router_app_unlink(task, app); 1259 1260 } nxt_queue_loop; 1261 1262 router = rtcf->router; 1263 1264 nxt_queue_add(&router->sockets, &keeping_sockets); 1265 nxt_queue_add(&router->sockets, &deleting_sockets); 1266 1267 nxt_queue_add(&router->apps, &tmcf->previous); 1268 1269 // TODO: new engines and threads 1270 1271 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1272 1273 nxt_mp_destroy(rtcf->mem_pool); 1274 1275 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1276 1277 nxt_mp_release(tmcf->mem_pool); 1278 } 1279 1280 1281 static void 1282 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1283 nxt_port_msg_type_t type) 1284 { 1285 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1286 1287 nxt_port_use(task, tmcf->port, -1); 1288 1289 tmcf->port = NULL; 1290 } 1291 1292 1293 static nxt_conf_map_t nxt_router_conf[] = { 1294 { 1295 nxt_string("listeners_threads"), 1296 NXT_CONF_MAP_INT32, 1297 offsetof(nxt_router_conf_t, threads), 1298 }, 1299 }; 1300 1301 1302 static nxt_conf_map_t nxt_router_app_conf[] = { 1303 { 1304 nxt_string("type"), 1305 NXT_CONF_MAP_STR, 1306 offsetof(nxt_router_app_conf_t, type), 1307 }, 1308 1309 { 1310 nxt_string("limits"), 1311 NXT_CONF_MAP_PTR, 1312 offsetof(nxt_router_app_conf_t, limits_value), 1313 }, 1314 1315 { 1316 nxt_string("processes"), 1317 NXT_CONF_MAP_INT32, 1318 offsetof(nxt_router_app_conf_t, processes), 1319 }, 1320 1321 { 1322 nxt_string("processes"), 1323 NXT_CONF_MAP_PTR, 1324 offsetof(nxt_router_app_conf_t, processes_value), 1325 }, 1326 1327 { 1328 nxt_string("targets"), 1329 NXT_CONF_MAP_PTR, 1330 offsetof(nxt_router_app_conf_t, targets_value), 1331 }, 1332 }; 1333 1334 1335 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1336 { 1337 nxt_string("timeout"), 1338 NXT_CONF_MAP_MSEC, 1339 offsetof(nxt_router_app_conf_t, timeout), 1340 }, 1341 }; 1342 1343 1344 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1345 { 1346 nxt_string("spare"), 1347 NXT_CONF_MAP_INT32, 1348 offsetof(nxt_router_app_conf_t, spare_processes), 1349 }, 1350 1351 { 1352 nxt_string("max"), 1353 NXT_CONF_MAP_INT32, 1354 offsetof(nxt_router_app_conf_t, max_processes), 1355 }, 1356 1357 { 1358 nxt_string("idle_timeout"), 1359 NXT_CONF_MAP_MSEC, 1360 offsetof(nxt_router_app_conf_t, idle_timeout), 1361 }, 1362 }; 1363 1364 1365 static nxt_conf_map_t nxt_router_listener_conf[] = { 1366 { 1367 nxt_string("pass"), 1368 NXT_CONF_MAP_STR_COPY, 1369 offsetof(nxt_router_listener_conf_t, pass), 1370 }, 1371 1372 { 1373 nxt_string("application"), 1374 NXT_CONF_MAP_STR_COPY, 1375 offsetof(nxt_router_listener_conf_t, application), 1376 }, 1377 }; 1378 1379 1380 static nxt_conf_map_t nxt_router_http_conf[] = { 1381 { 1382 nxt_string("header_buffer_size"), 1383 NXT_CONF_MAP_SIZE, 1384 offsetof(nxt_socket_conf_t, header_buffer_size), 1385 }, 1386 1387 { 1388 nxt_string("large_header_buffer_size"), 1389 NXT_CONF_MAP_SIZE, 1390 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1391 }, 1392 1393 { 1394 nxt_string("large_header_buffers"), 1395 NXT_CONF_MAP_SIZE, 1396 offsetof(nxt_socket_conf_t, large_header_buffers), 1397 }, 1398 1399 { 1400 nxt_string("body_buffer_size"), 1401 NXT_CONF_MAP_SIZE, 1402 offsetof(nxt_socket_conf_t, body_buffer_size), 1403 }, 1404 1405 { 1406 nxt_string("max_body_size"), 1407 NXT_CONF_MAP_SIZE, 1408 offsetof(nxt_socket_conf_t, max_body_size), 1409 }, 1410 1411 { 1412 nxt_string("idle_timeout"), 1413 NXT_CONF_MAP_MSEC, 1414 offsetof(nxt_socket_conf_t, idle_timeout), 1415 }, 1416 1417 { 1418 nxt_string("header_read_timeout"), 1419 NXT_CONF_MAP_MSEC, 1420 offsetof(nxt_socket_conf_t, header_read_timeout), 1421 }, 1422 1423 { 1424 nxt_string("body_read_timeout"), 1425 NXT_CONF_MAP_MSEC, 1426 offsetof(nxt_socket_conf_t, body_read_timeout), 1427 }, 1428 1429 { 1430 nxt_string("send_timeout"), 1431 NXT_CONF_MAP_MSEC, 1432 offsetof(nxt_socket_conf_t, send_timeout), 1433 }, 1434 1435 { 1436 nxt_string("body_temp_path"), 1437 NXT_CONF_MAP_STR, 1438 offsetof(nxt_socket_conf_t, body_temp_path), 1439 }, 1440 1441 { 1442 nxt_string("discard_unsafe_fields"), 1443 NXT_CONF_MAP_INT8, 1444 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1445 }, 1446 }; 1447 1448 1449 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1450 { 1451 nxt_string("max_frame_size"), 1452 NXT_CONF_MAP_SIZE, 1453 offsetof(nxt_websocket_conf_t, max_frame_size), 1454 }, 1455 1456 { 1457 nxt_string("read_timeout"), 1458 NXT_CONF_MAP_MSEC, 1459 offsetof(nxt_websocket_conf_t, read_timeout), 1460 }, 1461 1462 { 1463 nxt_string("keepalive_interval"), 1464 NXT_CONF_MAP_MSEC, 1465 offsetof(nxt_websocket_conf_t, keepalive_interval), 1466 }, 1467 1468 }; 1469 1470 1471 static nxt_int_t 1472 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1473 u_char *start, u_char *end) 1474 { 1475 u_char *p; 1476 size_t size; 1477 nxt_mp_t *mp, *app_mp; 1478 uint32_t next, next_target; 1479 nxt_int_t ret; 1480 nxt_str_t name, path, target; 1481 nxt_app_t *app, *prev; 1482 nxt_str_t *t, *s, *targets; 1483 nxt_uint_t n, i; 1484 nxt_port_t *port; 1485 nxt_router_t *router; 1486 nxt_app_joint_t *app_joint; 1487 #if (NXT_TLS) 1488 nxt_tls_init_t *tls_init; 1489 nxt_conf_value_t *certificate; 1490 #endif 1491 nxt_conf_value_t *root, *conf, *http, *value, *websocket; 1492 nxt_conf_value_t *applications, *application; 1493 nxt_conf_value_t *listeners, *listener; 1494 nxt_socket_conf_t *skcf; 1495 nxt_router_conf_t *rtcf; 1496 nxt_http_routes_t *routes; 1497 nxt_event_engine_t *engine; 1498 nxt_app_lang_module_t *lang; 1499 nxt_router_app_conf_t apcf; 1500 nxt_router_access_log_t *access_log; 1501 nxt_router_listener_conf_t lscf; 1502 1503 static nxt_str_t http_path = nxt_string("/settings/http"); 1504 static nxt_str_t applications_path = nxt_string("/applications"); 1505 static nxt_str_t listeners_path = nxt_string("/listeners"); 1506 static nxt_str_t routes_path = nxt_string("/routes"); 1507 static nxt_str_t access_log_path = nxt_string("/access_log"); 1508 #if (NXT_TLS) 1509 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1510 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1511 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1512 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1513 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); 1514 #endif 1515 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1516 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1517 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1518 1519 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1520 if (root == NULL) { 1521 nxt_alert(task, "configuration parsing error"); 1522 return NXT_ERROR; 1523 } 1524 1525 rtcf = tmcf->router_conf; 1526 mp = rtcf->mem_pool; 1527 1528 ret = nxt_conf_map_object(mp, root, nxt_router_conf, 1529 nxt_nitems(nxt_router_conf), rtcf); 1530 if (ret != NXT_OK) { 1531 nxt_alert(task, "root map error"); 1532 return NXT_ERROR; 1533 } 1534 1535 if (rtcf->threads == 0) { 1536 rtcf->threads = nxt_ncpu; 1537 } 1538 1539 conf = nxt_conf_get_path(root, &static_path); 1540 1541 ret = nxt_router_conf_process_static(task, rtcf, conf); 1542 if (nxt_slow_path(ret != NXT_OK)) { 1543 return NXT_ERROR; 1544 } 1545 1546 router = rtcf->router; 1547 1548 applications = nxt_conf_get_path(root, &applications_path); 1549 1550 if (applications != NULL) { 1551 next = 0; 1552 1553 for ( ;; ) { 1554 application = nxt_conf_next_object_member(applications, 1555 &name, &next); 1556 if (application == NULL) { 1557 break; 1558 } 1559 1560 nxt_debug(task, "application \"%V\"", &name); 1561 1562 size = nxt_conf_json_length(application, NULL); 1563 1564 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1565 if (nxt_slow_path(app_mp == NULL)) { 1566 goto fail; 1567 } 1568 1569 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1570 if (app == NULL) { 1571 goto app_fail; 1572 } 1573 1574 nxt_memzero(app, sizeof(nxt_app_t)); 1575 1576 app->mem_pool = app_mp; 1577 1578 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1579 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1580 + name.length); 1581 1582 p = nxt_conf_json_print(app->conf.start, application, NULL); 1583 app->conf.length = p - app->conf.start; 1584 1585 nxt_assert(app->conf.length <= size); 1586 1587 nxt_debug(task, "application conf \"%V\"", &app->conf); 1588 1589 prev = nxt_router_app_find(&router->apps, &name); 1590 1591 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1592 nxt_mp_destroy(app_mp); 1593 1594 nxt_queue_remove(&prev->link); 1595 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1596 1597 ret = nxt_router_apps_hash_add(rtcf, prev); 1598 if (nxt_slow_path(ret != NXT_OK)) { 1599 goto fail; 1600 } 1601 1602 continue; 1603 } 1604 1605 apcf.processes = 1; 1606 apcf.max_processes = 1; 1607 apcf.spare_processes = 0; 1608 apcf.timeout = 0; 1609 apcf.idle_timeout = 15000; 1610 apcf.limits_value = NULL; 1611 apcf.processes_value = NULL; 1612 apcf.targets_value = NULL; 1613 1614 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1615 if (nxt_slow_path(app_joint == NULL)) { 1616 goto app_fail; 1617 } 1618 1619 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1620 1621 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1622 nxt_nitems(nxt_router_app_conf), &apcf); 1623 if (ret != NXT_OK) { 1624 nxt_alert(task, "application map error"); 1625 goto app_fail; 1626 } 1627 1628 if (apcf.limits_value != NULL) { 1629 1630 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1631 nxt_alert(task, "application limits is not object"); 1632 goto app_fail; 1633 } 1634 1635 ret = nxt_conf_map_object(mp, apcf.limits_value, 1636 nxt_router_app_limits_conf, 1637 nxt_nitems(nxt_router_app_limits_conf), 1638 &apcf); 1639 if (ret != NXT_OK) { 1640 nxt_alert(task, "application limits map error"); 1641 goto app_fail; 1642 } 1643 } 1644 1645 if (apcf.processes_value != NULL 1646 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1647 { 1648 ret = nxt_conf_map_object(mp, apcf.processes_value, 1649 nxt_router_app_processes_conf, 1650 nxt_nitems(nxt_router_app_processes_conf), 1651 &apcf); 1652 if (ret != NXT_OK) { 1653 nxt_alert(task, "application processes map error"); 1654 goto app_fail; 1655 } 1656 1657 } else { 1658 apcf.max_processes = apcf.processes; 1659 apcf.spare_processes = apcf.processes; 1660 } 1661 1662 if (apcf.targets_value != NULL) { 1663 n = nxt_conf_object_members_count(apcf.targets_value); 1664 1665 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1666 if (nxt_slow_path(targets == NULL)) { 1667 goto app_fail; 1668 } 1669 1670 next_target = 0; 1671 1672 for (i = 0; i < n; i++) { 1673 (void) nxt_conf_next_object_member(apcf.targets_value, 1674 &target, &next_target); 1675 1676 s = nxt_str_dup(app_mp, &targets[i], &target); 1677 if (nxt_slow_path(s == NULL)) { 1678 goto app_fail; 1679 } 1680 } 1681 1682 } else { 1683 targets = NULL; 1684 } 1685 1686 nxt_debug(task, "application type: %V", &apcf.type); 1687 nxt_debug(task, "application processes: %D", apcf.processes); 1688 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1689 1690 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1691 1692 if (lang == NULL) { 1693 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1694 goto app_fail; 1695 } 1696 1697 nxt_debug(task, "application language module: \"%s\"", lang->file); 1698 1699 ret = nxt_thread_mutex_create(&app->mutex); 1700 if (ret != NXT_OK) { 1701 goto app_fail; 1702 } 1703 1704 nxt_queue_init(&app->ports); 1705 nxt_queue_init(&app->spare_ports); 1706 nxt_queue_init(&app->idle_ports); 1707 nxt_queue_init(&app->ack_waiting_req); 1708 1709 app->name.length = name.length; 1710 nxt_memcpy(app->name.start, name.start, name.length); 1711 1712 app->type = lang->type; 1713 app->max_processes = apcf.max_processes; 1714 app->spare_processes = apcf.spare_processes; 1715 app->max_pending_processes = apcf.spare_processes 1716 ? apcf.spare_processes : 1; 1717 app->timeout = apcf.timeout; 1718 app->idle_timeout = apcf.idle_timeout; 1719 1720 app->targets = targets; 1721 1722 engine = task->thread->engine; 1723 1724 app->engine = engine; 1725 1726 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1727 app->adjust_idle_work.task = &engine->task; 1728 app->adjust_idle_work.obj = app; 1729 1730 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1731 1732 ret = nxt_router_apps_hash_add(rtcf, app); 1733 if (nxt_slow_path(ret != NXT_OK)) { 1734 goto app_fail; 1735 } 1736 1737 nxt_router_app_use(task, app, 1); 1738 1739 app->joint = app_joint; 1740 1741 app_joint->use_count = 1; 1742 app_joint->app = app; 1743 1744 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1745 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1746 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1747 app_joint->idle_timer.task = &engine->task; 1748 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1749 1750 app_joint->free_app_work.handler = nxt_router_free_app; 1751 app_joint->free_app_work.task = &engine->task; 1752 app_joint->free_app_work.obj = app_joint; 1753 1754 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1755 NXT_PROCESS_APP); 1756 if (nxt_slow_path(port == NULL)) { 1757 return NXT_ERROR; 1758 } 1759 1760 ret = nxt_port_socket_init(task, port, 0); 1761 if (nxt_slow_path(ret != NXT_OK)) { 1762 nxt_port_use(task, port, -1); 1763 return NXT_ERROR; 1764 } 1765 1766 ret = nxt_router_app_queue_init(task, port); 1767 if (nxt_slow_path(ret != NXT_OK)) { 1768 nxt_port_write_close(port); 1769 nxt_port_read_close(port); 1770 nxt_port_use(task, port, -1); 1771 return NXT_ERROR; 1772 } 1773 1774 nxt_port_write_enable(task, port); 1775 port->app = app; 1776 1777 app->shared_port = port; 1778 1779 nxt_thread_mutex_create(&app->outgoing.mutex); 1780 } 1781 } 1782 1783 conf = nxt_conf_get_path(root, &routes_path); 1784 if (nxt_fast_path(conf != NULL)) { 1785 routes = nxt_http_routes_create(task, tmcf, conf); 1786 if (nxt_slow_path(routes == NULL)) { 1787 return NXT_ERROR; 1788 } 1789 rtcf->routes = routes; 1790 } 1791 1792 ret = nxt_upstreams_create(task, tmcf, root); 1793 if (nxt_slow_path(ret != NXT_OK)) { 1794 return ret; 1795 } 1796 1797 http = nxt_conf_get_path(root, &http_path); 1798 #if 0 1799 if (http == NULL) { 1800 nxt_alert(task, "no \"http\" block"); 1801 return NXT_ERROR; 1802 } 1803 #endif 1804 1805 websocket = nxt_conf_get_path(root, &websocket_path); 1806 1807 listeners = nxt_conf_get_path(root, &listeners_path); 1808 1809 if (listeners != NULL) { 1810 next = 0; 1811 1812 for ( ;; ) { 1813 listener = nxt_conf_next_object_member(listeners, &name, &next); 1814 if (listener == NULL) { 1815 break; 1816 } 1817 1818 skcf = nxt_router_socket_conf(task, tmcf, &name); 1819 if (skcf == NULL) { 1820 goto fail; 1821 } 1822 1823 nxt_memzero(&lscf, sizeof(lscf)); 1824 1825 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1826 nxt_nitems(nxt_router_listener_conf), 1827 &lscf); 1828 if (ret != NXT_OK) { 1829 nxt_alert(task, "listener map error"); 1830 goto fail; 1831 } 1832 1833 nxt_debug(task, "application: %V", &lscf.application); 1834 1835 // STUB, default values if http block is not defined. 1836 skcf->header_buffer_size = 2048; 1837 skcf->large_header_buffer_size = 8192; 1838 skcf->large_header_buffers = 4; 1839 skcf->discard_unsafe_fields = 1; 1840 skcf->body_buffer_size = 16 * 1024; 1841 skcf->max_body_size = 8 * 1024 * 1024; 1842 skcf->proxy_header_buffer_size = 64 * 1024; 1843 skcf->proxy_buffer_size = 4096; 1844 skcf->proxy_buffers = 256; 1845 skcf->idle_timeout = 180 * 1000; 1846 skcf->header_read_timeout = 30 * 1000; 1847 skcf->body_read_timeout = 30 * 1000; 1848 skcf->send_timeout = 30 * 1000; 1849 skcf->proxy_timeout = 60 * 1000; 1850 skcf->proxy_send_timeout = 30 * 1000; 1851 skcf->proxy_read_timeout = 30 * 1000; 1852 1853 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1854 skcf->websocket_conf.read_timeout = 60 * 1000; 1855 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1856 1857 nxt_str_null(&skcf->body_temp_path); 1858 1859 if (http != NULL) { 1860 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1861 nxt_nitems(nxt_router_http_conf), 1862 skcf); 1863 if (ret != NXT_OK) { 1864 nxt_alert(task, "http map error"); 1865 goto fail; 1866 } 1867 } 1868 1869 if (websocket != NULL) { 1870 ret = nxt_conf_map_object(mp, websocket, 1871 nxt_router_websocket_conf, 1872 nxt_nitems(nxt_router_websocket_conf), 1873 &skcf->websocket_conf); 1874 if (ret != NXT_OK) { 1875 nxt_alert(task, "websocket map error"); 1876 goto fail; 1877 } 1878 } 1879 1880 t = &skcf->body_temp_path; 1881 1882 if (t->length == 0) { 1883 t->start = (u_char *) task->thread->runtime->tmp; 1884 t->length = nxt_strlen(t->start); 1885 } 1886 1887 conf = nxt_conf_get_path(listener, &client_ip_path); 1888 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, 1889 conf); 1890 if (nxt_slow_path(ret != NXT_OK)) { 1891 return NXT_ERROR; 1892 } 1893 1894 #if (NXT_TLS) 1895 certificate = nxt_conf_get_path(listener, &certificate_path); 1896 1897 if (certificate != NULL) { 1898 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1899 if (nxt_slow_path(tls_init == NULL)) { 1900 return NXT_ERROR; 1901 } 1902 1903 tls_init->cache_size = 0; 1904 tls_init->timeout = 300; 1905 1906 value = nxt_conf_get_path(listener, &conf_cache_path); 1907 if (value != NULL) { 1908 tls_init->cache_size = nxt_conf_get_number(value); 1909 } 1910 1911 value = nxt_conf_get_path(listener, &conf_timeout_path); 1912 if (value != NULL) { 1913 tls_init->timeout = nxt_conf_get_number(value); 1914 } 1915 1916 tls_init->conf_cmds = nxt_conf_get_path(listener, 1917 &conf_commands_path); 1918 1919 tls_init->tickets_conf = nxt_conf_get_path(listener, 1920 &conf_tickets); 1921 1922 n = nxt_conf_array_elements_count_or_1(certificate); 1923 1924 for (i = 0; i < n; i++) { 1925 value = nxt_conf_get_array_element_or_itself(certificate, 1926 i); 1927 nxt_assert(value != NULL); 1928 1929 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1930 tls_init, i == 0); 1931 if (nxt_slow_path(ret != NXT_OK)) { 1932 goto fail; 1933 } 1934 } 1935 } 1936 #endif 1937 1938 skcf->listen->handler = nxt_http_conn_init; 1939 skcf->router_conf = rtcf; 1940 skcf->router_conf->count++; 1941 1942 if (lscf.pass.length != 0) { 1943 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1944 1945 /* COMPATIBILITY: listener application. */ 1946 } else if (lscf.application.length > 0) { 1947 skcf->action = nxt_http_pass_application(task, rtcf, 1948 &lscf.application); 1949 } 1950 1951 if (nxt_slow_path(skcf->action == NULL)) { 1952 goto fail; 1953 } 1954 } 1955 } 1956 1957 ret = nxt_http_routes_resolve(task, tmcf); 1958 if (nxt_slow_path(ret != NXT_OK)) { 1959 goto fail; 1960 } 1961 1962 value = nxt_conf_get_path(root, &access_log_path); 1963 1964 if (value != NULL) { 1965 nxt_conf_get_string(value, &path); 1966 1967 access_log = router->access_log; 1968 1969 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1970 nxt_router_access_log_use(&router->lock, access_log); 1971 1972 } else { 1973 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1974 + path.length); 1975 if (access_log == NULL) { 1976 nxt_alert(task, "failed to allocate access log structure"); 1977 goto fail; 1978 } 1979 1980 access_log->fd = -1; 1981 access_log->handler = &nxt_router_access_log_writer; 1982 access_log->count = 1; 1983 1984 access_log->path.length = path.length; 1985 access_log->path.start = (u_char *) access_log 1986 + sizeof(nxt_router_access_log_t); 1987 1988 nxt_memcpy(access_log->path.start, path.start, path.length); 1989 } 1990 1991 rtcf->access_log = access_log; 1992 } 1993 1994 nxt_queue_add(&deleting_sockets, &router->sockets); 1995 nxt_queue_init(&router->sockets); 1996 1997 return NXT_OK; 1998 1999 app_fail: 2000 2001 nxt_mp_destroy(app_mp); 2002 2003 fail: 2004 2005 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 2006 2007 nxt_queue_remove(&app->link); 2008 nxt_thread_mutex_destroy(&app->mutex); 2009 nxt_mp_destroy(app->mem_pool); 2010 2011 } nxt_queue_loop; 2012 2013 return NXT_ERROR; 2014 } 2015 2016 2017 #if (NXT_TLS) 2018 2019 static nxt_int_t 2020 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 2021 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 2022 nxt_tls_init_t *tls_init, nxt_bool_t last) 2023 { 2024 nxt_router_tlssock_t *tls; 2025 2026 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 2027 if (nxt_slow_path(tls == NULL)) { 2028 return NXT_ERROR; 2029 } 2030 2031 tls->tls_init = tls_init; 2032 tls->socket_conf = skcf; 2033 tls->temp_conf = tmcf; 2034 tls->last = last; 2035 nxt_conf_get_string(value, &tls->name); 2036 2037 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2038 2039 return NXT_OK; 2040 } 2041 2042 #endif 2043 2044 2045 static nxt_int_t 2046 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2047 nxt_conf_value_t *conf) 2048 { 2049 uint32_t next, i; 2050 nxt_mp_t *mp; 2051 nxt_str_t *type, exten, str; 2052 nxt_int_t ret; 2053 nxt_uint_t exts; 2054 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2055 2056 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2057 2058 mp = rtcf->mem_pool; 2059 2060 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2061 if (nxt_slow_path(ret != NXT_OK)) { 2062 return NXT_ERROR; 2063 } 2064 2065 if (conf == NULL) { 2066 return NXT_OK; 2067 } 2068 2069 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2070 2071 if (mtypes_conf != NULL) { 2072 next = 0; 2073 2074 for ( ;; ) { 2075 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2076 2077 if (ext_conf == NULL) { 2078 break; 2079 } 2080 2081 type = nxt_str_dup(mp, NULL, &str); 2082 if (nxt_slow_path(type == NULL)) { 2083 return NXT_ERROR; 2084 } 2085 2086 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2087 nxt_conf_get_string(ext_conf, &str); 2088 2089 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2090 return NXT_ERROR; 2091 } 2092 2093 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2094 &exten, type); 2095 if (nxt_slow_path(ret != NXT_OK)) { 2096 return NXT_ERROR; 2097 } 2098 2099 continue; 2100 } 2101 2102 exts = nxt_conf_array_elements_count(ext_conf); 2103 2104 for (i = 0; i < exts; i++) { 2105 value = nxt_conf_get_array_element(ext_conf, i); 2106 2107 nxt_conf_get_string(value, &str); 2108 2109 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2110 return NXT_ERROR; 2111 } 2112 2113 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2114 &exten, type); 2115 if (nxt_slow_path(ret != NXT_OK)) { 2116 return NXT_ERROR; 2117 } 2118 } 2119 } 2120 } 2121 2122 return NXT_OK; 2123 } 2124 2125 2126 static nxt_int_t 2127 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2128 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) 2129 { 2130 char c; 2131 size_t i; 2132 nxt_mp_t *mp; 2133 uint32_t hash; 2134 nxt_str_t header; 2135 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; 2136 nxt_http_client_ip_t *client_ip; 2137 nxt_http_route_addr_rule_t *source; 2138 2139 static nxt_str_t header_path = nxt_string("/header"); 2140 static nxt_str_t source_path = nxt_string("/source"); 2141 static nxt_str_t recursive_path = nxt_string("/recursive"); 2142 2143 if (conf == NULL) { 2144 skcf->client_ip = NULL; 2145 2146 return NXT_OK; 2147 } 2148 2149 mp = tmcf->router_conf->mem_pool; 2150 2151 source_conf = nxt_conf_get_path(conf, &source_path); 2152 header_conf = nxt_conf_get_path(conf, &header_path); 2153 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2154 2155 if (source_conf == NULL || header_conf == NULL) { 2156 return NXT_ERROR; 2157 } 2158 2159 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); 2160 if (nxt_slow_path(client_ip == NULL)) { 2161 return NXT_ERROR; 2162 } 2163 2164 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2165 if (nxt_slow_path(source == NULL)) { 2166 return NXT_ERROR; 2167 } 2168 2169 client_ip->source = source; 2170 2171 nxt_conf_get_string(header_conf, &header); 2172 2173 if (recursive_conf != NULL) { 2174 client_ip->recursive = nxt_conf_get_boolean(recursive_conf); 2175 } 2176 2177 client_ip->header = nxt_str_dup(mp, NULL, &header); 2178 if (nxt_slow_path(client_ip->header == NULL)) { 2179 return NXT_ERROR; 2180 } 2181 2182 hash = NXT_HTTP_FIELD_HASH_INIT; 2183 2184 for (i = 0; i < client_ip->header->length; i++) { 2185 c = client_ip->header->start[i]; 2186 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2187 } 2188 2189 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2190 2191 client_ip->header_hash = hash; 2192 2193 skcf->client_ip = client_ip; 2194 2195 return NXT_OK; 2196 } 2197 2198 2199 static nxt_app_t * 2200 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2201 { 2202 nxt_app_t *app; 2203 2204 nxt_queue_each(app, queue, nxt_app_t, link) { 2205 2206 if (nxt_strstr_eq(name, &app->name)) { 2207 return app; 2208 } 2209 2210 } nxt_queue_loop; 2211 2212 return NULL; 2213 } 2214 2215 2216 static nxt_int_t 2217 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2218 { 2219 void *mem; 2220 nxt_int_t fd; 2221 2222 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2223 if (nxt_slow_path(fd == -1)) { 2224 return NXT_ERROR; 2225 } 2226 2227 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2228 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2229 if (nxt_slow_path(mem == MAP_FAILED)) { 2230 nxt_fd_close(fd); 2231 2232 return NXT_ERROR; 2233 } 2234 2235 nxt_app_queue_init(mem); 2236 2237 port->queue_fd = fd; 2238 port->queue = mem; 2239 2240 return NXT_OK; 2241 } 2242 2243 2244 static nxt_int_t 2245 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2246 { 2247 void *mem; 2248 nxt_int_t fd; 2249 2250 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2251 if (nxt_slow_path(fd == -1)) { 2252 return NXT_ERROR; 2253 } 2254 2255 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2256 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2257 if (nxt_slow_path(mem == MAP_FAILED)) { 2258 nxt_fd_close(fd); 2259 2260 return NXT_ERROR; 2261 } 2262 2263 nxt_port_queue_init(mem); 2264 2265 port->queue_fd = fd; 2266 port->queue = mem; 2267 2268 return NXT_OK; 2269 } 2270 2271 2272 static nxt_int_t 2273 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2274 { 2275 void *mem; 2276 2277 nxt_assert(fd != -1); 2278 2279 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2280 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2281 if (nxt_slow_path(mem == MAP_FAILED)) { 2282 2283 return NXT_ERROR; 2284 } 2285 2286 port->queue = mem; 2287 2288 return NXT_OK; 2289 } 2290 2291 2292 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2293 NXT_LVLHSH_DEFAULT, 2294 nxt_router_apps_hash_test, 2295 nxt_mp_lvlhsh_alloc, 2296 nxt_mp_lvlhsh_free, 2297 }; 2298 2299 2300 static nxt_int_t 2301 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2302 { 2303 nxt_app_t *app; 2304 2305 app = data; 2306 2307 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2308 } 2309 2310 2311 static nxt_int_t 2312 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2313 { 2314 nxt_lvlhsh_query_t lhq; 2315 2316 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2317 lhq.replace = 0; 2318 lhq.key = app->name; 2319 lhq.value = app; 2320 lhq.proto = &nxt_router_apps_hash_proto; 2321 lhq.pool = rtcf->mem_pool; 2322 2323 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2324 2325 case NXT_OK: 2326 return NXT_OK; 2327 2328 case NXT_DECLINED: 2329 nxt_thread_log_alert("router app hash adding failed: " 2330 "\"%V\" is already in hash", &lhq.key); 2331 /* Fall through. */ 2332 default: 2333 return NXT_ERROR; 2334 } 2335 } 2336 2337 2338 static nxt_app_t * 2339 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2340 { 2341 nxt_lvlhsh_query_t lhq; 2342 2343 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2344 lhq.key = *name; 2345 lhq.proto = &nxt_router_apps_hash_proto; 2346 2347 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2348 return NULL; 2349 } 2350 2351 return lhq.value; 2352 } 2353 2354 2355 static void 2356 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2357 { 2358 nxt_app_t *app; 2359 nxt_lvlhsh_each_t lhe; 2360 2361 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2362 2363 for ( ;; ) { 2364 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2365 2366 if (app == NULL) { 2367 break; 2368 } 2369 2370 nxt_router_app_use(task, app, i); 2371 } 2372 } 2373 2374 2375 typedef struct { 2376 nxt_app_t *app; 2377 nxt_int_t target; 2378 } nxt_http_app_conf_t; 2379 2380 2381 nxt_int_t 2382 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2383 nxt_str_t *target, nxt_http_action_t *action) 2384 { 2385 nxt_app_t *app; 2386 nxt_str_t *targets; 2387 nxt_uint_t i; 2388 nxt_http_app_conf_t *conf; 2389 2390 app = nxt_router_apps_hash_get(rtcf, name); 2391 if (app == NULL) { 2392 return NXT_DECLINED; 2393 } 2394 2395 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2396 if (nxt_slow_path(conf == NULL)) { 2397 return NXT_ERROR; 2398 } 2399 2400 action->handler = nxt_http_application_handler; 2401 action->u.conf = conf; 2402 2403 conf->app = app; 2404 2405 if (target != NULL && target->length != 0) { 2406 targets = app->targets; 2407 2408 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2409 2410 conf->target = i; 2411 2412 } else { 2413 conf->target = 0; 2414 } 2415 2416 return NXT_OK; 2417 } 2418 2419 2420 static nxt_socket_conf_t * 2421 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2422 nxt_str_t *name) 2423 { 2424 size_t size; 2425 nxt_int_t ret; 2426 nxt_bool_t wildcard; 2427 nxt_sockaddr_t *sa; 2428 nxt_socket_conf_t *skcf; 2429 nxt_listen_socket_t *ls; 2430 2431 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2432 if (nxt_slow_path(sa == NULL)) { 2433 nxt_alert(task, "invalid listener \"%V\"", name); 2434 return NULL; 2435 } 2436 2437 sa->type = SOCK_STREAM; 2438 2439 nxt_debug(task, "router listener: \"%*s\"", 2440 (size_t) sa->length, nxt_sockaddr_start(sa)); 2441 2442 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2443 if (nxt_slow_path(skcf == NULL)) { 2444 return NULL; 2445 } 2446 2447 size = nxt_sockaddr_size(sa); 2448 2449 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2450 2451 if (ret != NXT_OK) { 2452 2453 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2454 if (nxt_slow_path(ls == NULL)) { 2455 return NULL; 2456 } 2457 2458 skcf->listen = ls; 2459 2460 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2461 nxt_memcpy(ls->sockaddr, sa, size); 2462 2463 nxt_listen_socket_remote_size(ls); 2464 2465 ls->socket = -1; 2466 ls->backlog = NXT_LISTEN_BACKLOG; 2467 ls->flags = NXT_NONBLOCK; 2468 ls->read_after_accept = 1; 2469 } 2470 2471 switch (sa->u.sockaddr.sa_family) { 2472 #if (NXT_HAVE_UNIX_DOMAIN) 2473 case AF_UNIX: 2474 wildcard = 0; 2475 break; 2476 #endif 2477 #if (NXT_INET6) 2478 case AF_INET6: 2479 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2480 break; 2481 #endif 2482 case AF_INET: 2483 default: 2484 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2485 break; 2486 } 2487 2488 if (!wildcard) { 2489 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2490 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2491 return NULL; 2492 } 2493 2494 nxt_memcpy(skcf->sockaddr, sa, size); 2495 } 2496 2497 return skcf; 2498 } 2499 2500 2501 static nxt_int_t 2502 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2503 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2504 { 2505 nxt_router_t *router; 2506 nxt_queue_link_t *qlk; 2507 nxt_socket_conf_t *skcf; 2508 2509 router = tmcf->router_conf->router; 2510 2511 for (qlk = nxt_queue_first(&router->sockets); 2512 qlk != nxt_queue_tail(&router->sockets); 2513 qlk = nxt_queue_next(qlk)) 2514 { 2515 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2516 2517 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2518 nskcf->listen = skcf->listen; 2519 2520 nxt_queue_remove(qlk); 2521 nxt_queue_insert_tail(&keeping_sockets, qlk); 2522 2523 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2524 2525 return NXT_OK; 2526 } 2527 } 2528 2529 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2530 2531 return NXT_DECLINED; 2532 } 2533 2534 2535 static void 2536 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2537 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2538 { 2539 size_t size; 2540 uint32_t stream; 2541 nxt_int_t ret; 2542 nxt_buf_t *b; 2543 nxt_port_t *main_port, *router_port; 2544 nxt_runtime_t *rt; 2545 nxt_socket_rpc_t *rpc; 2546 2547 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2548 if (rpc == NULL) { 2549 goto fail; 2550 } 2551 2552 rpc->socket_conf = skcf; 2553 rpc->temp_conf = tmcf; 2554 2555 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2556 2557 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2558 if (b == NULL) { 2559 goto fail; 2560 } 2561 2562 b->completion_handler = nxt_buf_dummy_completion; 2563 2564 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2565 2566 rt = task->thread->runtime; 2567 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2568 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2569 2570 stream = nxt_port_rpc_register_handler(task, router_port, 2571 nxt_router_listen_socket_ready, 2572 nxt_router_listen_socket_error, 2573 main_port->pid, rpc); 2574 if (nxt_slow_path(stream == 0)) { 2575 goto fail; 2576 } 2577 2578 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2579 stream, router_port->id, b); 2580 2581 if (nxt_slow_path(ret != NXT_OK)) { 2582 nxt_port_rpc_cancel(task, router_port, stream); 2583 goto fail; 2584 } 2585 2586 return; 2587 2588 fail: 2589 2590 nxt_router_conf_error(task, tmcf); 2591 } 2592 2593 2594 static void 2595 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2596 void *data) 2597 { 2598 nxt_int_t ret; 2599 nxt_socket_t s; 2600 nxt_socket_rpc_t *rpc; 2601 2602 rpc = data; 2603 2604 s = msg->fd[0]; 2605 2606 ret = nxt_socket_nonblocking(task, s); 2607 if (nxt_slow_path(ret != NXT_OK)) { 2608 goto fail; 2609 } 2610 2611 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2612 2613 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2614 if (nxt_slow_path(ret != NXT_OK)) { 2615 goto fail; 2616 } 2617 2618 rpc->socket_conf->listen->socket = s; 2619 2620 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2621 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2622 2623 return; 2624 2625 fail: 2626 2627 nxt_socket_close(task, s); 2628 2629 nxt_router_conf_error(task, rpc->temp_conf); 2630 } 2631 2632 2633 static void 2634 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2635 void *data) 2636 { 2637 nxt_socket_rpc_t *rpc; 2638 nxt_router_temp_conf_t *tmcf; 2639 2640 rpc = data; 2641 tmcf = rpc->temp_conf; 2642 2643 #if 0 2644 u_char *p; 2645 size_t size; 2646 uint8_t error; 2647 nxt_buf_t *in, *out; 2648 nxt_sockaddr_t *sa; 2649 2650 static nxt_str_t socket_errors[] = { 2651 nxt_string("ListenerSystem"), 2652 nxt_string("ListenerNoIPv6"), 2653 nxt_string("ListenerPort"), 2654 nxt_string("ListenerInUse"), 2655 nxt_string("ListenerNoAddress"), 2656 nxt_string("ListenerNoAccess"), 2657 nxt_string("ListenerPath"), 2658 }; 2659 2660 sa = rpc->socket_conf->listen->sockaddr; 2661 2662 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2663 2664 if (nxt_slow_path(in == NULL)) { 2665 return; 2666 } 2667 2668 p = in->mem.pos; 2669 2670 error = *p++; 2671 2672 size = nxt_length("listen socket error: ") 2673 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2674 + sa->length + socket_errors[error].length + (in->mem.free - p); 2675 2676 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2677 if (nxt_slow_path(out == NULL)) { 2678 return; 2679 } 2680 2681 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2682 "listen socket error: " 2683 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2684 (size_t) sa->length, nxt_sockaddr_start(sa), 2685 &socket_errors[error], in->mem.free - p, p); 2686 2687 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2688 #endif 2689 2690 nxt_router_conf_error(task, tmcf); 2691 } 2692 2693 2694 #if (NXT_TLS) 2695 2696 static void 2697 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2698 void *data) 2699 { 2700 nxt_mp_t *mp; 2701 nxt_int_t ret; 2702 nxt_tls_conf_t *tlscf; 2703 nxt_router_tlssock_t *tls; 2704 nxt_tls_bundle_conf_t *bundle; 2705 nxt_router_temp_conf_t *tmcf; 2706 2707 nxt_debug(task, "tls rpc handler"); 2708 2709 tls = data; 2710 tmcf = tls->temp_conf; 2711 2712 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2713 goto fail; 2714 } 2715 2716 mp = tmcf->router_conf->mem_pool; 2717 2718 if (tls->socket_conf->tls == NULL){ 2719 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2720 if (nxt_slow_path(tlscf == NULL)) { 2721 goto fail; 2722 } 2723 2724 tlscf->no_wait_shutdown = 1; 2725 tls->socket_conf->tls = tlscf; 2726 2727 } else { 2728 tlscf = tls->socket_conf->tls; 2729 } 2730 2731 tls->tls_init->conf = tlscf; 2732 2733 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2734 if (nxt_slow_path(bundle == NULL)) { 2735 goto fail; 2736 } 2737 2738 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2739 goto fail; 2740 } 2741 2742 bundle->chain_file = msg->fd[0]; 2743 bundle->next = tlscf->bundle; 2744 tlscf->bundle = bundle; 2745 2746 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2747 tls->last); 2748 if (nxt_slow_path(ret != NXT_OK)) { 2749 goto fail; 2750 } 2751 2752 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2753 nxt_router_conf_apply, task, tmcf, NULL); 2754 return; 2755 2756 fail: 2757 2758 nxt_router_conf_error(task, tmcf); 2759 } 2760 2761 #endif 2762 2763 2764 static void 2765 nxt_router_app_rpc_create(nxt_task_t *task, 2766 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2767 { 2768 size_t size; 2769 uint32_t stream; 2770 nxt_fd_t port_fd, queue_fd; 2771 nxt_int_t ret; 2772 nxt_buf_t *b; 2773 nxt_port_t *router_port, *dport; 2774 nxt_runtime_t *rt; 2775 nxt_app_rpc_t *rpc; 2776 2777 rt = task->thread->runtime; 2778 2779 dport = app->proto_port; 2780 2781 if (dport == NULL) { 2782 nxt_debug(task, "app '%V' prototype prefork", &app->name); 2783 2784 size = app->name.length + 1 + app->conf.length; 2785 2786 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2787 if (nxt_slow_path(b == NULL)) { 2788 goto fail; 2789 } 2790 2791 b->completion_handler = nxt_buf_dummy_completion; 2792 2793 nxt_buf_cpystr(b, &app->name); 2794 *b->mem.free++ = '\0'; 2795 nxt_buf_cpystr(b, &app->conf); 2796 2797 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 2798 2799 port_fd = app->shared_port->pair[0]; 2800 queue_fd = app->shared_port->queue_fd; 2801 2802 } else { 2803 nxt_debug(task, "app '%V' prefork", &app->name); 2804 2805 b = NULL; 2806 port_fd = -1; 2807 queue_fd = -1; 2808 } 2809 2810 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2811 2812 rpc = nxt_port_rpc_register_handler_ex(task, router_port, 2813 nxt_router_app_prefork_ready, 2814 nxt_router_app_prefork_error, 2815 sizeof(nxt_app_rpc_t)); 2816 if (nxt_slow_path(rpc == NULL)) { 2817 goto fail; 2818 } 2819 2820 rpc->app = app; 2821 rpc->temp_conf = tmcf; 2822 rpc->proto = (b != NULL); 2823 2824 stream = nxt_port_rpc_ex_stream(rpc); 2825 2826 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 2827 port_fd, queue_fd, stream, router_port->id, b); 2828 if (nxt_slow_path(ret != NXT_OK)) { 2829 nxt_port_rpc_cancel(task, router_port, stream); 2830 goto fail; 2831 } 2832 2833 if (b == NULL) { 2834 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); 2835 2836 app->pending_processes++; 2837 } 2838 2839 return; 2840 2841 fail: 2842 2843 nxt_router_conf_error(task, tmcf); 2844 } 2845 2846 2847 static void 2848 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2849 void *data) 2850 { 2851 nxt_app_t *app; 2852 nxt_port_t *port; 2853 nxt_app_rpc_t *rpc; 2854 nxt_event_engine_t *engine; 2855 2856 rpc = data; 2857 app = rpc->app; 2858 2859 port = msg->u.new_port; 2860 2861 nxt_assert(port != NULL); 2862 nxt_assert(port->id == 0); 2863 2864 if (rpc->proto) { 2865 nxt_assert(app->proto_port == NULL); 2866 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 2867 2868 nxt_port_inc_use(port); 2869 2870 app->proto_port = port; 2871 port->app = app; 2872 2873 nxt_router_app_rpc_create(task, rpc->temp_conf, app); 2874 2875 return; 2876 } 2877 2878 nxt_assert(port->type == NXT_PROCESS_APP); 2879 2880 port->app = app; 2881 port->main_app_port = port; 2882 2883 app->pending_processes--; 2884 app->processes++; 2885 app->idle_processes++; 2886 2887 engine = task->thread->engine; 2888 2889 nxt_queue_insert_tail(&app->ports, &port->app_link); 2890 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2891 2892 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2893 &app->name, port->pid, port->id); 2894 2895 nxt_port_hash_add(&app->port_hash, port); 2896 app->port_hash_count++; 2897 2898 port->idle_start = 0; 2899 2900 nxt_port_inc_use(port); 2901 2902 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 2903 2904 nxt_work_queue_add(&engine->fast_work_queue, 2905 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2906 } 2907 2908 2909 static void 2910 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2911 void *data) 2912 { 2913 nxt_app_t *app; 2914 nxt_app_rpc_t *rpc; 2915 nxt_router_temp_conf_t *tmcf; 2916 2917 rpc = data; 2918 app = rpc->app; 2919 tmcf = rpc->temp_conf; 2920 2921 if (rpc->proto) { 2922 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", 2923 &app->name); 2924 2925 } else { 2926 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2927 &app->name); 2928 2929 app->pending_processes--; 2930 } 2931 2932 nxt_router_conf_error(task, tmcf); 2933 } 2934 2935 2936 static nxt_int_t 2937 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2938 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2939 { 2940 nxt_int_t ret; 2941 nxt_uint_t n, threads; 2942 nxt_queue_link_t *qlk; 2943 nxt_router_engine_conf_t *recf; 2944 2945 threads = tmcf->router_conf->threads; 2946 2947 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2948 sizeof(nxt_router_engine_conf_t)); 2949 if (nxt_slow_path(tmcf->engines == NULL)) { 2950 return NXT_ERROR; 2951 } 2952 2953 n = 0; 2954 2955 for (qlk = nxt_queue_first(&router->engines); 2956 qlk != nxt_queue_tail(&router->engines); 2957 qlk = nxt_queue_next(qlk)) 2958 { 2959 recf = nxt_array_zero_add(tmcf->engines); 2960 if (nxt_slow_path(recf == NULL)) { 2961 return NXT_ERROR; 2962 } 2963 2964 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2965 2966 if (n < threads) { 2967 recf->action = NXT_ROUTER_ENGINE_KEEP; 2968 ret = nxt_router_engine_conf_update(tmcf, recf); 2969 2970 } else { 2971 recf->action = NXT_ROUTER_ENGINE_DELETE; 2972 ret = nxt_router_engine_conf_delete(tmcf, recf); 2973 } 2974 2975 if (nxt_slow_path(ret != NXT_OK)) { 2976 return ret; 2977 } 2978 2979 n++; 2980 } 2981 2982 tmcf->new_threads = n; 2983 2984 while (n < threads) { 2985 recf = nxt_array_zero_add(tmcf->engines); 2986 if (nxt_slow_path(recf == NULL)) { 2987 return NXT_ERROR; 2988 } 2989 2990 recf->action = NXT_ROUTER_ENGINE_ADD; 2991 2992 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2993 if (nxt_slow_path(recf->engine == NULL)) { 2994 return NXT_ERROR; 2995 } 2996 2997 ret = nxt_router_engine_conf_create(tmcf, recf); 2998 if (nxt_slow_path(ret != NXT_OK)) { 2999 return ret; 3000 } 3001 3002 n++; 3003 } 3004 3005 return NXT_OK; 3006 } 3007 3008 3009 static nxt_int_t 3010 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 3011 nxt_router_engine_conf_t *recf) 3012 { 3013 nxt_int_t ret; 3014 3015 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3016 nxt_router_listen_socket_create); 3017 if (nxt_slow_path(ret != NXT_OK)) { 3018 return ret; 3019 } 3020 3021 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3022 nxt_router_listen_socket_create); 3023 if (nxt_slow_path(ret != NXT_OK)) { 3024 return ret; 3025 } 3026 3027 return ret; 3028 } 3029 3030 3031 static nxt_int_t 3032 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 3033 nxt_router_engine_conf_t *recf) 3034 { 3035 nxt_int_t ret; 3036 3037 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3038 nxt_router_listen_socket_create); 3039 if (nxt_slow_path(ret != NXT_OK)) { 3040 return ret; 3041 } 3042 3043 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3044 nxt_router_listen_socket_update); 3045 if (nxt_slow_path(ret != NXT_OK)) { 3046 return ret; 3047 } 3048 3049 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3050 if (nxt_slow_path(ret != NXT_OK)) { 3051 return ret; 3052 } 3053 3054 return ret; 3055 } 3056 3057 3058 static nxt_int_t 3059 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 3060 nxt_router_engine_conf_t *recf) 3061 { 3062 nxt_int_t ret; 3063 3064 ret = nxt_router_engine_quit(tmcf, recf); 3065 if (nxt_slow_path(ret != NXT_OK)) { 3066 return ret; 3067 } 3068 3069 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 3070 if (nxt_slow_path(ret != NXT_OK)) { 3071 return ret; 3072 } 3073 3074 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3075 } 3076 3077 3078 static nxt_int_t 3079 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3080 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3081 nxt_work_handler_t handler) 3082 { 3083 nxt_int_t ret; 3084 nxt_joint_job_t *job; 3085 nxt_queue_link_t *qlk; 3086 nxt_socket_conf_t *skcf; 3087 nxt_socket_conf_joint_t *joint; 3088 3089 for (qlk = nxt_queue_first(sockets); 3090 qlk != nxt_queue_tail(sockets); 3091 qlk = nxt_queue_next(qlk)) 3092 { 3093 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3094 if (nxt_slow_path(job == NULL)) { 3095 return NXT_ERROR; 3096 } 3097 3098 job->work.next = recf->jobs; 3099 recf->jobs = &job->work; 3100 3101 job->task = tmcf->engine->task; 3102 job->work.handler = handler; 3103 job->work.task = &job->task; 3104 job->work.obj = job; 3105 job->tmcf = tmcf; 3106 3107 tmcf->count++; 3108 3109 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3110 sizeof(nxt_socket_conf_joint_t)); 3111 if (nxt_slow_path(joint == NULL)) { 3112 return NXT_ERROR; 3113 } 3114 3115 job->work.data = joint; 3116 3117 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3118 if (nxt_slow_path(ret != NXT_OK)) { 3119 return ret; 3120 } 3121 3122 joint->count = 1; 3123 3124 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3125 skcf->count++; 3126 joint->socket_conf = skcf; 3127 3128 joint->engine = recf->engine; 3129 } 3130 3131 return NXT_OK; 3132 } 3133 3134 3135 static nxt_int_t 3136 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3137 nxt_router_engine_conf_t *recf) 3138 { 3139 nxt_joint_job_t *job; 3140 3141 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3142 if (nxt_slow_path(job == NULL)) { 3143 return NXT_ERROR; 3144 } 3145 3146 job->work.next = recf->jobs; 3147 recf->jobs = &job->work; 3148 3149 job->task = tmcf->engine->task; 3150 job->work.handler = nxt_router_worker_thread_quit; 3151 job->work.task = &job->task; 3152 job->work.obj = NULL; 3153 job->work.data = NULL; 3154 job->tmcf = NULL; 3155 3156 return NXT_OK; 3157 } 3158 3159 3160 static nxt_int_t 3161 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 3162 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) 3163 { 3164 nxt_joint_job_t *job; 3165 nxt_queue_link_t *qlk; 3166 3167 for (qlk = nxt_queue_first(sockets); 3168 qlk != nxt_queue_tail(sockets); 3169 qlk = nxt_queue_next(qlk)) 3170 { 3171 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3172 if (nxt_slow_path(job == NULL)) { 3173 return NXT_ERROR; 3174 } 3175 3176 job->work.next = recf->jobs; 3177 recf->jobs = &job->work; 3178 3179 job->task = tmcf->engine->task; 3180 job->work.handler = nxt_router_listen_socket_delete; 3181 job->work.task = &job->task; 3182 job->work.obj = job; 3183 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3184 job->tmcf = tmcf; 3185 3186 tmcf->count++; 3187 } 3188 3189 return NXT_OK; 3190 } 3191 3192 3193 static nxt_int_t 3194 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 3195 nxt_router_temp_conf_t *tmcf) 3196 { 3197 nxt_int_t ret; 3198 nxt_uint_t i, threads; 3199 nxt_router_engine_conf_t *recf; 3200 3201 recf = tmcf->engines->elts; 3202 threads = tmcf->router_conf->threads; 3203 3204 for (i = tmcf->new_threads; i < threads; i++) { 3205 ret = nxt_router_thread_create(task, rt, recf[i].engine); 3206 if (nxt_slow_path(ret != NXT_OK)) { 3207 return ret; 3208 } 3209 } 3210 3211 return NXT_OK; 3212 } 3213 3214 3215 static nxt_int_t 3216 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 3217 nxt_event_engine_t *engine) 3218 { 3219 nxt_int_t ret; 3220 nxt_thread_link_t *link; 3221 nxt_thread_handle_t handle; 3222 3223 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 3224 3225 if (nxt_slow_path(link == NULL)) { 3226 return NXT_ERROR; 3227 } 3228 3229 link->start = nxt_router_thread_start; 3230 link->engine = engine; 3231 link->work.handler = nxt_router_thread_exit_handler; 3232 link->work.task = task; 3233 link->work.data = link; 3234 3235 nxt_queue_insert_tail(&rt->engines, &engine->link); 3236 3237 ret = nxt_thread_create(&handle, link); 3238 3239 if (nxt_slow_path(ret != NXT_OK)) { 3240 nxt_queue_remove(&engine->link); 3241 } 3242 3243 return ret; 3244 } 3245 3246 3247 static void 3248 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 3249 nxt_router_temp_conf_t *tmcf) 3250 { 3251 nxt_app_t *app; 3252 3253 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 3254 3255 nxt_router_app_unlink(task, app); 3256 3257 } nxt_queue_loop; 3258 3259 nxt_queue_add(&router->apps, &tmcf->previous); 3260 nxt_queue_add(&router->apps, &tmcf->apps); 3261 } 3262 3263 3264 static void 3265 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 3266 { 3267 nxt_uint_t n; 3268 nxt_event_engine_t *engine; 3269 nxt_router_engine_conf_t *recf; 3270 3271 recf = tmcf->engines->elts; 3272 3273 for (n = tmcf->engines->nelts; n != 0; n--) { 3274 engine = recf->engine; 3275 3276 switch (recf->action) { 3277 3278 case NXT_ROUTER_ENGINE_KEEP: 3279 break; 3280 3281 case NXT_ROUTER_ENGINE_ADD: 3282 nxt_queue_insert_tail(&router->engines, &engine->link0); 3283 break; 3284 3285 case NXT_ROUTER_ENGINE_DELETE: 3286 nxt_queue_remove(&engine->link0); 3287 break; 3288 } 3289 3290 nxt_router_engine_post(engine, recf->jobs); 3291 3292 recf++; 3293 } 3294 } 3295 3296 3297 static void 3298 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) 3299 { 3300 nxt_work_t *work, *next; 3301 3302 for (work = jobs; work != NULL; work = next) { 3303 next = work->next; 3304 work->next = NULL; 3305 3306 nxt_event_engine_post(engine, work); 3307 } 3308 } 3309 3310 3311 static nxt_port_handlers_t nxt_router_app_port_handlers = { 3312 .rpc_error = nxt_port_rpc_handler, 3313 .mmap = nxt_port_mmap_handler, 3314 .data = nxt_port_rpc_handler, 3315 .oosm = nxt_router_oosm_handler, 3316 .req_headers_ack = nxt_port_rpc_handler, 3317 }; 3318 3319 3320 static void 3321 nxt_router_thread_start(void *data) 3322 { 3323 nxt_int_t ret; 3324 nxt_port_t *port; 3325 nxt_task_t *task; 3326 nxt_work_t *work; 3327 nxt_thread_t *thread; 3328 nxt_thread_link_t *link; 3329 nxt_event_engine_t *engine; 3330 3331 link = data; 3332 engine = link->engine; 3333 task = &engine->task; 3334 3335 thread = nxt_thread(); 3336 3337 nxt_event_engine_thread_adopt(engine); 3338 3339 /* STUB */ 3340 thread->runtime = engine->task.thread->runtime; 3341 3342 engine->task.thread = thread; 3343 engine->task.log = thread->log; 3344 thread->engine = engine; 3345 thread->task = &engine->task; 3346 #if 0 3347 thread->fiber = &engine->fibers->fiber; 3348 #endif 3349 3350 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 3351 if (nxt_slow_path(engine->mem_pool == NULL)) { 3352 return; 3353 } 3354 3355 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, 3356 NXT_PROCESS_ROUTER); 3357 if (nxt_slow_path(port == NULL)) { 3358 return; 3359 } 3360 3361 ret = nxt_port_socket_init(task, port, 0); 3362 if (nxt_slow_path(ret != NXT_OK)) { 3363 nxt_port_use(task, port, -1); 3364 return; 3365 } 3366 3367 ret = nxt_router_port_queue_init(task, port); 3368 if (nxt_slow_path(ret != NXT_OK)) { 3369 nxt_port_use(task, port, -1); 3370 return; 3371 } 3372 3373 engine->port = port; 3374 3375 nxt_port_enable(task, port, &nxt_router_app_port_handlers); 3376 3377 work = nxt_zalloc(sizeof(nxt_work_t)); 3378 if (nxt_slow_path(work == NULL)) { 3379 return; 3380 } 3381 3382 work->handler = nxt_router_rt_add_port; 3383 work->task = link->work.task; 3384 work->obj = work; 3385 work->data = port; 3386 3387 nxt_event_engine_post(link->work.task->thread->engine, work); 3388 3389 nxt_event_engine_start(engine); 3390 } 3391 3392 3393 static void 3394 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) 3395 { 3396 nxt_int_t res; 3397 nxt_port_t *port; 3398 nxt_runtime_t *rt; 3399 3400 rt = task->thread->runtime; 3401 port = data; 3402 3403 nxt_free(obj); 3404 3405 res = nxt_port_hash_add(&rt->ports, port); 3406 3407 if (nxt_fast_path(res == NXT_OK)) { 3408 nxt_port_use(task, port, 1); 3409 } 3410 } 3411 3412 3413 static void 3414 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) 3415 { 3416 nxt_joint_job_t *job; 3417 nxt_socket_conf_t *skcf; 3418 nxt_listen_event_t *lev; 3419 nxt_listen_socket_t *ls; 3420 nxt_thread_spinlock_t *lock; 3421 nxt_socket_conf_joint_t *joint; 3422 3423 job = obj; 3424 joint = data; 3425 3426 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link); 3427 3428 skcf = joint->socket_conf; 3429 ls = skcf->listen; 3430 3431 lev = nxt_listen_event(task, ls); 3432 if (nxt_slow_path(lev == NULL)) { 3433 nxt_router_listen_socket_release(task, skcf); 3434 return; 3435 } 3436 3437 lev->socket.data = joint; 3438 3439 lock = &skcf->router_conf->router->lock; 3440 3441 nxt_thread_spin_lock(lock); 3442 ls->count++; 3443 nxt_thread_spin_unlock(lock); 3444 3445 job->work.next = NULL; 3446 job->work.handler = nxt_router_conf_wait; 3447 3448 nxt_event_engine_post(job->tmcf->engine, &job->work); 3449 } 3450 3451 3452 nxt_inline nxt_listen_event_t * 3453 nxt_router_listen_event(nxt_queue_t *listen_connections, 3454 nxt_socket_conf_t *skcf) 3455 { 3456 nxt_socket_t fd; 3457 nxt_queue_link_t *qlk; 3458 nxt_listen_event_t *lev; 3459 3460 fd = skcf->listen->socket; 3461 3462 for (qlk = nxt_queue_first(listen_connections); 3463 qlk != nxt_queue_tail(listen_connections); 3464 qlk = nxt_queue_next(qlk)) 3465 { 3466 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link); 3467 3468 if (fd == lev->socket.fd) { 3469 return lev; 3470 } 3471 } 3472 3473 return NULL; 3474 } 3475 3476 3477 static void 3478 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) 3479 { 3480 nxt_joint_job_t *job; 3481 nxt_event_engine_t *engine; 3482 nxt_listen_event_t *lev; 3483 nxt_socket_conf_joint_t *joint, *old; 3484 3485 job = obj; 3486 joint = data; 3487 3488 engine = task->thread->engine; 3489 3490 nxt_queue_insert_tail(&engine->joints, &joint->link); 3491 3492 lev = nxt_router_listen_event(&engine->listen_connections, 3493 joint->socket_conf); 3494 3495 old = lev->socket.data; 3496 lev->socket.data = joint; 3497 lev->listen = joint->socket_conf->listen; 3498 3499 job->work.next = NULL; 3500 job->work.handler = nxt_router_conf_wait; 3501 3502 nxt_event_engine_post(job->tmcf->engine, &job->work); 3503 3504 /* 3505 * The task is allocated from configuration temporary 3506 * memory pool so it can be freed after engine post operation. 3507 */ 3508 3509 nxt_router_conf_release(&engine->task, old); 3510 } 3511 3512 3513 static void 3514 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) 3515 { 3516 nxt_socket_conf_t *skcf; 3517 nxt_listen_event_t *lev; 3518 nxt_event_engine_t *engine; 3519 nxt_socket_conf_joint_t *joint; 3520 3521 skcf = data; 3522 3523 engine = task->thread->engine; 3524 3525 lev = nxt_router_listen_event(&engine->listen_connections, skcf); 3526 3527 nxt_fd_event_delete(engine, &lev->socket); 3528 3529 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 3530 lev->socket.fd); 3531 3532 joint = lev->socket.data; 3533 joint->close_job = obj; 3534 3535 lev->timer.handler = nxt_router_listen_socket_close; 3536 lev->timer.work_queue = &engine->fast_work_queue; 3537 3538 nxt_timer_add(engine, &lev->timer, 0); 3539 } 3540 3541 3542 static void 3543 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data) 3544 { 3545 nxt_event_engine_t *engine; 3546 3547 nxt_debug(task, "router worker thread quit"); 3548 3549 engine = task->thread->engine; 3550 3551 engine->shutdown = 1; 3552 3553 if (nxt_queue_is_empty(&engine->joints)) { 3554 nxt_thread_exit(task->thread); 3555 } 3556 } 3557 3558 3559 static void 3560 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) 3561 { 3562 nxt_timer_t *timer; 3563 nxt_joint_job_t *job; 3564 nxt_listen_event_t *lev; 3565 nxt_socket_conf_joint_t *joint; 3566 3567 timer = obj; 3568 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 3569 3570 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 3571 lev->socket.fd); 3572 3573 nxt_queue_remove(&lev->link); 3574 3575 joint = lev->socket.data; 3576 lev->socket.data = NULL; 3577 3578 /* 'task' refers to lev->task and we cannot use after nxt_free() */ 3579 task = &task->thread->engine->task; 3580 3581 nxt_router_listen_socket_release(task, joint->socket_conf); 3582 3583 job = joint->close_job; 3584 job->work.next = NULL; 3585 job->work.handler = nxt_router_conf_wait; 3586 3587 nxt_event_engine_post(job->tmcf->engine, &job->work); 3588 3589 nxt_router_listen_event_release(task, lev, joint); 3590 } 3591 3592 3593 static void 3594 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) 3595 { 3596 nxt_listen_socket_t *ls; 3597 nxt_thread_spinlock_t *lock; 3598 3599 ls = skcf->listen; 3600 lock = &skcf->router_conf->router->lock; 3601 3602 nxt_thread_spin_lock(lock); 3603 3604 nxt_debug(task, "engine %p: listen socket release: ls->count %D", 3605 task->thread->engine, ls->count); 3606 3607 if (--ls->count != 0) { 3608 ls = NULL; 3609 } 3610 3611 nxt_thread_spin_unlock(lock); 3612 3613 if (ls != NULL) { 3614 nxt_socket_close(task, ls->socket); 3615 nxt_free(ls); 3616 } 3617 } 3618 3619 3620 void 3621 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, 3622 nxt_socket_conf_joint_t *joint) 3623 { 3624 nxt_event_engine_t *engine; 3625 3626 nxt_debug(task, "listen event count: %D", lev->count); 3627 3628 engine = task->thread->engine; 3629 3630 if (--lev->count == 0) { 3631 if (lev->next != NULL) { 3632 nxt_sockaddr_cache_free(engine, lev->next); 3633 3634 nxt_conn_free(task, lev->next); 3635 } 3636 3637 nxt_free(lev); 3638 } 3639 3640 if (joint != NULL) { 3641 nxt_router_conf_release(task, joint); 3642 } 3643 3644 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { 3645 nxt_thread_exit(task->thread); 3646 } 3647 } 3648 3649 3650 void 3651 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 3652 { 3653 nxt_socket_conf_t *skcf; 3654 nxt_router_conf_t *rtcf; 3655 nxt_thread_spinlock_t *lock; 3656 3657 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); 3658 3659 if (--joint->count != 0) { 3660 return; 3661 } 3662 3663 nxt_queue_remove(&joint->link); 3664 3665 /* 3666 * The joint content can not be safely used after the critical 3667 * section protected by the spinlock because its memory pool may 3668 * be already destroyed by another thread. 3669 */ 3670 skcf = joint->socket_conf; 3671 rtcf = skcf->router_conf; 3672 lock = &rtcf->router->lock; 3673 3674 nxt_thread_spin_lock(lock); 3675 3676 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 3677 rtcf, rtcf->count); 3678 3679 if (--skcf->count != 0) { 3680 skcf = NULL; 3681 rtcf = NULL; 3682 3683 } else { 3684 nxt_queue_remove(&skcf->link); 3685 3686 if (--rtcf->count != 0) { 3687 rtcf = NULL; 3688 } 3689 } 3690 3691 nxt_thread_spin_unlock(lock); 3692 3693 #if (NXT_TLS) 3694 if (skcf != NULL && skcf->tls != NULL) { 3695 task->thread->runtime->tls->server_free(task, skcf->tls); 3696 } 3697 #endif 3698 3699 /* TODO remove engine->port */ 3700 3701 if (rtcf != NULL) { 3702 nxt_debug(task, "old router conf is destroyed"); 3703 3704 nxt_router_apps_hash_use(task, rtcf, -1); 3705 3706 nxt_router_access_log_release(task, lock, rtcf->access_log); 3707 3708 nxt_mp_thread_adopt(rtcf->mem_pool); 3709 3710 nxt_mp_destroy(rtcf->mem_pool); 3711 } 3712 } 3713 3714 3715 static void 3716 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, 3717 nxt_router_access_log_t *access_log) 3718 { 3719 size_t size; 3720 u_char *buf, *p; 3721 nxt_off_t bytes; 3722 3723 static nxt_time_string_t date_cache = { 3724 (nxt_atomic_uint_t) -1, 3725 nxt_router_access_log_date, 3726 "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d", 3727 nxt_length("31/Dec/1986:19:40:00 +0300"), 3728 NXT_THREAD_TIME_LOCAL, 3729 NXT_THREAD_TIME_SEC, 3730 }; 3731 3732 size = r->remote->address_length 3733 + 6 /* ' - - [' */ 3734 + date_cache.size 3735 + 3 /* '] "' */ 3736 + r->method->length 3737 + 1 /* space */ 3738 + r->target.length 3739 + 1 /* space */ 3740 + r->version.length 3741 + 2 /* '" ' */ 3742 + 3 /* status */ 3743 + 1 /* space */ 3744 + NXT_OFF_T_LEN 3745 + 2 /* ' "' */ 3746 + (r->referer != NULL ? r->referer->value_length : 1) 3747 + 3 /* '" "' */ 3748 + (r->user_agent != NULL ? r->user_agent->value_length : 1) 3749 + 2 /* '"\n' */ 3750 ; 3751 3752 buf = nxt_mp_nget(r->mem_pool, size); 3753 if (nxt_slow_path(buf == NULL)) { 3754 return; 3755 } 3756 3757 p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote), 3758 r->remote->address_length); 3759 3760 p = nxt_cpymem(p, " - - [", 6); 3761 3762 p = nxt_thread_time_string(task->thread, &date_cache, p); 3763 3764 p = nxt_cpymem(p, "] \"", 3); 3765 3766 if (r->method->length != 0) { 3767 p = nxt_cpymem(p, r->method->start, r->method->length); 3768 3769 if (r->target.length != 0) { 3770 *p++ = ' '; 3771 p = nxt_cpymem(p, r->target.start, r->target.length); 3772 3773 if (r->version.length != 0) { 3774 *p++ = ' '; 3775 p = nxt_cpymem(p, r->version.start, r->version.length); 3776 } 3777 } 3778 3779 } else { 3780 *p++ = '-'; 3781 } 3782 3783 p = nxt_cpymem(p, "\" ", 2); 3784 3785 p = nxt_sprintf(p, p + 3, "%03d", r->status); 3786 3787 *p++ = ' '; 3788 3789 bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); 3790 3791 p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); 3792 3793 p = nxt_cpymem(p, " \"", 2); 3794 3795 if (r->referer != NULL) { 3796 p = nxt_cpymem(p, r->referer->value, r->referer->value_length); 3797 3798 } else { 3799 *p++ = '-'; 3800 } 3801 3802 p = nxt_cpymem(p, "\" \"", 3); 3803 3804 if (r->user_agent != NULL) { 3805 p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length); 3806 3807 } else { 3808 *p++ = '-'; 3809 } 3810 3811 p = nxt_cpymem(p, "\"\n", 2); 3812 3813 nxt_fd_write(access_log->fd, buf, p - buf); 3814 } 3815 3816 3817 static u_char * 3818 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 3819 size_t size, const char *format) 3820 { 3821 u_char sign; 3822 time_t gmtoff; 3823 3824 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 3825 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 3826 3827 gmtoff = nxt_timezone(tm) / 60; 3828 3829 if (gmtoff < 0) { 3830 gmtoff = -gmtoff; 3831 sign = '-'; 3832 3833 } else { 3834 sign = '+'; 3835 } 3836 3837 return nxt_sprintf(buf, buf + size, format, 3838 tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900, 3839 tm->tm_hour, tm->tm_min, tm->tm_sec, 3840 sign, gmtoff / 60, gmtoff % 60); 3841 } 3842 3843 3844 static void 3845 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 3846 { 3847 uint32_t stream; 3848 nxt_int_t ret; 3849 nxt_buf_t *b; 3850 nxt_port_t *main_port, *router_port; 3851 nxt_runtime_t *rt; 3852 nxt_router_access_log_t *access_log; 3853 3854 access_log = tmcf->router_conf->access_log; 3855 3856 b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0); 3857 if (nxt_slow_path(b == NULL)) { 3858 goto fail; 3859 } 3860 3861 b->completion_handler = nxt_buf_dummy_completion; 3862 3863 nxt_buf_cpystr(b, &access_log->path); 3864 *b->mem.free++ = '\0'; 3865 3866 rt = task->thread->runtime; 3867 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 3868 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 3869 3870 stream = nxt_port_rpc_register_handler(task, router_port, 3871 nxt_router_access_log_ready, 3872 nxt_router_access_log_error, 3873 -1, tmcf); 3874 if (nxt_slow_path(stream == 0)) { 3875 goto fail; 3876 } 3877 3878 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 3879 stream, router_port->id, b); 3880 3881 if (nxt_slow_path(ret != NXT_OK)) { 3882 nxt_port_rpc_cancel(task, router_port, stream); 3883 goto fail; 3884 } 3885 3886 return; 3887 3888 fail: 3889 3890 nxt_router_conf_error(task, tmcf); 3891 } 3892 3893 3894 static void 3895 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3896 void *data) 3897 { 3898 nxt_router_temp_conf_t *tmcf; 3899 nxt_router_access_log_t *access_log; 3900 3901 tmcf = data; 3902 3903 access_log = tmcf->router_conf->access_log; 3904 3905 access_log->fd = msg->fd[0]; 3906 3907 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 3908 nxt_router_conf_apply, task, tmcf, NULL); 3909 } 3910 3911 3912 static void 3913 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 3914 void *data) 3915 { 3916 nxt_router_temp_conf_t *tmcf; 3917 3918 tmcf = data; 3919 3920 nxt_router_conf_error(task, tmcf); 3921 } 3922 3923 3924 static void 3925 nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 3926 nxt_router_access_log_t *access_log) 3927 { 3928 if (access_log == NULL) { 3929 return; 3930 } 3931 3932 nxt_thread_spin_lock(lock); 3933 3934 access_log->count++; 3935 3936 nxt_thread_spin_unlock(lock); 3937 } 3938 3939 3940 static void 3941 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, 3942 nxt_router_access_log_t *access_log) 3943 { 3944 if (access_log == NULL) { 3945 return; 3946 } 3947 3948 nxt_thread_spin_lock(lock); 3949 3950 if (--access_log->count != 0) { 3951 access_log = NULL; 3952 } 3953 3954 nxt_thread_spin_unlock(lock); 3955 3956 if (access_log != NULL) { 3957 3958 if (access_log->fd != -1) { 3959 nxt_fd_close(access_log->fd); 3960 } 3961 3962 nxt_free(access_log); 3963 } 3964 } 3965 3966 3967 typedef struct { 3968 nxt_mp_t *mem_pool; 3969 nxt_router_access_log_t *access_log; 3970 } nxt_router_access_log_reopen_t; 3971 3972 3973 static void 3974 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 3975 { 3976 nxt_mp_t *mp; 3977 uint32_t stream; 3978 nxt_int_t ret; 3979 nxt_buf_t *b; 3980 nxt_port_t *main_port, *router_port; 3981 nxt_runtime_t *rt; 3982 nxt_router_access_log_t *access_log; 3983 nxt_router_access_log_reopen_t *reopen; 3984 3985 access_log = nxt_router->access_log; 3986 3987 if (access_log == NULL) { 3988 return; 3989 } 3990 3991 mp = nxt_mp_create(1024, 128, 256, 32); 3992 if (nxt_slow_path(mp == NULL)) { 3993 return; 3994 } 3995 3996 reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t)); 3997 if (nxt_slow_path(reopen == NULL)) { 3998 goto fail; 3999 } 4000 4001 reopen->mem_pool = mp; 4002 reopen->access_log = access_log; 4003 4004 b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0); 4005 if (nxt_slow_path(b == NULL)) { 4006 goto fail; 4007 } 4008 4009 b->completion_handler = nxt_router_access_log_reopen_completion; 4010 4011 nxt_buf_cpystr(b, &access_log->path); 4012 *b->mem.free++ = '\0'; 4013 4014 rt = task->thread->runtime; 4015 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 4016 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 4017 4018 stream = nxt_port_rpc_register_handler(task, router_port, 4019 nxt_router_access_log_reopen_ready, 4020 nxt_router_access_log_reopen_error, 4021 -1, reopen); 4022 if (nxt_slow_path(stream == 0)) { 4023 goto fail; 4024 } 4025 4026 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1, 4027 stream, router_port->id, b); 4028 4029 if (nxt_slow_path(ret != NXT_OK)) { 4030 nxt_port_rpc_cancel(task, router_port, stream); 4031 goto fail; 4032 } 4033 4034 nxt_mp_retain(mp); 4035 4036 return; 4037 4038 fail: 4039 4040 nxt_mp_destroy(mp); 4041 } 4042 4043 4044 static void 4045 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data) 4046 { 4047 nxt_mp_t *mp; 4048 nxt_buf_t *b; 4049 4050 b = obj; 4051 mp = b->data; 4052 4053 nxt_mp_release(mp); 4054 } 4055 4056 4057 static void 4058 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4059 void *data) 4060 { 4061 nxt_router_access_log_t *access_log; 4062 nxt_router_access_log_reopen_t *reopen; 4063 4064 reopen = data; 4065 4066 access_log = reopen->access_log; 4067 4068 if (access_log == nxt_router->access_log) { 4069 4070 if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { 4071 nxt_alert(task, "dup2(%FD, %FD) failed %E", 4072 msg->fd[0], access_log->fd, nxt_errno); 4073 } 4074 } 4075 4076 nxt_fd_close(msg->fd[0]); 4077 nxt_mp_release(reopen->mem_pool); 4078 } 4079 4080 4081 static void 4082 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4083 void *data) 4084 { 4085 nxt_router_access_log_reopen_t *reopen; 4086 4087 reopen = data; 4088 4089 nxt_mp_release(reopen->mem_pool); 4090 } 4091 4092 4093 static void 4094 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) 4095 { 4096 nxt_port_t *port; 4097 nxt_thread_link_t *link; 4098 nxt_event_engine_t *engine; 4099 nxt_thread_handle_t handle; 4100 4101 handle = (nxt_thread_handle_t) (uintptr_t) obj; 4102 link = data; 4103 4104 nxt_thread_wait(handle); 4105 4106 engine = link->engine; 4107 4108 nxt_queue_remove(&engine->link); 4109 4110 port = engine->port; 4111 4112 // TODO notify all apps 4113 4114 port->engine = task->thread->engine; 4115 nxt_mp_thread_adopt(port->mem_pool); 4116 nxt_port_use(task, port, -1); 4117 4118 nxt_mp_thread_adopt(engine->mem_pool); 4119 nxt_mp_destroy(engine->mem_pool); 4120 4121 nxt_event_engine_free(engine); 4122 4123 nxt_free(link); 4124 } 4125 4126 4127 static void 4128 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4129 void *data) 4130 { 4131 size_t b_size, count; 4132 nxt_int_t ret; 4133 nxt_app_t *app; 4134 nxt_buf_t *b, *next; 4135 nxt_port_t *app_port; 4136 nxt_unit_field_t *f; 4137 nxt_http_field_t *field; 4138 nxt_http_request_t *r; 4139 nxt_unit_response_t *resp; 4140 nxt_request_rpc_data_t *req_rpc_data; 4141 4142 req_rpc_data = data; 4143 4144 r = req_rpc_data->request; 4145 if (nxt_slow_path(r == NULL)) { 4146 return; 4147 } 4148 4149 if (r->error) { 4150 nxt_request_rpc_data_unlink(task, req_rpc_data); 4151 return; 4152 } 4153 4154 app = req_rpc_data->app; 4155 nxt_assert(app != NULL); 4156 4157 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { 4158 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); 4159 4160 return; 4161 } 4162 4163 b = (msg->size == 0) ? NULL : msg->buf; 4164 4165 if (msg->port_msg.last != 0) { 4166 nxt_debug(task, "router data create last buf"); 4167 4168 nxt_buf_chain_add(&b, nxt_http_buf_last(r)); 4169 4170 req_rpc_data->rpc_cancel = 0; 4171 4172 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { 4173 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; 4174 } 4175 4176 nxt_request_rpc_data_unlink(task, req_rpc_data); 4177 4178 } else { 4179 if (app->timeout != 0) { 4180 r->timer.handler = nxt_router_app_timeout; 4181 r->timer_data = req_rpc_data; 4182 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4183 } 4184 } 4185 4186 if (b == NULL) { 4187 return; 4188 } 4189 4190 if (msg->buf == b) { 4191 /* Disable instant buffer completion/re-using by port. */ 4192 msg->buf = NULL; 4193 } 4194 4195 if (r->header_sent) { 4196 nxt_buf_chain_add(&r->out, b); 4197 nxt_http_request_send_body(task, r, NULL); 4198 4199 } else { 4200 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0; 4201 4202 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) { 4203 nxt_alert(task, "response buffer too small: %z", b_size); 4204 goto fail; 4205 } 4206 4207 resp = (void *) b->mem.pos; 4208 count = (b_size - sizeof(nxt_unit_response_t)) 4209 / sizeof(nxt_unit_field_t); 4210 4211 if (nxt_slow_path(count < resp->fields_count)) { 4212 nxt_alert(task, "response buffer too small for fields count: %D", 4213 resp->fields_count); 4214 goto fail; 4215 } 4216 4217 field = NULL; 4218 4219 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { 4220 if (f->skip) { 4221 continue; 4222 } 4223 4224 field = nxt_list_add(r->resp.fields); 4225 4226 if (nxt_slow_path(field == NULL)) { 4227 goto fail; 4228 } 4229 4230 field->hash = f->hash; 4231 field->skip = 0; 4232 field->hopbyhop = 0; 4233 4234 field->name_length = f->name_length; 4235 field->value_length = f->value_length; 4236 field->name = nxt_unit_sptr_get(&f->name); 4237 field->value = nxt_unit_sptr_get(&f->value); 4238 4239 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); 4240 if (nxt_slow_path(ret != NXT_OK)) { 4241 goto fail; 4242 } 4243 4244 nxt_debug(task, "header%s: %*s: %*s", 4245 (field->skip ? " skipped" : ""), 4246 (size_t) field->name_length, field->name, 4247 (size_t) field->value_length, field->value); 4248 4249 if (field->skip) { 4250 r->resp.fields->last->nelts--; 4251 } 4252 } 4253 4254 r->status = resp->status; 4255 4256 if (resp->piggyback_content_length != 0) { 4257 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); 4258 b->mem.free = b->mem.pos + resp->piggyback_content_length; 4259 4260 } else { 4261 b->mem.pos = b->mem.free; 4262 } 4263 4264 if (nxt_buf_mem_used_size(&b->mem) == 0) { 4265 next = b->next; 4266 b->next = NULL; 4267 4268 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 4269 b->completion_handler, task, b, b->parent); 4270 4271 b = next; 4272 } 4273 4274 if (b != NULL) { 4275 nxt_buf_chain_add(&r->out, b); 4276 } 4277 4278 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); 4279 4280 if (r->websocket_handshake 4281 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) 4282 { 4283 app_port = req_rpc_data->app_port; 4284 if (nxt_slow_path(app_port == NULL)) { 4285 goto fail; 4286 } 4287 4288 nxt_thread_mutex_lock(&app->mutex); 4289 4290 app_port->main_app_port->active_websockets++; 4291 4292 nxt_thread_mutex_unlock(&app->mutex); 4293 4294 nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE); 4295 req_rpc_data->apr_action = NXT_APR_CLOSE; 4296 4297 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); 4298 4299 r->state = &nxt_http_websocket; 4300 4301 } else { 4302 r->state = &nxt_http_request_send_state; 4303 } 4304 } 4305 4306 return; 4307 4308 fail: 4309 4310 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4311 4312 nxt_request_rpc_data_unlink(task, req_rpc_data); 4313 } 4314 4315 4316 static void 4317 nxt_router_req_headers_ack_handler(nxt_task_t *task, 4318 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 4319 { 4320 int res; 4321 nxt_app_t *app; 4322 nxt_buf_t *b; 4323 nxt_bool_t start_process, unlinked; 4324 nxt_port_t *app_port, *main_app_port, *idle_port; 4325 nxt_queue_link_t *idle_lnk; 4326 nxt_http_request_t *r; 4327 4328 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 4329 req_rpc_data->stream, 4330 msg->port_msg.pid, msg->port_msg.reply_port); 4331 4332 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 4333 msg->port_msg.pid); 4334 4335 app = req_rpc_data->app; 4336 r = req_rpc_data->request; 4337 4338 start_process = 0; 4339 unlinked = 0; 4340 4341 nxt_thread_mutex_lock(&app->mutex); 4342 4343 if (r->app_link.next != NULL) { 4344 nxt_queue_remove(&r->app_link); 4345 r->app_link.next = NULL; 4346 4347 unlinked = 1; 4348 } 4349 4350 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 4351 msg->port_msg.reply_port); 4352 if (nxt_slow_path(app_port == NULL)) { 4353 nxt_thread_mutex_unlock(&app->mutex); 4354 4355 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4356 4357 if (unlinked) { 4358 nxt_mp_release(r->mem_pool); 4359 } 4360 4361 return; 4362 } 4363 4364 main_app_port = app_port->main_app_port; 4365 4366 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 4367 app->idle_processes--; 4368 4369 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", 4370 &app->name, main_app_port->pid, main_app_port->id, 4371 (main_app_port->idle_start ? "idle_ports" : "spare_ports")); 4372 4373 /* Check port was in 'spare_ports' using idle_start field. */ 4374 if (main_app_port->idle_start == 0 4375 && app->idle_processes >= app->spare_processes) 4376 { 4377 /* 4378 * If there is a vacant space in spare ports, 4379 * move the last idle to spare_ports. 4380 */ 4381 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4382 4383 idle_lnk = nxt_queue_last(&app->idle_ports); 4384 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4385 nxt_queue_remove(idle_lnk); 4386 4387 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4388 4389 idle_port->idle_start = 0; 4390 4391 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4392 "to spare_ports", 4393 &app->name, idle_port->pid, idle_port->id); 4394 } 4395 4396 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4397 app->pending_processes++; 4398 start_process = 1; 4399 } 4400 } 4401 4402 main_app_port->active_requests++; 4403 4404 nxt_port_inc_use(app_port); 4405 4406 nxt_thread_mutex_unlock(&app->mutex); 4407 4408 if (unlinked) { 4409 nxt_mp_release(r->mem_pool); 4410 } 4411 4412 if (start_process) { 4413 nxt_router_start_app_process(task, app); 4414 } 4415 4416 nxt_port_use(task, req_rpc_data->app_port, -1); 4417 4418 req_rpc_data->app_port = app_port; 4419 4420 b = req_rpc_data->msg_info.buf; 4421 4422 if (b != NULL) { 4423 /* First buffer is already sent. Start from second. */ 4424 b = b->next; 4425 4426 req_rpc_data->msg_info.buf->next = NULL; 4427 } 4428 4429 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) { 4430 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, 4431 req_rpc_data->msg_info.body_fd); 4432 4433 if (req_rpc_data->msg_info.body_fd != -1) { 4434 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 4435 } 4436 4437 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 4438 req_rpc_data->msg_info.body_fd, 4439 req_rpc_data->stream, 4440 task->thread->engine->port->id, b); 4441 4442 if (nxt_slow_path(res != NXT_OK)) { 4443 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 4444 } 4445 } 4446 4447 if (app->timeout != 0) { 4448 r->timer.handler = nxt_router_app_timeout; 4449 r->timer_data = req_rpc_data; 4450 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 4451 } 4452 } 4453 4454 4455 static const nxt_http_request_state_t nxt_http_request_send_state 4456 nxt_aligned(64) = 4457 { 4458 .error_handler = nxt_http_request_error_handler, 4459 }; 4460 4461 4462 static void 4463 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) 4464 { 4465 nxt_buf_t *out; 4466 nxt_http_request_t *r; 4467 4468 r = obj; 4469 4470 out = r->out; 4471 4472 if (out != NULL) { 4473 r->out = NULL; 4474 nxt_http_request_send(task, r, out); 4475 } 4476 } 4477 4478 4479 static void 4480 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4481 void *data) 4482 { 4483 nxt_request_rpc_data_t *req_rpc_data; 4484 4485 req_rpc_data = data; 4486 4487 req_rpc_data->rpc_cancel = 0; 4488 4489 /* TODO cancel message and return if cancelled. */ 4490 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 4491 4492 if (req_rpc_data->request != NULL) { 4493 nxt_http_request_error(task, req_rpc_data->request, 4494 NXT_HTTP_SERVICE_UNAVAILABLE); 4495 } 4496 4497 nxt_request_rpc_data_unlink(task, req_rpc_data); 4498 } 4499 4500 4501 static void 4502 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4503 void *data) 4504 { 4505 uint32_t n; 4506 nxt_app_t *app; 4507 nxt_bool_t start_process, restarted; 4508 nxt_port_t *port; 4509 nxt_app_joint_t *app_joint; 4510 nxt_app_joint_rpc_t *app_joint_rpc; 4511 4512 nxt_assert(data != NULL); 4513 4514 app_joint_rpc = data; 4515 app_joint = app_joint_rpc->app_joint; 4516 port = msg->u.new_port; 4517 4518 nxt_assert(app_joint != NULL); 4519 nxt_assert(port != NULL); 4520 nxt_assert(port->id == 0); 4521 4522 app = app_joint->app; 4523 4524 nxt_router_app_joint_use(task, app_joint, -1); 4525 4526 if (nxt_slow_path(app == NULL)) { 4527 nxt_debug(task, "new port ready for released app, send QUIT"); 4528 4529 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4530 4531 return; 4532 } 4533 4534 nxt_thread_mutex_lock(&app->mutex); 4535 4536 restarted = (app->generation != app_joint_rpc->generation); 4537 4538 if (app_joint_rpc->proto) { 4539 nxt_assert(app->proto_port == NULL); 4540 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 4541 4542 n = app->proto_port_requests; 4543 app->proto_port_requests = 0; 4544 4545 if (nxt_slow_path(restarted)) { 4546 nxt_thread_mutex_unlock(&app->mutex); 4547 4548 nxt_debug(task, "proto port ready for restarted app, send QUIT"); 4549 4550 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, 4551 NULL); 4552 4553 } else { 4554 port->app = app; 4555 app->proto_port = port; 4556 4557 nxt_thread_mutex_unlock(&app->mutex); 4558 4559 nxt_port_use(task, port, 1); 4560 } 4561 4562 port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER]; 4563 4564 while (n > 0) { 4565 nxt_router_app_use(task, app, 1); 4566 4567 nxt_router_start_app_process_handler(task, port, app); 4568 4569 n--; 4570 } 4571 4572 return; 4573 } 4574 4575 nxt_assert(port->type == NXT_PROCESS_APP); 4576 nxt_assert(app->pending_processes != 0); 4577 4578 app->pending_processes--; 4579 4580 if (nxt_slow_path(restarted)) { 4581 nxt_debug(task, "new port ready for restarted app, send QUIT"); 4582 4583 start_process = !task->thread->engine->shutdown 4584 && nxt_router_app_can_start(app) 4585 && nxt_router_app_need_start(app); 4586 4587 if (start_process) { 4588 app->pending_processes++; 4589 } 4590 4591 nxt_thread_mutex_unlock(&app->mutex); 4592 4593 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 4594 4595 if (start_process) { 4596 nxt_router_start_app_process(task, app); 4597 } 4598 4599 return; 4600 } 4601 4602 port->app = app; 4603 port->main_app_port = port; 4604 4605 app->processes++; 4606 nxt_port_hash_add(&app->port_hash, port); 4607 app->port_hash_count++; 4608 4609 nxt_thread_mutex_unlock(&app->mutex); 4610 4611 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", 4612 &app->name, port->pid, app->processes, app->pending_processes); 4613 4614 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 4615 4616 nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); 4617 } 4618 4619 4620 static void 4621 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4622 void *data) 4623 { 4624 nxt_app_t *app; 4625 nxt_app_joint_t *app_joint; 4626 nxt_queue_link_t *link; 4627 nxt_http_request_t *r; 4628 nxt_app_joint_rpc_t *app_joint_rpc; 4629 4630 nxt_assert(data != NULL); 4631 4632 app_joint_rpc = data; 4633 app_joint = app_joint_rpc->app_joint; 4634 4635 nxt_assert(app_joint != NULL); 4636 4637 app = app_joint->app; 4638 4639 nxt_router_app_joint_use(task, app_joint, -1); 4640 4641 if (nxt_slow_path(app == NULL)) { 4642 nxt_debug(task, "start error for released app"); 4643 4644 return; 4645 } 4646 4647 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4648 4649 link = NULL; 4650 4651 nxt_thread_mutex_lock(&app->mutex); 4652 4653 nxt_assert(app->pending_processes != 0); 4654 4655 app->pending_processes--; 4656 4657 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4658 link = nxt_queue_first(&app->ack_waiting_req); 4659 4660 nxt_queue_remove(link); 4661 link->next = NULL; 4662 } 4663 4664 nxt_thread_mutex_unlock(&app->mutex); 4665 4666 while (link != NULL) { 4667 r = nxt_container_of(link, nxt_http_request_t, app_link); 4668 4669 nxt_event_engine_post(r->engine, &r->err_work); 4670 4671 link = NULL; 4672 4673 nxt_thread_mutex_lock(&app->mutex); 4674 4675 if (app->processes == 0 && app->pending_processes == 0 4676 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4677 { 4678 link = nxt_queue_first(&app->ack_waiting_req); 4679 4680 nxt_queue_remove(link); 4681 link->next = NULL; 4682 } 4683 4684 nxt_thread_mutex_unlock(&app->mutex); 4685 } 4686 } 4687 4688 4689 nxt_inline nxt_port_t * 4690 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) 4691 { 4692 nxt_port_t *port; 4693 4694 port = NULL; 4695 4696 nxt_thread_mutex_lock(&app->mutex); 4697 4698 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 4699 4700 /* Caller is responsible to decrease port use count. */ 4701 nxt_queue_chk_remove(&port->app_link); 4702 4703 if (nxt_queue_chk_remove(&port->idle_link)) { 4704 app->idle_processes--; 4705 4706 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", 4707 &app->name, port->pid, port->id, 4708 (port->idle_start ? "idle_ports" : "spare_ports")); 4709 } 4710 4711 nxt_port_hash_remove(&app->port_hash, port); 4712 app->port_hash_count--; 4713 4714 port->app = NULL; 4715 app->processes--; 4716 4717 break; 4718 4719 } nxt_queue_loop; 4720 4721 nxt_thread_mutex_unlock(&app->mutex); 4722 4723 return port; 4724 } 4725 4726 4727 static void 4728 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4729 { 4730 int c; 4731 4732 c = nxt_atomic_fetch_add(&app->use_count, i); 4733 4734 if (i < 0 && c == -i) { 4735 4736 if (task->thread->engine != app->engine) { 4737 nxt_event_engine_post(app->engine, &app->joint->free_app_work); 4738 4739 } else { 4740 nxt_router_free_app(task, app->joint, NULL); 4741 } 4742 } 4743 } 4744 4745 4746 static void 4747 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) 4748 { 4749 nxt_debug(task, "app '%V' %p unlink", &app->name, app); 4750 4751 nxt_queue_remove(&app->link); 4752 4753 nxt_router_app_use(task, app, -1); 4754 } 4755 4756 4757 static void 4758 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, 4759 nxt_apr_action_t action) 4760 { 4761 int inc_use; 4762 uint32_t got_response, dec_requests; 4763 nxt_bool_t adjust_idle_timer; 4764 nxt_port_t *main_app_port; 4765 4766 nxt_assert(port != NULL); 4767 4768 inc_use = 0; 4769 got_response = 0; 4770 dec_requests = 0; 4771 4772 switch (action) { 4773 case NXT_APR_NEW_PORT: 4774 break; 4775 case NXT_APR_REQUEST_FAILED: 4776 dec_requests = 1; 4777 inc_use = -1; 4778 break; 4779 case NXT_APR_GOT_RESPONSE: 4780 got_response = 1; 4781 inc_use = -1; 4782 break; 4783 case NXT_APR_UPGRADE: 4784 got_response = 1; 4785 break; 4786 case NXT_APR_CLOSE: 4787 inc_use = -1; 4788 break; 4789 } 4790 4791 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, 4792 port->pid, port->id, 4793 (int) inc_use, (int) got_response); 4794 4795 if (port->id == NXT_SHARED_PORT_ID) { 4796 nxt_thread_mutex_lock(&app->mutex); 4797 4798 app->active_requests -= got_response + dec_requests; 4799 4800 nxt_thread_mutex_unlock(&app->mutex); 4801 4802 goto adjust_use; 4803 } 4804 4805 main_app_port = port->main_app_port; 4806 4807 nxt_thread_mutex_lock(&app->mutex); 4808 4809 main_app_port->active_requests -= got_response + dec_requests; 4810 app->active_requests -= got_response + dec_requests; 4811 4812 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { 4813 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4814 4815 nxt_port_inc_use(main_app_port); 4816 } 4817 4818 adjust_idle_timer = 0; 4819 4820 if (main_app_port->pair[1] != -1 4821 && main_app_port->active_requests == 0 4822 && main_app_port->active_websockets == 0 4823 && main_app_port->idle_link.next == NULL) 4824 { 4825 if (app->idle_processes == app->spare_processes 4826 && app->adjust_idle_work.data == NULL) 4827 { 4828 adjust_idle_timer = 1; 4829 app->adjust_idle_work.data = app; 4830 app->adjust_idle_work.next = NULL; 4831 } 4832 4833 if (app->idle_processes < app->spare_processes) { 4834 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4835 4836 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", 4837 &app->name, main_app_port->pid, main_app_port->id); 4838 } else { 4839 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4840 4841 main_app_port->idle_start = task->thread->engine->timers.now; 4842 4843 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", 4844 &app->name, main_app_port->pid, main_app_port->id); 4845 } 4846 4847 app->idle_processes++; 4848 } 4849 4850 nxt_thread_mutex_unlock(&app->mutex); 4851 4852 if (adjust_idle_timer) { 4853 nxt_router_app_use(task, app, 1); 4854 nxt_event_engine_post(app->engine, &app->adjust_idle_work); 4855 } 4856 4857 /* ? */ 4858 if (main_app_port->pair[1] == -1) { 4859 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", 4860 &app->name, app, main_app_port, main_app_port->pid); 4861 4862 goto adjust_use; 4863 } 4864 4865 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", 4866 &app->name, app); 4867 4868 adjust_use: 4869 4870 nxt_port_use(task, port, inc_use); 4871 } 4872 4873 4874 void 4875 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) 4876 { 4877 nxt_app_t *app; 4878 nxt_bool_t unchain, start_process; 4879 nxt_port_t *idle_port; 4880 nxt_queue_link_t *idle_lnk; 4881 4882 app = port->app; 4883 4884 nxt_assert(app != NULL); 4885 4886 nxt_thread_mutex_lock(&app->mutex); 4887 4888 if (port == app->proto_port) { 4889 app->proto_port = NULL; 4890 port->app = NULL; 4891 4892 nxt_thread_mutex_unlock(&app->mutex); 4893 4894 nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name, 4895 port->pid); 4896 4897 nxt_port_use(task, port, -1); 4898 4899 return; 4900 } 4901 4902 nxt_port_hash_remove(&app->port_hash, port); 4903 app->port_hash_count--; 4904 4905 if (port->id != 0) { 4906 nxt_thread_mutex_unlock(&app->mutex); 4907 4908 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, 4909 port->pid, port->id); 4910 4911 return; 4912 } 4913 4914 unchain = nxt_queue_chk_remove(&port->app_link); 4915 4916 if (nxt_queue_chk_remove(&port->idle_link)) { 4917 app->idle_processes--; 4918 4919 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", 4920 &app->name, port->pid, port->id, 4921 (port->idle_start ? "idle_ports" : "spare_ports")); 4922 4923 if (port->idle_start == 0 4924 && app->idle_processes >= app->spare_processes) 4925 { 4926 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 4927 4928 idle_lnk = nxt_queue_last(&app->idle_ports); 4929 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); 4930 nxt_queue_remove(idle_lnk); 4931 4932 nxt_queue_insert_tail(&app->spare_ports, idle_lnk); 4933 4934 idle_port->idle_start = 0; 4935 4936 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " 4937 "to spare_ports", 4938 &app->name, idle_port->pid, idle_port->id); 4939 } 4940 } 4941 4942 app->processes--; 4943 4944 start_process = !task->thread->engine->shutdown 4945 && nxt_router_app_can_start(app) 4946 && nxt_router_app_need_start(app); 4947 4948 if (start_process) { 4949 app->pending_processes++; 4950 } 4951 4952 nxt_thread_mutex_unlock(&app->mutex); 4953 4954 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); 4955 4956 if (unchain) { 4957 nxt_port_use(task, port, -1); 4958 } 4959 4960 if (start_process) { 4961 nxt_router_start_app_process(task, app); 4962 } 4963 } 4964 4965 4966 static void 4967 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) 4968 { 4969 nxt_app_t *app; 4970 nxt_bool_t queued; 4971 nxt_port_t *port; 4972 nxt_msec_t timeout, threshold; 4973 nxt_queue_link_t *lnk; 4974 nxt_event_engine_t *engine; 4975 4976 app = obj; 4977 queued = (data == app); 4978 4979 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", 4980 &app->name, queued); 4981 4982 engine = task->thread->engine; 4983 4984 nxt_assert(app->engine == engine); 4985 4986 threshold = engine->timers.now + app->joint->idle_timer.bias; 4987 timeout = 0; 4988 4989 nxt_thread_mutex_lock(&app->mutex); 4990 4991 if (queued) { 4992 app->adjust_idle_work.data = NULL; 4993 } 4994 4995 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", 4996 &app->name, 4997 (int) app->idle_processes, (int) app->spare_processes); 4998 4999 while (app->idle_processes > app->spare_processes) { 5000 5001 nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); 5002 5003 lnk = nxt_queue_first(&app->idle_ports); 5004 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); 5005 5006 timeout = port->idle_start + app->idle_timeout; 5007 5008 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", 5009 &app->name, port->pid, 5010 port->idle_start, timeout, threshold); 5011 5012 if (timeout > threshold) { 5013 break; 5014 } 5015 5016 nxt_queue_remove(lnk); 5017 lnk->next = NULL; 5018 5019 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", 5020 &app->name, port->pid, port->id); 5021 5022 nxt_queue_chk_remove(&port->app_link); 5023 5024 nxt_port_hash_remove(&app->port_hash, port); 5025 app->port_hash_count--; 5026 5027 app->idle_processes--; 5028 app->processes--; 5029 port->app = NULL; 5030 5031 nxt_thread_mutex_unlock(&app->mutex); 5032 5033 nxt_debug(task, "app '%V' send QUIT to idle port %PI", 5034 &app->name, port->pid); 5035 5036 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 5037 5038 nxt_port_use(task, port, -1); 5039 5040 nxt_thread_mutex_lock(&app->mutex); 5041 } 5042 5043 nxt_thread_mutex_unlock(&app->mutex); 5044 5045 if (timeout > threshold) { 5046 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); 5047 5048 } else { 5049 nxt_timer_disable(engine, &app->joint->idle_timer); 5050 } 5051 5052 if (queued) { 5053 nxt_router_app_use(task, app, -1); 5054 } 5055 } 5056 5057 5058 static void 5059 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) 5060 { 5061 nxt_timer_t *timer; 5062 nxt_app_joint_t *app_joint; 5063 5064 timer = obj; 5065 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 5066 5067 if (nxt_fast_path(app_joint->app != NULL)) { 5068 nxt_router_adjust_idle_timer(task, app_joint->app, NULL); 5069 } 5070 } 5071 5072 5073 static void 5074 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) 5075 { 5076 nxt_timer_t *timer; 5077 nxt_app_joint_t *app_joint; 5078 5079 timer = obj; 5080 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); 5081 5082 nxt_router_app_joint_use(task, app_joint, -1); 5083 } 5084 5085 5086 static void 5087 nxt_router_free_app(nxt_task_t *task, void *obj, void *data) 5088 { 5089 nxt_app_t *app; 5090 nxt_port_t *port, *proto_port; 5091 nxt_app_joint_t *app_joint; 5092 5093 app_joint = obj; 5094 app = app_joint->app; 5095 5096 for ( ;; ) { 5097 port = nxt_router_app_get_port_for_quit(task, app); 5098 if (port == NULL) { 5099 break; 5100 } 5101 5102 nxt_port_use(task, port, -1); 5103 } 5104 5105 nxt_thread_mutex_lock(&app->mutex); 5106 5107 for ( ;; ) { 5108 port = nxt_port_hash_retrieve(&app->port_hash); 5109 if (port == NULL) { 5110 break; 5111 } 5112 5113 app->port_hash_count--; 5114 5115 port->app = NULL; 5116 5117 nxt_port_close(task, port); 5118 5119 nxt_port_use(task, port, -1); 5120 } 5121 5122 proto_port = app->proto_port; 5123 5124 if (proto_port != NULL) { 5125 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 5126 proto_port->pid); 5127 5128 app->proto_port = NULL; 5129 proto_port->app = NULL; 5130 } 5131 5132 nxt_thread_mutex_unlock(&app->mutex); 5133 5134 if (proto_port != NULL) { 5135 nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 5136 -1, 0, 0, NULL); 5137 5138 nxt_port_close(task, proto_port); 5139 5140 nxt_port_use(task, proto_port, -1); 5141 } 5142 5143 nxt_assert(app->proto_port == NULL); 5144 nxt_assert(app->processes == 0); 5145 nxt_assert(app->active_requests == 0); 5146 nxt_assert(app->port_hash_count == 0); 5147 nxt_assert(app->idle_processes == 0); 5148 nxt_assert(nxt_queue_is_empty(&app->ports)); 5149 nxt_assert(nxt_queue_is_empty(&app->spare_ports)); 5150 nxt_assert(nxt_queue_is_empty(&app->idle_ports)); 5151 5152 nxt_port_mmaps_destroy(&app->outgoing, 1); 5153 5154 nxt_thread_mutex_destroy(&app->outgoing.mutex); 5155 5156 if (app->shared_port != NULL) { 5157 app->shared_port->app = NULL; 5158 nxt_port_close(task, app->shared_port); 5159 nxt_port_use(task, app->shared_port, -1); 5160 5161 app->shared_port = NULL; 5162 } 5163 5164 nxt_thread_mutex_destroy(&app->mutex); 5165 nxt_mp_destroy(app->mem_pool); 5166 5167 app_joint->app = NULL; 5168 5169 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { 5170 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; 5171 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); 5172 5173 } else { 5174 nxt_router_app_joint_use(task, app_joint, -1); 5175 } 5176 } 5177 5178 5179 static void 5180 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 5181 nxt_request_rpc_data_t *req_rpc_data) 5182 { 5183 nxt_bool_t start_process; 5184 nxt_port_t *port; 5185 nxt_http_request_t *r; 5186 5187 start_process = 0; 5188 5189 nxt_thread_mutex_lock(&app->mutex); 5190 5191 port = app->shared_port; 5192 nxt_port_inc_use(port); 5193 5194 app->active_requests++; 5195 5196 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 5197 app->pending_processes++; 5198 start_process = 1; 5199 } 5200 5201 r = req_rpc_data->request; 5202 5203 /* 5204 * Put request into application-wide list to be able to cancel request 5205 * if something goes wrong with application processes. 5206 */ 5207 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 5208 5209 nxt_thread_mutex_unlock(&app->mutex); 5210 5211 /* 5212 * Retain request memory pool while request is linked in ack_waiting_req 5213 * to guarantee request structure memory is accessble. 5214 */ 5215 nxt_mp_retain(r->mem_pool); 5216 5217 req_rpc_data->app_port = port; 5218 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 5219 5220 if (start_process) { 5221 nxt_router_start_app_process(task, app); 5222 } 5223 } 5224 5225 5226 void 5227 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, 5228 nxt_http_action_t *action) 5229 { 5230 nxt_event_engine_t *engine; 5231 nxt_http_app_conf_t *conf; 5232 nxt_request_rpc_data_t *req_rpc_data; 5233 5234 conf = action->u.conf; 5235 engine = task->thread->engine; 5236 5237 r->app_target = conf->target; 5238 5239 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, 5240 nxt_router_response_ready_handler, 5241 nxt_router_response_error_handler, 5242 sizeof(nxt_request_rpc_data_t)); 5243 if (nxt_slow_path(req_rpc_data == NULL)) { 5244 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 5245 return; 5246 } 5247 5248 /* 5249 * At this point we have request req_rpc_data allocated and registered 5250 * in port handlers. Need to fixup request memory pool. Counterpart 5251 * release will be called via following call chain: 5252 * nxt_request_rpc_data_unlink() -> 5253 * nxt_router_http_request_release_post() -> 5254 * nxt_router_http_request_release() 5255 */ 5256 nxt_mp_retain(r->mem_pool); 5257 5258 r->timer.task = &engine->task; 5259 r->timer.work_queue = &engine->fast_work_queue; 5260 r->timer.log = engine->task.log; 5261 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 5262 5263 r->engine = engine; 5264 r->err_work.handler = nxt_router_http_request_error; 5265 r->err_work.task = task; 5266 r->err_work.obj = r; 5267 5268 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 5269 req_rpc_data->app = conf->app; 5270 req_rpc_data->msg_info.body_fd = -1; 5271 req_rpc_data->rpc_cancel = 1; 5272 5273 nxt_router_app_use(task, conf->app, 1); 5274 5275 req_rpc_data->request = r; 5276 r->req_rpc_data = req_rpc_data; 5277 5278 if (r->last != NULL) { 5279 r->last->completion_handler = nxt_router_http_request_done; 5280 } 5281 5282 nxt_router_app_port_get(task, conf->app, req_rpc_data); 5283 nxt_router_app_prepare_request(task, req_rpc_data); 5284 } 5285 5286 5287 static void 5288 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 5289 { 5290 nxt_http_request_t *r; 5291 5292 r = obj; 5293 5294 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 5295 5296 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5297 5298 if (r->req_rpc_data != NULL) { 5299 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5300 } 5301 5302 nxt_mp_release(r->mem_pool); 5303 } 5304 5305 5306 static void 5307 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 5308 { 5309 nxt_http_request_t *r; 5310 5311 r = data; 5312 5313 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 5314 5315 if (r->req_rpc_data != NULL) { 5316 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 5317 } 5318 5319 nxt_http_request_close_handler(task, r, r->proto.any); 5320 } 5321 5322 5323 static void 5324 nxt_router_app_prepare_request(nxt_task_t *task, 5325 nxt_request_rpc_data_t *req_rpc_data) 5326 { 5327 nxt_app_t *app; 5328 nxt_buf_t *buf, *body; 5329 nxt_int_t res; 5330 nxt_port_t *port, *reply_port; 5331 5332 int notify; 5333 struct { 5334 nxt_port_msg_t pm; 5335 nxt_port_mmap_msg_t mm; 5336 } msg; 5337 5338 5339 app = req_rpc_data->app; 5340 5341 nxt_assert(app != NULL); 5342 5343 port = req_rpc_data->app_port; 5344 5345 nxt_assert(port != NULL); 5346 nxt_assert(port->queue != NULL); 5347 5348 reply_port = task->thread->engine->port; 5349 5350 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, 5351 nxt_app_msg_prefix[app->type]); 5352 if (nxt_slow_path(buf == NULL)) { 5353 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", 5354 req_rpc_data->stream, &app->name); 5355 5356 nxt_http_request_error(task, req_rpc_data->request, 5357 NXT_HTTP_INTERNAL_SERVER_ERROR); 5358 5359 return; 5360 } 5361 5362 nxt_debug(task, "about to send %O bytes buffer to app process port %d", 5363 nxt_buf_used_size(buf), 5364 port->socket.fd); 5365 5366 req_rpc_data->msg_info.buf = buf; 5367 5368 body = req_rpc_data->request->body; 5369 5370 if (body != NULL && nxt_buf_is_file(body)) { 5371 req_rpc_data->msg_info.body_fd = body->file->fd; 5372 5373 body->file->fd = -1; 5374 5375 } else { 5376 req_rpc_data->msg_info.body_fd = -1; 5377 } 5378 5379 msg.pm.stream = req_rpc_data->stream; 5380 msg.pm.pid = reply_port->pid; 5381 msg.pm.reply_port = reply_port->id; 5382 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; 5383 msg.pm.last = 0; 5384 msg.pm.mmap = 1; 5385 msg.pm.nf = 0; 5386 msg.pm.mf = 0; 5387 5388 nxt_port_mmap_handler_t *mmap_handler = buf->parent; 5389 nxt_port_mmap_header_t *hdr = mmap_handler->hdr; 5390 5391 msg.mm.mmap_id = hdr->id; 5392 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); 5393 msg.mm.size = nxt_buf_used_size(buf); 5394 5395 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), 5396 req_rpc_data->stream, ¬ify, 5397 &req_rpc_data->msg_info.tracking_cookie); 5398 if (nxt_fast_path(res == NXT_OK)) { 5399 if (notify != 0) { 5400 (void) nxt_port_socket_write(task, port, 5401 NXT_PORT_MSG_READ_QUEUE, 5402 -1, req_rpc_data->stream, 5403 reply_port->id, NULL); 5404 5405 } else { 5406 nxt_debug(task, "queue is not empty"); 5407 } 5408 5409 buf->is_port_mmap_sent = 1; 5410 buf->mem.pos = buf->mem.free; 5411 5412 } else { 5413 nxt_alert(task, "stream #%uD, app '%V': failed to send app message", 5414 req_rpc_data->stream, &app->name); 5415 5416 nxt_http_request_error(task, req_rpc_data->request, 5417 NXT_HTTP_INTERNAL_SERVER_ERROR); 5418 } 5419 } 5420 5421 5422 struct nxt_fields_iter_s { 5423 nxt_list_part_t *part; 5424 nxt_http_field_t *field; 5425 }; 5426 5427 typedef struct nxt_fields_iter_s nxt_fields_iter_t; 5428 5429 5430 static nxt_http_field_t * 5431 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) 5432 { 5433 if (part == NULL) { 5434 return NULL; 5435 } 5436 5437 while (part->nelts == 0) { 5438 part = part->next; 5439 if (part == NULL) { 5440 return NULL; 5441 } 5442 } 5443 5444 i->part = part; 5445 i->field = nxt_list_data(i->part); 5446 5447 return i->field; 5448 } 5449 5450 5451 static nxt_http_field_t * 5452 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) 5453 { 5454 return nxt_fields_part_first(nxt_list_part(fields), i); 5455 } 5456 5457 5458 static nxt_http_field_t * 5459 nxt_fields_next(nxt_fields_iter_t *i) 5460 { 5461 nxt_http_field_t *end = nxt_list_data(i->part); 5462 5463 end += i->part->nelts; 5464 i->field++; 5465 5466 if (i->field < end) { 5467 return i->field; 5468 } 5469 5470 return nxt_fields_part_first(i->part->next, i); 5471 } 5472 5473 5474 static nxt_buf_t * 5475 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, 5476 nxt_app_t *app, const nxt_str_t *prefix) 5477 { 5478 void *target_pos, *query_pos; 5479 u_char *pos, *end, *p, c; 5480 size_t fields_count, req_size, size, free_size; 5481 size_t copy_size; 5482 nxt_off_t content_length; 5483 nxt_buf_t *b, *buf, *out, **tail; 5484 nxt_http_field_t *field, *dup; 5485 nxt_unit_field_t *dst_field; 5486 nxt_fields_iter_t iter, dup_iter; 5487 nxt_unit_request_t *req; 5488 5489 req_size = sizeof(nxt_unit_request_t) 5490 + r->method->length + 1 5491 + r->version.length + 1 5492 + r->remote->length + 1 5493 + r->local->length + 1 5494 + r->server_name.length + 1 5495 + r->target.length + 1 5496 + (r->path->start != r->target.start ? r->path->length + 1 : 0); 5497 5498 content_length = r->content_length_n < 0 ? 0 : r->content_length_n; 5499 fields_count = 0; 5500 5501 nxt_list_each(field, r->fields) { 5502 fields_count++; 5503 5504 req_size += field->name_length + prefix->length + 1 5505 + field->value_length + 1; 5506 } nxt_list_loop; 5507 5508 req_size += fields_count * sizeof(nxt_unit_field_t); 5509 5510 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { 5511 nxt_alert(task, "headers to big to fit in shared memory (%d)", 5512 (int) req_size); 5513 5514 return NULL; 5515 } 5516 5517 out = nxt_port_mmap_get_buf(task, &app->outgoing, 5518 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); 5519 if (nxt_slow_path(out == NULL)) { 5520 return NULL; 5521 } 5522 5523 req = (nxt_unit_request_t *) out->mem.free; 5524 out->mem.free += req_size; 5525 5526 req->app_target = r->app_target; 5527 5528 req->content_length = content_length; 5529 5530 p = (u_char *) (req->fields + fields_count); 5531 5532 nxt_debug(task, "fields_count=%d", (int) fields_count); 5533 5534 req->method_length = r->method->length; 5535 nxt_unit_sptr_set(&req->method, p); 5536 p = nxt_cpymem(p, r->method->start, r->method->length); 5537 *p++ = '\0'; 5538 5539 req->version_length = r->version.length; 5540 nxt_unit_sptr_set(&req->version, p); 5541 p = nxt_cpymem(p, r->version.start, r->version.length); 5542 *p++ = '\0'; 5543 5544 req->remote_length = r->remote->address_length; 5545 nxt_unit_sptr_set(&req->remote, p); 5546 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote), 5547 r->remote->address_length); 5548 *p++ = '\0'; 5549 5550 req->local_length = r->local->address_length; 5551 nxt_unit_sptr_set(&req->local, p); 5552 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length); 5553 *p++ = '\0'; 5554 5555 req->tls = (r->tls != NULL); 5556 req->websocket_handshake = r->websocket_handshake; 5557 5558 req->server_name_length = r->server_name.length; 5559 nxt_unit_sptr_set(&req->server_name, p); 5560 p = nxt_cpymem(p, r->server_name.start, r->server_name.length); 5561 *p++ = '\0'; 5562 5563 target_pos = p; 5564 req->target_length = (uint32_t) r->target.length; 5565 nxt_unit_sptr_set(&req->target, p); 5566 p = nxt_cpymem(p, r->target.start, r->target.length); 5567 *p++ = '\0'; 5568 5569 req->path_length = (uint32_t) r->path->length; 5570 if (r->path->start == r->target.start) { 5571 nxt_unit_sptr_set(&req->path, target_pos); 5572 5573 } else { 5574 nxt_unit_sptr_set(&req->path, p); 5575 p = nxt_cpymem(p, r->path->start, r->path->length); 5576 *p++ = '\0'; 5577 } 5578 5579 req->query_length = (uint32_t) r->args->length; 5580 if (r->args->start != NULL) { 5581 query_pos = nxt_pointer_to(target_pos, 5582 r->args->start - r->target.start); 5583 5584 nxt_unit_sptr_set(&req->query, query_pos); 5585 5586 } else { 5587 req->query.offset = 0; 5588 } 5589 5590 req->content_length_field = NXT_UNIT_NONE_FIELD; 5591 req->content_type_field = NXT_UNIT_NONE_FIELD; 5592 req->cookie_field = NXT_UNIT_NONE_FIELD; 5593 req->authorization_field = NXT_UNIT_NONE_FIELD; 5594 5595 dst_field = req->fields; 5596 5597 for (field = nxt_fields_first(r->fields, &iter); 5598 field != NULL; 5599 field = nxt_fields_next(&iter)) 5600 { 5601 if (field->skip) { 5602 continue; 5603 } 5604 5605 dst_field->hash = field->hash; 5606 dst_field->skip = 0; 5607 dst_field->name_length = field->name_length + prefix->length; 5608 dst_field->value_length = field->value_length; 5609 5610 if (field == r->content_length) { 5611 req->content_length_field = dst_field - req->fields; 5612 5613 } else if (field == r->content_type) { 5614 req->content_type_field = dst_field - req->fields; 5615 5616 } else if (field == r->cookie) { 5617 req->cookie_field = dst_field - req->fields; 5618 5619 } else if (field == r->authorization) { 5620 req->authorization_field = dst_field - req->fields; 5621 } 5622 5623 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", 5624 (int) field->hash, (int) field->skip, 5625 (int) field->name_length, field->name, 5626 (int) field->value_length, field->value); 5627 5628 if (prefix->length != 0) { 5629 nxt_unit_sptr_set(&dst_field->name, p); 5630 p = nxt_cpymem(p, prefix->start, prefix->length); 5631 5632 end = field->name + field->name_length; 5633 for (pos = field->name; pos < end; pos++) { 5634 c = *pos; 5635 5636 if (c >= 'a' && c <= 'z') { 5637 *p++ = (c & ~0x20); 5638 continue; 5639 } 5640 5641 if (c == '-') { 5642 *p++ = '_'; 5643 continue; 5644 } 5645 5646 *p++ = c; 5647 } 5648 5649 } else { 5650 nxt_unit_sptr_set(&dst_field->name, p); 5651 p = nxt_cpymem(p, field->name, field->name_length); 5652 } 5653 5654 *p++ = '\0'; 5655 5656 nxt_unit_sptr_set(&dst_field->value, p); 5657 p = nxt_cpymem(p, field->value, field->value_length); 5658 5659 if (prefix->length != 0) { 5660 dup_iter = iter; 5661 5662 for (dup = nxt_fields_next(&dup_iter); 5663 dup != NULL; 5664 dup = nxt_fields_next(&dup_iter)) 5665 { 5666 if (dup->name_length != field->name_length 5667 || dup->skip 5668 || dup->hash != field->hash 5669 || nxt_memcasecmp(dup->name, field->name, dup->name_length)) 5670 { 5671 continue; 5672 } 5673 5674 p = nxt_cpymem(p, ", ", 2); 5675 p = nxt_cpymem(p, dup->value, dup->value_length); 5676 5677 dst_field->value_length += 2 + dup->value_length; 5678 5679 dup->skip = 1; 5680 } 5681 } 5682 5683 *p++ = '\0'; 5684 5685 dst_field++; 5686 } 5687 5688 req->fields_count = (uint32_t) (dst_field - req->fields); 5689 5690 nxt_unit_sptr_set(&req->preread_content, out->mem.free); 5691 5692 buf = out; 5693 tail = &buf->next; 5694 5695 for (b = r->body; b != NULL; b = b->next) { 5696 size = nxt_buf_mem_used_size(&b->mem); 5697 pos = b->mem.pos; 5698 5699 while (size > 0) { 5700 if (buf == NULL) { 5701 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 5702 5703 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5704 if (nxt_slow_path(buf == NULL)) { 5705 while (out != NULL) { 5706 buf = out->next; 5707 out->next = NULL; 5708 out->completion_handler(task, out, out->parent); 5709 out = buf; 5710 } 5711 return NULL; 5712 } 5713 5714 *tail = buf; 5715 tail = &buf->next; 5716 5717 } else { 5718 free_size = nxt_buf_mem_free_size(&buf->mem); 5719 if (free_size < size 5720 && nxt_port_mmap_increase_buf(task, buf, size, 1) 5721 == NXT_OK) 5722 { 5723 free_size = nxt_buf_mem_free_size(&buf->mem); 5724 } 5725 } 5726 5727 if (free_size > 0) { 5728 copy_size = nxt_min(free_size, size); 5729 5730 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); 5731 5732 size -= copy_size; 5733 pos += copy_size; 5734 5735 if (size == 0) { 5736 break; 5737 } 5738 } 5739 5740 buf = NULL; 5741 } 5742 } 5743 5744 return out; 5745 } 5746 5747 5748 static void 5749 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 5750 { 5751 nxt_timer_t *timer; 5752 nxt_http_request_t *r; 5753 nxt_request_rpc_data_t *req_rpc_data; 5754 5755 timer = obj; 5756 5757 nxt_debug(task, "router app timeout"); 5758 5759 r = nxt_timer_data(timer, nxt_http_request_t, timer); 5760 req_rpc_data = r->timer_data; 5761 5762 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 5763 5764 nxt_request_rpc_data_unlink(task, req_rpc_data); 5765 } 5766 5767 5768 static void 5769 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5770 { 5771 r->timer.handler = nxt_router_http_request_release; 5772 nxt_timer_add(task->thread->engine, &r->timer, 0); 5773 } 5774 5775 5776 static void 5777 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) 5778 { 5779 nxt_http_request_t *r; 5780 5781 nxt_debug(task, "http request pool release"); 5782 5783 r = nxt_timer_data(obj, nxt_http_request_t, timer); 5784 5785 nxt_mp_release(r->mem_pool); 5786 } 5787 5788 5789 static void 5790 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5791 { 5792 size_t mi; 5793 uint32_t i; 5794 nxt_bool_t ack; 5795 nxt_process_t *process; 5796 nxt_free_map_t *m; 5797 nxt_port_mmap_handler_t *mmap_handler; 5798 5799 nxt_debug(task, "oosm in %PI", msg->port_msg.pid); 5800 5801 process = nxt_runtime_process_find(task->thread->runtime, 5802 msg->port_msg.pid); 5803 if (nxt_slow_path(process == NULL)) { 5804 return; 5805 } 5806 5807 ack = 0; 5808 5809 /* 5810 * To mitigate possible racing condition (when OOSM message received 5811 * after some of the memory was already freed), need to try to find 5812 * first free segment in shared memory and send ACK if found. 5813 */ 5814 5815 nxt_thread_mutex_lock(&process->incoming.mutex); 5816 5817 for (i = 0; i < process->incoming.size; i++) { 5818 mmap_handler = process->incoming.elts[i].mmap_handler; 5819 5820 if (nxt_slow_path(mmap_handler == NULL)) { 5821 continue; 5822 } 5823 5824 m = mmap_handler->hdr->free_map; 5825 5826 for (mi = 0; mi < MAX_FREE_IDX; mi++) { 5827 if (m[mi] != 0) { 5828 ack = 1; 5829 5830 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", 5831 i, mi, m[mi]); 5832 5833 break; 5834 } 5835 } 5836 } 5837 5838 nxt_thread_mutex_unlock(&process->incoming.mutex); 5839 5840 if (ack) { 5841 nxt_process_broadcast_shm_ack(task, process); 5842 } 5843 } 5844 5845 5846 static void 5847 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5848 { 5849 nxt_fd_t fd; 5850 nxt_port_t *port; 5851 nxt_runtime_t *rt; 5852 nxt_port_mmaps_t *mmaps; 5853 nxt_port_msg_get_mmap_t *get_mmap_msg; 5854 nxt_port_mmap_handler_t *mmap_handler; 5855 5856 rt = task->thread->runtime; 5857 5858 port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5859 msg->port_msg.reply_port); 5860 if (nxt_slow_path(port == NULL)) { 5861 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", 5862 msg->port_msg.pid, msg->port_msg.reply_port); 5863 5864 return; 5865 } 5866 5867 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5868 < (int) sizeof(nxt_port_msg_get_mmap_t))) 5869 { 5870 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", 5871 (int) nxt_buf_used_size(msg->buf)); 5872 5873 return; 5874 } 5875 5876 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; 5877 5878 nxt_assert(port->type == NXT_PROCESS_APP); 5879 5880 if (nxt_slow_path(port->app == NULL)) { 5881 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", 5882 port->pid, port->id); 5883 5884 // FIXME 5885 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5886 -1, msg->port_msg.stream, 0, NULL); 5887 5888 return; 5889 } 5890 5891 mmaps = &port->app->outgoing; 5892 nxt_thread_mutex_lock(&mmaps->mutex); 5893 5894 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { 5895 nxt_thread_mutex_unlock(&mmaps->mutex); 5896 5897 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", 5898 (int) get_mmap_msg->id); 5899 5900 // FIXME 5901 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, 5902 -1, msg->port_msg.stream, 0, NULL); 5903 return; 5904 } 5905 5906 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; 5907 5908 fd = mmap_handler->fd; 5909 5910 nxt_thread_mutex_unlock(&mmaps->mutex); 5911 5912 nxt_debug(task, "get mmap %PI:%d found", 5913 msg->port_msg.pid, (int) get_mmap_msg->id); 5914 5915 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 5916 } 5917 5918 5919 static void 5920 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 5921 { 5922 nxt_port_t *port, *reply_port; 5923 nxt_runtime_t *rt; 5924 nxt_port_msg_get_port_t *get_port_msg; 5925 5926 rt = task->thread->runtime; 5927 5928 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, 5929 msg->port_msg.reply_port); 5930 if (nxt_slow_path(reply_port == NULL)) { 5931 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", 5932 msg->port_msg.pid, msg->port_msg.reply_port); 5933 5934 return; 5935 } 5936 5937 if (nxt_slow_path(nxt_buf_used_size(msg->buf) 5938 < (int) sizeof(nxt_port_msg_get_port_t))) 5939 { 5940 nxt_alert(task, "get_port_handler: message buffer too small (%d)", 5941 (int) nxt_buf_used_size(msg->buf)); 5942 5943 return; 5944 } 5945 5946 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; 5947 5948 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); 5949 if (nxt_slow_path(port == NULL)) { 5950 nxt_alert(task, "get_port_handler: port %PI:%d not found", 5951 get_port_msg->pid, get_port_msg->id); 5952 5953 return; 5954 } 5955 5956 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, 5957 get_port_msg->id); 5958 5959 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); 5960 } 5961