1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8 #include <nxt_router.h> 9 #include <nxt_conf.h> 10 #if (NXT_TLS) 11 #include <nxt_cert.h> 12 #endif 13 #include <nxt_http.h> 14 #include <nxt_port_memory_int.h> 15 #include <nxt_unit_request.h> 16 #include <nxt_unit_response.h> 17 #include <nxt_router_request.h> 18 #include <nxt_app_queue.h> 19 #include <nxt_port_queue.h> 20 21 #define NXT_SHARED_PORT_ID 0xFFFFu 22 23 typedef struct { 24 nxt_str_t type; 25 uint32_t processes; 26 uint32_t max_processes; 27 uint32_t spare_processes; 28 nxt_msec_t timeout; 29 nxt_msec_t idle_timeout; 30 nxt_conf_value_t *limits_value; 31 nxt_conf_value_t *processes_value; 32 nxt_conf_value_t *targets_value; 33 } nxt_router_app_conf_t; 34 35 36 typedef struct { 37 nxt_str_t pass; 38 nxt_str_t application; 39 } nxt_router_listener_conf_t; 40 41 42 #if (NXT_TLS) 43 44 typedef struct { 45 nxt_str_t name; 46 nxt_socket_conf_t *socket_conf; 47 nxt_router_temp_conf_t *temp_conf; 48 nxt_tls_init_t *tls_init; 49 nxt_bool_t last; 50 51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ 52 } nxt_router_tlssock_t; 53 54 #endif 55 56 57 typedef struct { 58 nxt_str_t *name; 59 nxt_socket_conf_t *socket_conf; 60 nxt_router_temp_conf_t *temp_conf; 61 nxt_bool_t last; 62 } nxt_socket_rpc_t; 63 64 65 typedef struct { 66 nxt_app_t *app; 67 nxt_router_temp_conf_t *temp_conf; 68 uint8_t proto; /* 1 bit */ 69 } nxt_app_rpc_t; 70 71 72 typedef struct { 73 nxt_app_joint_t *app_joint; 74 uint32_t generation; 75 uint8_t proto; /* 1 bit */ 76 } nxt_app_joint_rpc_t; 77 78 79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, 80 nxt_mp_t *mp); 81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); 82 static void nxt_router_greet_controller(nxt_task_t *task, 83 nxt_port_t *controller_port); 84 85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); 86 87 static void nxt_router_new_port_handler(nxt_task_t *task, 88 nxt_port_recv_msg_t *msg); 89 static void nxt_router_conf_data_handler(nxt_task_t *task, 90 nxt_port_recv_msg_t *msg); 91 static void nxt_router_app_restart_handler(nxt_task_t *task, 92 nxt_port_recv_msg_t *msg); 93 static void nxt_router_remove_pid_handler(nxt_task_t *task, 94 nxt_port_recv_msg_t *msg); 95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task, 96 nxt_port_recv_msg_t *msg); 97 98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 100 static void nxt_router_conf_ready(nxt_task_t *task, 101 nxt_router_temp_conf_t *tmcf); 102 static void nxt_router_conf_error(nxt_task_t *task, 103 nxt_router_temp_conf_t *tmcf); 104 static void nxt_router_conf_send(nxt_task_t *task, 105 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); 106 107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 108 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); 109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, 110 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); 111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, 112 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, 113 nxt_conf_value_t *conf); 114 115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); 116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, 118 nxt_app_t *app); 119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, 120 nxt_str_t *name); 121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, 122 int i); 123 124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, 125 nxt_port_t *port); 126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, 127 nxt_port_t *port); 128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, 129 nxt_port_t *port, nxt_fd_t fd); 130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, 131 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); 132 static void nxt_router_listen_socket_ready(nxt_task_t *task, 133 nxt_port_recv_msg_t *msg, void *data); 134 static void nxt_router_listen_socket_error(nxt_task_t *task, 135 nxt_port_recv_msg_t *msg, void *data); 136 #if (NXT_TLS) 137 static void nxt_router_tls_rpc_handler(nxt_task_t *task, 138 nxt_port_recv_msg_t *msg, void *data); 139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 140 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, 141 nxt_bool_t last); 142 #endif 143 static void nxt_router_app_rpc_create(nxt_task_t *task, 144 nxt_router_temp_conf_t *tmcf, nxt_app_t *app); 145 static void nxt_router_app_prefork_ready(nxt_task_t *task, 146 nxt_port_recv_msg_t *msg, void *data); 147 static void nxt_router_app_prefork_error(nxt_task_t *task, 148 nxt_port_recv_msg_t *msg, void *data); 149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, 150 nxt_router_temp_conf_t *tmcf, nxt_str_t *name); 151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 152 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa); 153 154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 155 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 156 const nxt_event_interface_t *interface); 157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 158 nxt_router_engine_conf_t *recf); 159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 160 nxt_router_engine_conf_t *recf); 161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 162 nxt_router_engine_conf_t *recf); 163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 164 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 165 nxt_work_handler_t handler); 166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 167 nxt_router_engine_conf_t *recf); 168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, 169 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); 170 171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 172 nxt_router_temp_conf_t *tmcf); 173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 174 nxt_event_engine_t *engine); 175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, 176 nxt_router_temp_conf_t *tmcf); 177 178 static void nxt_router_engines_post(nxt_router_t *router, 179 nxt_router_temp_conf_t *tmcf); 180 static void nxt_router_engine_post(nxt_event_engine_t *engine, 181 nxt_work_t *jobs); 182 183 static void nxt_router_thread_start(void *data); 184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, 185 void *data); 186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 187 void *data); 188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 189 void *data); 190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 191 void *data); 192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, 193 void *data); 194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, 195 void *data); 196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 197 void *data); 198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task, 199 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 200 static void nxt_router_listen_socket_release(nxt_task_t *task, 201 nxt_socket_conf_t *skcf); 202 203 static void nxt_router_access_log_writer(nxt_task_t *task, 204 nxt_http_request_t *r, nxt_router_access_log_t *access_log); 205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, 206 struct tm *tm, size_t size, const char *format); 207 static void nxt_router_access_log_open(nxt_task_t *task, 208 nxt_router_temp_conf_t *tmcf); 209 static void nxt_router_access_log_ready(nxt_task_t *task, 210 nxt_port_recv_msg_t *msg, void *data); 211 static void nxt_router_access_log_error(nxt_task_t *task, 212 nxt_port_recv_msg_t *msg, void *data); 213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock, 214 nxt_router_access_log_t *access_log); 215 static void nxt_router_access_log_release(nxt_task_t *task, 216 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); 217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, 218 void *data); 219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task, 220 nxt_port_recv_msg_t *msg, void *data); 221 static void nxt_router_access_log_reopen_error(nxt_task_t *task, 222 nxt_port_recv_msg_t *msg, void *data); 223 224 static void nxt_router_app_port_ready(nxt_task_t *task, 225 nxt_port_recv_msg_t *msg, void *data); 226 static void nxt_router_app_port_error(nxt_task_t *task, 227 nxt_port_recv_msg_t *msg, void *data); 228 229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); 230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 231 232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, 233 nxt_port_t *port, nxt_apr_action_t action); 234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 235 nxt_request_rpc_data_t *req_rpc_data); 236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 237 void *data); 238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 239 void *data); 240 241 static void nxt_router_app_prepare_request(nxt_task_t *task, 242 nxt_request_rpc_data_t *req_rpc_data); 243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 244 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 245 246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); 247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, 248 void *data); 249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, 250 void *data); 251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, 252 void *data); 253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); 254 255 static const nxt_http_request_state_t nxt_http_request_send_state; 256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 257 258 static void nxt_router_app_joint_use(nxt_task_t *task, 259 nxt_app_joint_t *app_joint, int i); 260 261 static void nxt_router_http_request_release_post(nxt_task_t *task, 262 nxt_http_request_t *r); 263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj, 264 void *data); 265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 266 static void nxt_router_get_port_handler(nxt_task_t *task, 267 nxt_port_recv_msg_t *msg); 268 static void nxt_router_get_mmap_handler(nxt_task_t *task, 269 nxt_port_recv_msg_t *msg); 270 271 extern const nxt_http_request_state_t nxt_http_websocket; 272 273 static nxt_router_t *nxt_router; 274 275 static const nxt_str_t http_prefix = nxt_string("HTTP_"); 276 static const nxt_str_t empty_prefix = nxt_string(""); 277 278 static const nxt_str_t *nxt_app_msg_prefix[] = { 279 &empty_prefix, 280 &empty_prefix, 281 &http_prefix, 282 &http_prefix, 283 &http_prefix, 284 &empty_prefix, 285 }; 286 287 288 static const nxt_port_handlers_t nxt_router_process_port_handlers = { 289 .quit = nxt_signal_quit_handler, 290 .new_port = nxt_router_new_port_handler, 291 .get_port = nxt_router_get_port_handler, 292 .change_file = nxt_port_change_log_file_handler, 293 .mmap = nxt_port_mmap_handler, 294 .get_mmap = nxt_router_get_mmap_handler, 295 .data = nxt_router_conf_data_handler, 296 .app_restart = nxt_router_app_restart_handler, 297 .remove_pid = nxt_router_remove_pid_handler, 298 .access_log = nxt_router_access_log_reopen_handler, 299 .rpc_ready = nxt_port_rpc_handler, 300 .rpc_error = nxt_port_rpc_handler, 301 .oosm = nxt_router_oosm_handler, 302 }; 303 304 305 const nxt_process_init_t nxt_router_process = { 306 .name = "router", 307 .type = NXT_PROCESS_ROUTER, 308 .prefork = nxt_router_prefork, 309 .restart = 1, 310 .setup = nxt_process_core_setup, 311 .start = nxt_router_start, 312 .port_handlers = &nxt_router_process_port_handlers, 313 .signals = nxt_process_signals, 314 }; 315 316 317 /* Queues of nxt_socket_conf_t */ 318 nxt_queue_t creating_sockets; 319 nxt_queue_t pending_sockets; 320 nxt_queue_t updating_sockets; 321 nxt_queue_t keeping_sockets; 322 nxt_queue_t deleting_sockets; 323 324 325 static nxt_int_t 326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) 327 { 328 nxt_runtime_stop_app_processes(task, task->thread->runtime); 329 330 return NXT_OK; 331 } 332 333 334 static nxt_int_t 335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) 336 { 337 nxt_int_t ret; 338 nxt_port_t *controller_port; 339 nxt_router_t *router; 340 nxt_runtime_t *rt; 341 342 rt = task->thread->runtime; 343 344 nxt_log(task, NXT_LOG_INFO, "router started"); 345 346 #if (NXT_TLS) 347 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); 348 if (nxt_slow_path(rt->tls == NULL)) { 349 return NXT_ERROR; 350 } 351 352 ret = rt->tls->library_init(task); 353 if (nxt_slow_path(ret != NXT_OK)) { 354 return ret; 355 } 356 #endif 357 358 ret = nxt_http_init(task); 359 if (nxt_slow_path(ret != NXT_OK)) { 360 return ret; 361 } 362 363 router = nxt_zalloc(sizeof(nxt_router_t)); 364 if (nxt_slow_path(router == NULL)) { 365 return NXT_ERROR; 366 } 367 368 nxt_queue_init(&router->engines); 369 nxt_queue_init(&router->sockets); 370 nxt_queue_init(&router->apps); 371 372 nxt_router = router; 373 374 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 375 if (controller_port != NULL) { 376 nxt_router_greet_controller(task, controller_port); 377 } 378 379 return NXT_OK; 380 } 381 382 383 static void 384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) 385 { 386 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, 387 -1, 0, 0, NULL); 388 } 389 390 391 static void 392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, 393 void *data) 394 { 395 size_t size; 396 uint32_t stream; 397 nxt_fd_t port_fd, queue_fd; 398 nxt_int_t ret; 399 nxt_app_t *app; 400 nxt_buf_t *b; 401 nxt_port_t *dport; 402 nxt_runtime_t *rt; 403 nxt_app_joint_rpc_t *app_joint_rpc; 404 405 app = data; 406 407 nxt_thread_mutex_lock(&app->mutex); 408 409 dport = app->proto_port; 410 411 nxt_thread_mutex_unlock(&app->mutex); 412 413 if (dport != NULL) { 414 nxt_debug(task, "app '%V' %p start process", &app->name, app); 415 416 b = NULL; 417 port_fd = -1; 418 queue_fd = -1; 419 420 } else { 421 if (app->proto_port_requests > 0) { 422 nxt_debug(task, "app '%V' %p wait for prototype process", 423 &app->name, app); 424 425 app->proto_port_requests++; 426 427 goto skip; 428 } 429 430 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); 431 432 rt = task->thread->runtime; 433 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 434 435 size = app->name.length + 1 + app->conf.length; 436 437 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); 438 if (nxt_slow_path(b == NULL)) { 439 goto failed; 440 } 441 442 nxt_buf_cpystr(b, &app->name); 443 *b->mem.free++ = '\0'; 444 nxt_buf_cpystr(b, &app->conf); 445 446 port_fd = app->shared_port->pair[0]; 447 queue_fd = app->shared_port->queue_fd; 448 } 449 450 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, 451 nxt_router_app_port_ready, 452 nxt_router_app_port_error, 453 sizeof(nxt_app_joint_rpc_t)); 454 if (nxt_slow_path(app_joint_rpc == NULL)) { 455 goto failed; 456 } 457 458 stream = nxt_port_rpc_ex_stream(app_joint_rpc); 459 460 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 461 port_fd, queue_fd, stream, port->id, b); 462 if (nxt_slow_path(ret != NXT_OK)) { 463 nxt_port_rpc_cancel(task, port, stream); 464 465 goto failed; 466 } 467 468 app_joint_rpc->app_joint = app->joint; 469 app_joint_rpc->generation = app->generation; 470 app_joint_rpc->proto = (b != NULL); 471 472 if (b != NULL) { 473 app->proto_port_requests++; 474 475 b = NULL; 476 } 477 478 nxt_router_app_joint_use(task, app->joint, 1); 479 480 failed: 481 482 if (b != NULL) { 483 nxt_mp_free(b->data, b); 484 } 485 486 skip: 487 488 nxt_router_app_use(task, app, -1); 489 } 490 491 492 static void 493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) 494 { 495 app_joint->use_count += i; 496 497 if (app_joint->use_count == 0) { 498 nxt_assert(app_joint->app == NULL); 499 500 nxt_free(app_joint); 501 } 502 } 503 504 505 static nxt_int_t 506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) 507 { 508 nxt_int_t res; 509 nxt_port_t *router_port; 510 nxt_runtime_t *rt; 511 512 nxt_debug(task, "app '%V' start process", &app->name); 513 514 rt = task->thread->runtime; 515 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 516 517 nxt_router_app_use(task, app, 1); 518 519 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, 520 app); 521 522 if (res == NXT_OK) { 523 return res; 524 } 525 526 nxt_thread_mutex_lock(&app->mutex); 527 528 app->pending_processes--; 529 530 nxt_thread_mutex_unlock(&app->mutex); 531 532 nxt_router_app_use(task, app, -1); 533 534 return NXT_ERROR; 535 } 536 537 538 nxt_inline nxt_bool_t 539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) 540 { 541 nxt_buf_t *b, *next; 542 nxt_bool_t cancelled; 543 nxt_port_t *app_port; 544 nxt_msg_info_t *msg_info; 545 546 msg_info = &req_rpc_data->msg_info; 547 548 if (msg_info->buf == NULL) { 549 return 0; 550 } 551 552 app_port = req_rpc_data->app_port; 553 554 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { 555 cancelled = nxt_app_queue_cancel(app_port->queue, 556 msg_info->tracking_cookie, 557 req_rpc_data->stream); 558 559 if (cancelled) { 560 nxt_debug(task, "stream #%uD: cancelled by router", 561 req_rpc_data->stream); 562 } 563 564 } else { 565 cancelled = 0; 566 } 567 568 for (b = msg_info->buf; b != NULL; b = next) { 569 next = b->next; 570 b->next = NULL; 571 572 if (b->is_port_mmap_sent) { 573 b->is_port_mmap_sent = cancelled == 0; 574 } 575 576 b->completion_handler(task, b, b->parent); 577 } 578 579 msg_info->buf = NULL; 580 581 return cancelled; 582 } 583 584 585 nxt_inline nxt_bool_t 586 nxt_queue_chk_remove(nxt_queue_link_t *lnk) 587 { 588 if (lnk->next != NULL) { 589 nxt_queue_remove(lnk); 590 591 lnk->next = NULL; 592 593 return 1; 594 } 595 596 return 0; 597 } 598 599 600 nxt_inline void 601 nxt_request_rpc_data_unlink(nxt_task_t *task, 602 nxt_request_rpc_data_t *req_rpc_data) 603 { 604 nxt_app_t *app; 605 nxt_bool_t unlinked; 606 nxt_http_request_t *r; 607 608 nxt_router_msg_cancel(task, req_rpc_data); 609 610 app = req_rpc_data->app; 611 612 if (req_rpc_data->app_port != NULL) { 613 nxt_router_app_port_release(task, app, req_rpc_data->app_port, 614 req_rpc_data->apr_action); 615 616 req_rpc_data->app_port = NULL; 617 } 618 619 r = req_rpc_data->request; 620 621 if (r != NULL) { 622 r->timer_data = NULL; 623 624 nxt_router_http_request_release_post(task, r); 625 626 r->req_rpc_data = NULL; 627 req_rpc_data->request = NULL; 628 629 if (app != NULL) { 630 unlinked = 0; 631 632 nxt_thread_mutex_lock(&app->mutex); 633 634 if (r->app_link.next != NULL) { 635 nxt_queue_remove(&r->app_link); 636 r->app_link.next = NULL; 637 638 unlinked = 1; 639 } 640 641 nxt_thread_mutex_unlock(&app->mutex); 642 643 if (unlinked) { 644 nxt_mp_release(r->mem_pool); 645 } 646 } 647 } 648 649 if (app != NULL) { 650 nxt_router_app_use(task, app, -1); 651 652 req_rpc_data->app = NULL; 653 } 654 655 if (req_rpc_data->msg_info.body_fd != -1) { 656 nxt_fd_close(req_rpc_data->msg_info.body_fd); 657 658 req_rpc_data->msg_info.body_fd = -1; 659 } 660 661 if (req_rpc_data->rpc_cancel) { 662 req_rpc_data->rpc_cancel = 0; 663 664 nxt_port_rpc_cancel(task, task->thread->engine->port, 665 req_rpc_data->stream); 666 } 667 } 668 669 670 static void 671 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 672 { 673 nxt_int_t res; 674 nxt_app_t *app; 675 nxt_port_t *port, *main_app_port; 676 nxt_runtime_t *rt; 677 678 nxt_port_new_port_handler(task, msg); 679 680 port = msg->u.new_port; 681 682 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 683 nxt_router_greet_controller(task, msg->u.new_port); 684 } 685 686 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { 687 nxt_port_rpc_handler(task, msg); 688 689 return; 690 } 691 692 if (port == NULL || port->type != NXT_PROCESS_APP) { 693 694 if (msg->port_msg.stream == 0) { 695 return; 696 } 697 698 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 699 700 } else { 701 if (msg->fd[1] != -1) { 702 res = nxt_router_port_queue_map(task, port, msg->fd[1]); 703 if (nxt_slow_path(res != NXT_OK)) { 704 return; 705 } 706 707 nxt_fd_close(msg->fd[1]); 708 msg->fd[1] = -1; 709 } 710 } 711 712 if (msg->port_msg.stream != 0) { 713 nxt_port_rpc_handler(task, msg); 714 return; 715 } 716 717 nxt_debug(task, "new port id %d (%d)", port->id, port->type); 718 719 /* 720 * Port with "id == 0" is application 'main' port and it always 721 * should come with non-zero stream. 722 */ 723 nxt_assert(port->id != 0); 724 725 /* Find 'main' app port and get app reference. */ 726 rt = task->thread->runtime; 727 728 /* 729 * It is safe to access 'runtime->ports' hash because 'NEW_PORT' 730 * sent to main port (with id == 0) and processed in main thread. 731 */ 732 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); 733 nxt_assert(main_app_port != NULL); 734 735 app = main_app_port->app; 736 737 if (nxt_fast_path(app != NULL)) { 738 nxt_thread_mutex_lock(&app->mutex); 739 740 /* TODO here should be find-and-add code because there can be 741 port waiters in port_hash */ 742 nxt_port_hash_add(&app->port_hash, port); 743 app->port_hash_count++; 744 745 nxt_thread_mutex_unlock(&app->mutex); 746 747 port->app = app; 748 } 749 750 port->main_app_port = main_app_port; 751 752 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 753 } 754 755 756 static void 757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 758 { 759 void *p; 760 size_t size; 761 nxt_int_t ret; 762 nxt_port_t *port; 763 nxt_router_temp_conf_t *tmcf; 764 765 port = nxt_runtime_port_find(task->thread->runtime, 766 msg->port_msg.pid, 767 msg->port_msg.reply_port); 768 if (nxt_slow_path(port == NULL)) { 769 nxt_alert(task, "conf_data_handler: reply port not found"); 770 return; 771 } 772 773 p = MAP_FAILED; 774 775 /* 776 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be 777 * initialized in 'cleanup' section. 778 */ 779 size = 0; 780 781 tmcf = nxt_router_temp_conf(task); 782 if (nxt_slow_path(tmcf == NULL)) { 783 goto fail; 784 } 785 786 if (nxt_slow_path(msg->fd[0] == -1)) { 787 nxt_alert(task, "conf_data_handler: invalid shm fd"); 788 goto fail; 789 } 790 791 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { 792 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", 793 (int) nxt_buf_mem_used_size(&msg->buf->mem)); 794 goto fail; 795 } 796 797 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); 798 799 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); 800 801 nxt_fd_close(msg->fd[0]); 802 msg->fd[0] = -1; 803 804 if (nxt_slow_path(p == MAP_FAILED)) { 805 goto fail; 806 } 807 808 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); 809 810 tmcf->router_conf->router = nxt_router; 811 tmcf->stream = msg->port_msg.stream; 812 tmcf->port = port; 813 814 nxt_port_use(task, tmcf->port, 1); 815 816 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); 817 818 if (nxt_fast_path(ret == NXT_OK)) { 819 nxt_router_conf_apply(task, tmcf, NULL); 820 821 } else { 822 nxt_router_conf_error(task, tmcf); 823 } 824 825 goto cleanup; 826 827 fail: 828 829 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, 830 msg->port_msg.stream, 0, NULL); 831 832 if (tmcf != NULL) { 833 nxt_mp_release(tmcf->mem_pool); 834 } 835 836 cleanup: 837 838 if (p != MAP_FAILED) { 839 nxt_mem_munmap(p, size); 840 } 841 842 if (msg->fd[0] != -1) { 843 nxt_fd_close(msg->fd[0]); 844 msg->fd[0] = -1; 845 } 846 } 847 848 849 static void 850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 851 { 852 nxt_app_t *app; 853 nxt_int_t ret; 854 nxt_str_t app_name; 855 nxt_port_t *reply_port, *shared_port, *old_shared_port; 856 nxt_port_t *proto_port; 857 nxt_port_msg_type_t reply; 858 859 reply_port = nxt_runtime_port_find(task->thread->runtime, 860 msg->port_msg.pid, 861 msg->port_msg.reply_port); 862 if (nxt_slow_path(reply_port == NULL)) { 863 nxt_alert(task, "app_restart_handler: reply port not found"); 864 return; 865 } 866 867 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); 868 app_name.start = msg->buf->mem.pos; 869 870 nxt_debug(task, "app_restart_handler: %V", &app_name); 871 872 app = nxt_router_app_find(&nxt_router->apps, &app_name); 873 874 if (nxt_fast_path(app != NULL)) { 875 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 876 NXT_PROCESS_APP); 877 if (nxt_slow_path(shared_port == NULL)) { 878 goto fail; 879 } 880 881 ret = nxt_port_socket_init(task, shared_port, 0); 882 if (nxt_slow_path(ret != NXT_OK)) { 883 nxt_port_use(task, shared_port, -1); 884 goto fail; 885 } 886 887 ret = nxt_router_app_queue_init(task, shared_port); 888 if (nxt_slow_path(ret != NXT_OK)) { 889 nxt_port_write_close(shared_port); 890 nxt_port_read_close(shared_port); 891 nxt_port_use(task, shared_port, -1); 892 goto fail; 893 } 894 895 nxt_port_write_enable(task, shared_port); 896 897 nxt_thread_mutex_lock(&app->mutex); 898 899 proto_port = app->proto_port; 900 901 if (proto_port != NULL) { 902 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, 903 proto_port->pid); 904 905 app->proto_port = NULL; 906 proto_port->app = NULL; 907 } 908 909 app->generation++; 910 911 shared_port->app = app; 912 913 old_shared_port = app->shared_port; 914 old_shared_port->app = NULL; 915 916 app->shared_port = shared_port; 917 918 nxt_thread_mutex_unlock(&app->mutex); 919 920 nxt_port_close(task, old_shared_port); 921 nxt_port_use(task, old_shared_port, -1); 922 923 if (proto_port != NULL) { 924 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, 925 -1, 0, 0, NULL); 926 927 nxt_port_close(task, proto_port); 928 929 nxt_port_use(task, proto_port, -1); 930 } 931 932 reply = NXT_PORT_MSG_RPC_READY_LAST; 933 934 } else { 935 936 fail: 937 938 reply = NXT_PORT_MSG_RPC_ERROR; 939 } 940 941 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, 942 0, NULL); 943 } 944 945 946 static void 947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, 948 void *data) 949 { 950 union { 951 nxt_pid_t removed_pid; 952 void *data; 953 } u; 954 955 u.data = data; 956 957 nxt_port_rpc_remove_peer(task, port, u.removed_pid); 958 } 959 960 961 static void 962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 963 { 964 nxt_event_engine_t *engine; 965 966 nxt_port_remove_pid_handler(task, msg); 967 968 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 969 { 970 if (nxt_fast_path(engine->port != NULL)) { 971 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, 972 msg->u.data); 973 } 974 } 975 nxt_queue_loop; 976 977 if (msg->port_msg.stream == 0) { 978 return; 979 } 980 981 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 982 983 nxt_port_rpc_handler(task, msg); 984 } 985 986 987 static nxt_router_temp_conf_t * 988 nxt_router_temp_conf(nxt_task_t *task) 989 { 990 nxt_mp_t *mp, *tmp; 991 nxt_router_conf_t *rtcf; 992 nxt_router_temp_conf_t *tmcf; 993 994 mp = nxt_mp_create(1024, 128, 256, 32); 995 if (nxt_slow_path(mp == NULL)) { 996 return NULL; 997 } 998 999 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t)); 1000 if (nxt_slow_path(rtcf == NULL)) { 1001 goto fail; 1002 } 1003 1004 rtcf->mem_pool = mp; 1005 1006 tmp = nxt_mp_create(1024, 128, 256, 32); 1007 if (nxt_slow_path(tmp == NULL)) { 1008 goto fail; 1009 } 1010 1011 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t)); 1012 if (nxt_slow_path(tmcf == NULL)) { 1013 goto temp_fail; 1014 } 1015 1016 tmcf->mem_pool = tmp; 1017 tmcf->router_conf = rtcf; 1018 tmcf->count = 1; 1019 tmcf->engine = task->thread->engine; 1020 1021 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, 1022 sizeof(nxt_router_engine_conf_t)); 1023 if (nxt_slow_path(tmcf->engines == NULL)) { 1024 goto temp_fail; 1025 } 1026 1027 nxt_queue_init(&creating_sockets); 1028 nxt_queue_init(&pending_sockets); 1029 nxt_queue_init(&updating_sockets); 1030 nxt_queue_init(&keeping_sockets); 1031 nxt_queue_init(&deleting_sockets); 1032 1033 #if (NXT_TLS) 1034 nxt_queue_init(&tmcf->tls); 1035 #endif 1036 1037 nxt_queue_init(&tmcf->apps); 1038 nxt_queue_init(&tmcf->previous); 1039 1040 return tmcf; 1041 1042 temp_fail: 1043 1044 nxt_mp_destroy(tmp); 1045 1046 fail: 1047 1048 nxt_mp_destroy(mp); 1049 1050 return NULL; 1051 } 1052 1053 1054 nxt_inline nxt_bool_t 1055 nxt_router_app_can_start(nxt_app_t *app) 1056 { 1057 return app->processes + app->pending_processes < app->max_processes 1058 && app->pending_processes < app->max_pending_processes; 1059 } 1060 1061 1062 nxt_inline nxt_bool_t 1063 nxt_router_app_need_start(nxt_app_t *app) 1064 { 1065 return (app->active_requests 1066 > app->port_hash_count + app->pending_processes) 1067 || (app->spare_processes 1068 > app->idle_processes + app->pending_processes); 1069 } 1070 1071 1072 static void 1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) 1074 { 1075 nxt_int_t ret; 1076 nxt_app_t *app; 1077 nxt_router_t *router; 1078 nxt_runtime_t *rt; 1079 nxt_queue_link_t *qlk; 1080 nxt_socket_conf_t *skcf; 1081 nxt_router_conf_t *rtcf; 1082 nxt_router_temp_conf_t *tmcf; 1083 const nxt_event_interface_t *interface; 1084 #if (NXT_TLS) 1085 nxt_router_tlssock_t *tls; 1086 #endif 1087 1088 tmcf = obj; 1089 1090 qlk = nxt_queue_first(&pending_sockets); 1091 1092 if (qlk != nxt_queue_tail(&pending_sockets)) { 1093 nxt_queue_remove(qlk); 1094 nxt_queue_insert_tail(&creating_sockets, qlk); 1095 1096 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1097 1098 nxt_router_listen_socket_rpc_create(task, tmcf, skcf); 1099 1100 return; 1101 } 1102 1103 #if (NXT_TLS) 1104 qlk = nxt_queue_last(&tmcf->tls); 1105 1106 if (qlk != nxt_queue_head(&tmcf->tls)) { 1107 nxt_queue_remove(qlk); 1108 1109 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); 1110 1111 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, 1112 nxt_router_tls_rpc_handler, tls); 1113 return; 1114 } 1115 #endif 1116 1117 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1118 1119 if (nxt_router_app_need_start(app)) { 1120 nxt_router_app_rpc_create(task, tmcf, app); 1121 return; 1122 } 1123 1124 } nxt_queue_loop; 1125 1126 rtcf = tmcf->router_conf; 1127 1128 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) { 1129 nxt_router_access_log_open(task, tmcf); 1130 return; 1131 } 1132 1133 rt = task->thread->runtime; 1134 1135 interface = nxt_service_get(rt->services, "engine", NULL); 1136 1137 router = rtcf->router; 1138 1139 ret = nxt_router_engines_create(task, router, tmcf, interface); 1140 if (nxt_slow_path(ret != NXT_OK)) { 1141 goto fail; 1142 } 1143 1144 ret = nxt_router_threads_create(task, rt, tmcf); 1145 if (nxt_slow_path(ret != NXT_OK)) { 1146 goto fail; 1147 } 1148 1149 nxt_router_apps_sort(task, router, tmcf); 1150 1151 nxt_router_apps_hash_use(task, rtcf, 1); 1152 1153 nxt_router_engines_post(router, tmcf); 1154 1155 nxt_queue_add(&router->sockets, &updating_sockets); 1156 nxt_queue_add(&router->sockets, &creating_sockets); 1157 1158 if (router->access_log != rtcf->access_log) { 1159 nxt_router_access_log_use(&router->lock, rtcf->access_log); 1160 1161 nxt_router_access_log_release(task, &router->lock, router->access_log); 1162 1163 router->access_log = rtcf->access_log; 1164 } 1165 1166 nxt_router_conf_ready(task, tmcf); 1167 1168 return; 1169 1170 fail: 1171 1172 nxt_router_conf_error(task, tmcf); 1173 1174 return; 1175 } 1176 1177 1178 static void 1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) 1180 { 1181 nxt_joint_job_t *job; 1182 1183 job = obj; 1184 1185 nxt_router_conf_ready(task, job->tmcf); 1186 } 1187 1188 1189 static void 1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1191 { 1192 uint32_t count; 1193 nxt_router_conf_t *rtcf; 1194 nxt_thread_spinlock_t *lock; 1195 1196 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); 1197 1198 if (--tmcf->count > 0) { 1199 return; 1200 } 1201 1202 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); 1203 1204 rtcf = tmcf->router_conf; 1205 1206 lock = &rtcf->router->lock; 1207 1208 nxt_thread_spin_lock(lock); 1209 1210 count = rtcf->count; 1211 1212 nxt_thread_spin_unlock(lock); 1213 1214 nxt_debug(task, "rtcf %p: %D", rtcf, count); 1215 1216 if (count == 0) { 1217 nxt_router_apps_hash_use(task, rtcf, -1); 1218 1219 nxt_router_access_log_release(task, lock, rtcf->access_log); 1220 1221 nxt_mp_destroy(rtcf->mem_pool); 1222 } 1223 1224 nxt_mp_release(tmcf->mem_pool); 1225 } 1226 1227 1228 static void 1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) 1230 { 1231 nxt_app_t *app; 1232 nxt_queue_t new_socket_confs; 1233 nxt_socket_t s; 1234 nxt_router_t *router; 1235 nxt_queue_link_t *qlk; 1236 nxt_socket_conf_t *skcf; 1237 nxt_router_conf_t *rtcf; 1238 1239 nxt_alert(task, "failed to apply new conf"); 1240 1241 for (qlk = nxt_queue_first(&creating_sockets); 1242 qlk != nxt_queue_tail(&creating_sockets); 1243 qlk = nxt_queue_next(qlk)) 1244 { 1245 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 1246 s = skcf->listen->socket; 1247 1248 if (s != -1) { 1249 nxt_socket_close(task, s); 1250 } 1251 1252 nxt_free(skcf->listen); 1253 } 1254 1255 nxt_queue_init(&new_socket_confs); 1256 nxt_queue_add(&new_socket_confs, &updating_sockets); 1257 nxt_queue_add(&new_socket_confs, &pending_sockets); 1258 nxt_queue_add(&new_socket_confs, &creating_sockets); 1259 1260 rtcf = tmcf->router_conf; 1261 1262 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 1263 1264 nxt_router_app_unlink(task, app); 1265 1266 } nxt_queue_loop; 1267 1268 router = rtcf->router; 1269 1270 nxt_queue_add(&router->sockets, &keeping_sockets); 1271 nxt_queue_add(&router->sockets, &deleting_sockets); 1272 1273 nxt_queue_add(&router->apps, &tmcf->previous); 1274 1275 // TODO: new engines and threads 1276 1277 nxt_router_access_log_release(task, &router->lock, rtcf->access_log); 1278 1279 nxt_mp_destroy(rtcf->mem_pool); 1280 1281 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); 1282 1283 nxt_mp_release(tmcf->mem_pool); 1284 } 1285 1286 1287 static void 1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1289 nxt_port_msg_type_t type) 1290 { 1291 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); 1292 1293 nxt_port_use(task, tmcf->port, -1); 1294 1295 tmcf->port = NULL; 1296 } 1297 1298 1299 static nxt_conf_map_t nxt_router_conf[] = { 1300 { 1301 nxt_string("listeners_threads"), 1302 NXT_CONF_MAP_INT32, 1303 offsetof(nxt_router_conf_t, threads), 1304 }, 1305 }; 1306 1307 1308 static nxt_conf_map_t nxt_router_app_conf[] = { 1309 { 1310 nxt_string("type"), 1311 NXT_CONF_MAP_STR, 1312 offsetof(nxt_router_app_conf_t, type), 1313 }, 1314 1315 { 1316 nxt_string("limits"), 1317 NXT_CONF_MAP_PTR, 1318 offsetof(nxt_router_app_conf_t, limits_value), 1319 }, 1320 1321 { 1322 nxt_string("processes"), 1323 NXT_CONF_MAP_INT32, 1324 offsetof(nxt_router_app_conf_t, processes), 1325 }, 1326 1327 { 1328 nxt_string("processes"), 1329 NXT_CONF_MAP_PTR, 1330 offsetof(nxt_router_app_conf_t, processes_value), 1331 }, 1332 1333 { 1334 nxt_string("targets"), 1335 NXT_CONF_MAP_PTR, 1336 offsetof(nxt_router_app_conf_t, targets_value), 1337 }, 1338 }; 1339 1340 1341 static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1342 { 1343 nxt_string("timeout"), 1344 NXT_CONF_MAP_MSEC, 1345 offsetof(nxt_router_app_conf_t, timeout), 1346 }, 1347 }; 1348 1349 1350 static nxt_conf_map_t nxt_router_app_processes_conf[] = { 1351 { 1352 nxt_string("spare"), 1353 NXT_CONF_MAP_INT32, 1354 offsetof(nxt_router_app_conf_t, spare_processes), 1355 }, 1356 1357 { 1358 nxt_string("max"), 1359 NXT_CONF_MAP_INT32, 1360 offsetof(nxt_router_app_conf_t, max_processes), 1361 }, 1362 1363 { 1364 nxt_string("idle_timeout"), 1365 NXT_CONF_MAP_MSEC, 1366 offsetof(nxt_router_app_conf_t, idle_timeout), 1367 }, 1368 }; 1369 1370 1371 static nxt_conf_map_t nxt_router_listener_conf[] = { 1372 { 1373 nxt_string("pass"), 1374 NXT_CONF_MAP_STR_COPY, 1375 offsetof(nxt_router_listener_conf_t, pass), 1376 }, 1377 1378 { 1379 nxt_string("application"), 1380 NXT_CONF_MAP_STR_COPY, 1381 offsetof(nxt_router_listener_conf_t, application), 1382 }, 1383 }; 1384 1385 1386 static nxt_conf_map_t nxt_router_http_conf[] = { 1387 { 1388 nxt_string("header_buffer_size"), 1389 NXT_CONF_MAP_SIZE, 1390 offsetof(nxt_socket_conf_t, header_buffer_size), 1391 }, 1392 1393 { 1394 nxt_string("large_header_buffer_size"), 1395 NXT_CONF_MAP_SIZE, 1396 offsetof(nxt_socket_conf_t, large_header_buffer_size), 1397 }, 1398 1399 { 1400 nxt_string("large_header_buffers"), 1401 NXT_CONF_MAP_SIZE, 1402 offsetof(nxt_socket_conf_t, large_header_buffers), 1403 }, 1404 1405 { 1406 nxt_string("body_buffer_size"), 1407 NXT_CONF_MAP_SIZE, 1408 offsetof(nxt_socket_conf_t, body_buffer_size), 1409 }, 1410 1411 { 1412 nxt_string("max_body_size"), 1413 NXT_CONF_MAP_SIZE, 1414 offsetof(nxt_socket_conf_t, max_body_size), 1415 }, 1416 1417 { 1418 nxt_string("idle_timeout"), 1419 NXT_CONF_MAP_MSEC, 1420 offsetof(nxt_socket_conf_t, idle_timeout), 1421 }, 1422 1423 { 1424 nxt_string("header_read_timeout"), 1425 NXT_CONF_MAP_MSEC, 1426 offsetof(nxt_socket_conf_t, header_read_timeout), 1427 }, 1428 1429 { 1430 nxt_string("body_read_timeout"), 1431 NXT_CONF_MAP_MSEC, 1432 offsetof(nxt_socket_conf_t, body_read_timeout), 1433 }, 1434 1435 { 1436 nxt_string("send_timeout"), 1437 NXT_CONF_MAP_MSEC, 1438 offsetof(nxt_socket_conf_t, send_timeout), 1439 }, 1440 1441 { 1442 nxt_string("body_temp_path"), 1443 NXT_CONF_MAP_STR, 1444 offsetof(nxt_socket_conf_t, body_temp_path), 1445 }, 1446 1447 { 1448 nxt_string("discard_unsafe_fields"), 1449 NXT_CONF_MAP_INT8, 1450 offsetof(nxt_socket_conf_t, discard_unsafe_fields), 1451 }, 1452 }; 1453 1454 1455 static nxt_conf_map_t nxt_router_websocket_conf[] = { 1456 { 1457 nxt_string("max_frame_size"), 1458 NXT_CONF_MAP_SIZE, 1459 offsetof(nxt_websocket_conf_t, max_frame_size), 1460 }, 1461 1462 { 1463 nxt_string("read_timeout"), 1464 NXT_CONF_MAP_MSEC, 1465 offsetof(nxt_websocket_conf_t, read_timeout), 1466 }, 1467 1468 { 1469 nxt_string("keepalive_interval"), 1470 NXT_CONF_MAP_MSEC, 1471 offsetof(nxt_websocket_conf_t, keepalive_interval), 1472 }, 1473 1474 }; 1475 1476 1477 static nxt_int_t 1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 1479 u_char *start, u_char *end) 1480 { 1481 u_char *p; 1482 size_t size; 1483 nxt_mp_t *mp, *app_mp; 1484 uint32_t next, next_target; 1485 nxt_int_t ret; 1486 nxt_str_t name, path, target; 1487 nxt_app_t *app, *prev; 1488 nxt_str_t *t, *s, *targets; 1489 nxt_uint_t n, i; 1490 nxt_port_t *port; 1491 nxt_router_t *router; 1492 nxt_app_joint_t *app_joint; 1493 #if (NXT_TLS) 1494 nxt_tls_init_t *tls_init; 1495 nxt_conf_value_t *certificate; 1496 #endif 1497 nxt_conf_value_t *conf, *http, *value, *websocket; 1498 nxt_conf_value_t *applications, *application; 1499 nxt_conf_value_t *listeners, *listener; 1500 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf; 1501 nxt_socket_conf_t *skcf; 1502 nxt_http_routes_t *routes; 1503 nxt_event_engine_t *engine; 1504 nxt_app_lang_module_t *lang; 1505 nxt_router_app_conf_t apcf; 1506 nxt_router_access_log_t *access_log; 1507 nxt_router_listener_conf_t lscf; 1508 1509 static nxt_str_t http_path = nxt_string("/settings/http"); 1510 static nxt_str_t applications_path = nxt_string("/applications"); 1511 static nxt_str_t listeners_path = nxt_string("/listeners"); 1512 static nxt_str_t routes_path = nxt_string("/routes"); 1513 static nxt_str_t access_log_path = nxt_string("/access_log"); 1514 #if (NXT_TLS) 1515 static nxt_str_t certificate_path = nxt_string("/tls/certificate"); 1516 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); 1517 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); 1518 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); 1519 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); 1520 #endif 1521 static nxt_str_t static_path = nxt_string("/settings/http/static"); 1522 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); 1523 static nxt_str_t client_ip_path = nxt_string("/client_ip"); 1524 1525 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); 1526 if (conf == NULL) { 1527 nxt_alert(task, "configuration parsing error"); 1528 return NXT_ERROR; 1529 } 1530 1531 mp = tmcf->router_conf->mem_pool; 1532 1533 ret = nxt_conf_map_object(mp, conf, nxt_router_conf, 1534 nxt_nitems(nxt_router_conf), tmcf->router_conf); 1535 if (ret != NXT_OK) { 1536 nxt_alert(task, "root map error"); 1537 return NXT_ERROR; 1538 } 1539 1540 if (tmcf->router_conf->threads == 0) { 1541 tmcf->router_conf->threads = nxt_ncpu; 1542 } 1543 1544 static_conf = nxt_conf_get_path(conf, &static_path); 1545 1546 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf); 1547 if (nxt_slow_path(ret != NXT_OK)) { 1548 return NXT_ERROR; 1549 } 1550 1551 router = tmcf->router_conf->router; 1552 1553 applications = nxt_conf_get_path(conf, &applications_path); 1554 1555 if (applications != NULL) { 1556 next = 0; 1557 1558 for ( ;; ) { 1559 application = nxt_conf_next_object_member(applications, 1560 &name, &next); 1561 if (application == NULL) { 1562 break; 1563 } 1564 1565 nxt_debug(task, "application \"%V\"", &name); 1566 1567 size = nxt_conf_json_length(application, NULL); 1568 1569 app_mp = nxt_mp_create(4096, 128, 1024, 64); 1570 if (nxt_slow_path(app_mp == NULL)) { 1571 goto fail; 1572 } 1573 1574 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); 1575 if (app == NULL) { 1576 goto app_fail; 1577 } 1578 1579 nxt_memzero(app, sizeof(nxt_app_t)); 1580 1581 app->mem_pool = app_mp; 1582 1583 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); 1584 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) 1585 + name.length); 1586 1587 p = nxt_conf_json_print(app->conf.start, application, NULL); 1588 app->conf.length = p - app->conf.start; 1589 1590 nxt_assert(app->conf.length <= size); 1591 1592 nxt_debug(task, "application conf \"%V\"", &app->conf); 1593 1594 prev = nxt_router_app_find(&router->apps, &name); 1595 1596 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { 1597 nxt_mp_destroy(app_mp); 1598 1599 nxt_queue_remove(&prev->link); 1600 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1601 1602 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); 1603 if (nxt_slow_path(ret != NXT_OK)) { 1604 goto fail; 1605 } 1606 1607 continue; 1608 } 1609 1610 apcf.processes = 1; 1611 apcf.max_processes = 1; 1612 apcf.spare_processes = 0; 1613 apcf.timeout = 0; 1614 apcf.idle_timeout = 15000; 1615 apcf.limits_value = NULL; 1616 apcf.processes_value = NULL; 1617 apcf.targets_value = NULL; 1618 1619 app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); 1620 if (nxt_slow_path(app_joint == NULL)) { 1621 goto app_fail; 1622 } 1623 1624 nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); 1625 1626 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1627 nxt_nitems(nxt_router_app_conf), &apcf); 1628 if (ret != NXT_OK) { 1629 nxt_alert(task, "application map error"); 1630 goto app_fail; 1631 } 1632 1633 if (apcf.limits_value != NULL) { 1634 1635 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 1636 nxt_alert(task, "application limits is not object"); 1637 goto app_fail; 1638 } 1639 1640 ret = nxt_conf_map_object(mp, apcf.limits_value, 1641 nxt_router_app_limits_conf, 1642 nxt_nitems(nxt_router_app_limits_conf), 1643 &apcf); 1644 if (ret != NXT_OK) { 1645 nxt_alert(task, "application limits map error"); 1646 goto app_fail; 1647 } 1648 } 1649 1650 if (apcf.processes_value != NULL 1651 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) 1652 { 1653 ret = nxt_conf_map_object(mp, apcf.processes_value, 1654 nxt_router_app_processes_conf, 1655 nxt_nitems(nxt_router_app_processes_conf), 1656 &apcf); 1657 if (ret != NXT_OK) { 1658 nxt_alert(task, "application processes map error"); 1659 goto app_fail; 1660 } 1661 1662 } else { 1663 apcf.max_processes = apcf.processes; 1664 apcf.spare_processes = apcf.processes; 1665 } 1666 1667 if (apcf.targets_value != NULL) { 1668 n = nxt_conf_object_members_count(apcf.targets_value); 1669 1670 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); 1671 if (nxt_slow_path(targets == NULL)) { 1672 goto app_fail; 1673 } 1674 1675 next_target = 0; 1676 1677 for (i = 0; i < n; i++) { 1678 (void) nxt_conf_next_object_member(apcf.targets_value, 1679 &target, &next_target); 1680 1681 s = nxt_str_dup(app_mp, &targets[i], &target); 1682 if (nxt_slow_path(s == NULL)) { 1683 goto app_fail; 1684 } 1685 } 1686 1687 } else { 1688 targets = NULL; 1689 } 1690 1691 nxt_debug(task, "application type: %V", &apcf.type); 1692 nxt_debug(task, "application processes: %D", apcf.processes); 1693 nxt_debug(task, "application request timeout: %M", apcf.timeout); 1694 1695 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1696 1697 if (lang == NULL) { 1698 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); 1699 goto app_fail; 1700 } 1701 1702 nxt_debug(task, "application language module: \"%s\"", lang->file); 1703 1704 ret = nxt_thread_mutex_create(&app->mutex); 1705 if (ret != NXT_OK) { 1706 goto app_fail; 1707 } 1708 1709 nxt_queue_init(&app->ports); 1710 nxt_queue_init(&app->spare_ports); 1711 nxt_queue_init(&app->idle_ports); 1712 nxt_queue_init(&app->ack_waiting_req); 1713 1714 app->name.length = name.length; 1715 nxt_memcpy(app->name.start, name.start, name.length); 1716 1717 app->type = lang->type; 1718 app->max_processes = apcf.max_processes; 1719 app->spare_processes = apcf.spare_processes; 1720 app->max_pending_processes = apcf.spare_processes 1721 ? apcf.spare_processes : 1; 1722 app->timeout = apcf.timeout; 1723 app->idle_timeout = apcf.idle_timeout; 1724 1725 app->targets = targets; 1726 1727 engine = task->thread->engine; 1728 1729 app->engine = engine; 1730 1731 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; 1732 app->adjust_idle_work.task = &engine->task; 1733 app->adjust_idle_work.obj = app; 1734 1735 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1736 1737 ret = nxt_router_apps_hash_add(tmcf->router_conf, app); 1738 if (nxt_slow_path(ret != NXT_OK)) { 1739 goto app_fail; 1740 } 1741 1742 nxt_router_app_use(task, app, 1); 1743 1744 app->joint = app_joint; 1745 1746 app_joint->use_count = 1; 1747 app_joint->app = app; 1748 1749 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; 1750 app_joint->idle_timer.work_queue = &engine->fast_work_queue; 1751 app_joint->idle_timer.handler = nxt_router_app_idle_timeout; 1752 app_joint->idle_timer.task = &engine->task; 1753 app_joint->idle_timer.log = app_joint->idle_timer.task->log; 1754 1755 app_joint->free_app_work.handler = nxt_router_free_app; 1756 app_joint->free_app_work.task = &engine->task; 1757 app_joint->free_app_work.obj = app_joint; 1758 1759 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, 1760 NXT_PROCESS_APP); 1761 if (nxt_slow_path(port == NULL)) { 1762 return NXT_ERROR; 1763 } 1764 1765 ret = nxt_port_socket_init(task, port, 0); 1766 if (nxt_slow_path(ret != NXT_OK)) { 1767 nxt_port_use(task, port, -1); 1768 return NXT_ERROR; 1769 } 1770 1771 ret = nxt_router_app_queue_init(task, port); 1772 if (nxt_slow_path(ret != NXT_OK)) { 1773 nxt_port_write_close(port); 1774 nxt_port_read_close(port); 1775 nxt_port_use(task, port, -1); 1776 return NXT_ERROR; 1777 } 1778 1779 nxt_port_write_enable(task, port); 1780 port->app = app; 1781 1782 app->shared_port = port; 1783 1784 nxt_thread_mutex_create(&app->outgoing.mutex); 1785 } 1786 } 1787 1788 routes_conf = nxt_conf_get_path(conf, &routes_path); 1789 if (nxt_fast_path(routes_conf != NULL)) { 1790 routes = nxt_http_routes_create(task, tmcf, routes_conf); 1791 if (nxt_slow_path(routes == NULL)) { 1792 return NXT_ERROR; 1793 } 1794 tmcf->router_conf->routes = routes; 1795 } 1796 1797 ret = nxt_upstreams_create(task, tmcf, conf); 1798 if (nxt_slow_path(ret != NXT_OK)) { 1799 return ret; 1800 } 1801 1802 http = nxt_conf_get_path(conf, &http_path); 1803 #if 0 1804 if (http == NULL) { 1805 nxt_alert(task, "no \"http\" block"); 1806 return NXT_ERROR; 1807 } 1808 #endif 1809 1810 websocket = nxt_conf_get_path(conf, &websocket_path); 1811 1812 listeners = nxt_conf_get_path(conf, &listeners_path); 1813 1814 if (listeners != NULL) { 1815 next = 0; 1816 1817 for ( ;; ) { 1818 listener = nxt_conf_next_object_member(listeners, &name, &next); 1819 if (listener == NULL) { 1820 break; 1821 } 1822 1823 skcf = nxt_router_socket_conf(task, tmcf, &name); 1824 if (skcf == NULL) { 1825 goto fail; 1826 } 1827 1828 nxt_memzero(&lscf, sizeof(lscf)); 1829 1830 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, 1831 nxt_nitems(nxt_router_listener_conf), 1832 &lscf); 1833 if (ret != NXT_OK) { 1834 nxt_alert(task, "listener map error"); 1835 goto fail; 1836 } 1837 1838 nxt_debug(task, "application: %V", &lscf.application); 1839 1840 // STUB, default values if http block is not defined. 1841 skcf->header_buffer_size = 2048; 1842 skcf->large_header_buffer_size = 8192; 1843 skcf->large_header_buffers = 4; 1844 skcf->discard_unsafe_fields = 1; 1845 skcf->body_buffer_size = 16 * 1024; 1846 skcf->max_body_size = 8 * 1024 * 1024; 1847 skcf->proxy_header_buffer_size = 64 * 1024; 1848 skcf->proxy_buffer_size = 4096; 1849 skcf->proxy_buffers = 256; 1850 skcf->idle_timeout = 180 * 1000; 1851 skcf->header_read_timeout = 30 * 1000; 1852 skcf->body_read_timeout = 30 * 1000; 1853 skcf->send_timeout = 30 * 1000; 1854 skcf->proxy_timeout = 60 * 1000; 1855 skcf->proxy_send_timeout = 30 * 1000; 1856 skcf->proxy_read_timeout = 30 * 1000; 1857 1858 skcf->websocket_conf.max_frame_size = 1024 * 1024; 1859 skcf->websocket_conf.read_timeout = 60 * 1000; 1860 skcf->websocket_conf.keepalive_interval = 30 * 1000; 1861 1862 nxt_str_null(&skcf->body_temp_path); 1863 1864 if (http != NULL) { 1865 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, 1866 nxt_nitems(nxt_router_http_conf), 1867 skcf); 1868 if (ret != NXT_OK) { 1869 nxt_alert(task, "http map error"); 1870 goto fail; 1871 } 1872 } 1873 1874 if (websocket != NULL) { 1875 ret = nxt_conf_map_object(mp, websocket, 1876 nxt_router_websocket_conf, 1877 nxt_nitems(nxt_router_websocket_conf), 1878 &skcf->websocket_conf); 1879 if (ret != NXT_OK) { 1880 nxt_alert(task, "websocket map error"); 1881 goto fail; 1882 } 1883 } 1884 1885 t = &skcf->body_temp_path; 1886 1887 if (t->length == 0) { 1888 t->start = (u_char *) task->thread->runtime->tmp; 1889 t->length = nxt_strlen(t->start); 1890 } 1891 1892 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path); 1893 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, 1894 client_ip_conf); 1895 if (nxt_slow_path(ret != NXT_OK)) { 1896 return NXT_ERROR; 1897 } 1898 1899 #if (NXT_TLS) 1900 certificate = nxt_conf_get_path(listener, &certificate_path); 1901 1902 if (certificate != NULL) { 1903 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); 1904 if (nxt_slow_path(tls_init == NULL)) { 1905 return NXT_ERROR; 1906 } 1907 1908 tls_init->cache_size = 0; 1909 tls_init->timeout = 300; 1910 1911 value = nxt_conf_get_path(listener, &conf_cache_path); 1912 if (value != NULL) { 1913 tls_init->cache_size = nxt_conf_get_number(value); 1914 } 1915 1916 value = nxt_conf_get_path(listener, &conf_timeout_path); 1917 if (value != NULL) { 1918 tls_init->timeout = nxt_conf_get_number(value); 1919 } 1920 1921 tls_init->conf_cmds = nxt_conf_get_path(listener, 1922 &conf_commands_path); 1923 1924 tls_init->tickets_conf = nxt_conf_get_path(listener, 1925 &conf_tickets); 1926 1927 n = nxt_conf_array_elements_count_or_1(certificate); 1928 1929 for (i = 0; i < n; i++) { 1930 value = nxt_conf_get_array_element_or_itself(certificate, 1931 i); 1932 nxt_assert(value != NULL); 1933 1934 ret = nxt_router_conf_tls_insert(tmcf, value, skcf, 1935 tls_init, i == 0); 1936 if (nxt_slow_path(ret != NXT_OK)) { 1937 goto fail; 1938 } 1939 } 1940 } 1941 #endif 1942 1943 skcf->listen->handler = nxt_http_conn_init; 1944 skcf->router_conf = tmcf->router_conf; 1945 skcf->router_conf->count++; 1946 1947 if (lscf.pass.length != 0) { 1948 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); 1949 1950 /* COMPATIBILITY: listener application. */ 1951 } else if (lscf.application.length > 0) { 1952 skcf->action = nxt_http_pass_application(task, 1953 tmcf->router_conf, 1954 &lscf.application); 1955 } 1956 1957 if (nxt_slow_path(skcf->action == NULL)) { 1958 goto fail; 1959 } 1960 } 1961 } 1962 1963 ret = nxt_http_routes_resolve(task, tmcf); 1964 if (nxt_slow_path(ret != NXT_OK)) { 1965 goto fail; 1966 } 1967 1968 value = nxt_conf_get_path(conf, &access_log_path); 1969 1970 if (value != NULL) { 1971 nxt_conf_get_string(value, &path); 1972 1973 access_log = router->access_log; 1974 1975 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { 1976 nxt_router_access_log_use(&router->lock, access_log); 1977 1978 } else { 1979 access_log = nxt_malloc(sizeof(nxt_router_access_log_t) 1980 + path.length); 1981 if (access_log == NULL) { 1982 nxt_alert(task, "failed to allocate access log structure"); 1983 goto fail; 1984 } 1985 1986 access_log->fd = -1; 1987 access_log->handler = &nxt_router_access_log_writer; 1988 access_log->count = 1; 1989 1990 access_log->path.length = path.length; 1991 access_log->path.start = (u_char *) access_log 1992 + sizeof(nxt_router_access_log_t); 1993 1994 nxt_memcpy(access_log->path.start, path.start, path.length); 1995 } 1996 1997 tmcf->router_conf->access_log = access_log; 1998 } 1999 2000 nxt_queue_add(&deleting_sockets, &router->sockets); 2001 nxt_queue_init(&router->sockets); 2002 2003 return NXT_OK; 2004 2005 app_fail: 2006 2007 nxt_mp_destroy(app_mp); 2008 2009 fail: 2010 2011 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { 2012 2013 nxt_queue_remove(&app->link); 2014 nxt_thread_mutex_destroy(&app->mutex); 2015 nxt_mp_destroy(app->mem_pool); 2016 2017 } nxt_queue_loop; 2018 2019 return NXT_ERROR; 2020 } 2021 2022 2023 #if (NXT_TLS) 2024 2025 static nxt_int_t 2026 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, 2027 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, 2028 nxt_tls_init_t *tls_init, nxt_bool_t last) 2029 { 2030 nxt_router_tlssock_t *tls; 2031 2032 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t)); 2033 if (nxt_slow_path(tls == NULL)) { 2034 return NXT_ERROR; 2035 } 2036 2037 tls->tls_init = tls_init; 2038 tls->socket_conf = skcf; 2039 tls->temp_conf = tmcf; 2040 tls->last = last; 2041 nxt_conf_get_string(value, &tls->name); 2042 2043 nxt_queue_insert_tail(&tmcf->tls, &tls->link); 2044 2045 return NXT_OK; 2046 } 2047 2048 #endif 2049 2050 2051 static nxt_int_t 2052 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, 2053 nxt_conf_value_t *conf) 2054 { 2055 uint32_t next, i; 2056 nxt_mp_t *mp; 2057 nxt_str_t *type, exten, str; 2058 nxt_int_t ret; 2059 nxt_uint_t exts; 2060 nxt_conf_value_t *mtypes_conf, *ext_conf, *value; 2061 2062 static nxt_str_t mtypes_path = nxt_string("/mime_types"); 2063 2064 mp = rtcf->mem_pool; 2065 2066 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash); 2067 if (nxt_slow_path(ret != NXT_OK)) { 2068 return NXT_ERROR; 2069 } 2070 2071 if (conf == NULL) { 2072 return NXT_OK; 2073 } 2074 2075 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path); 2076 2077 if (mtypes_conf != NULL) { 2078 next = 0; 2079 2080 for ( ;; ) { 2081 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next); 2082 2083 if (ext_conf == NULL) { 2084 break; 2085 } 2086 2087 type = nxt_str_dup(mp, NULL, &str); 2088 if (nxt_slow_path(type == NULL)) { 2089 return NXT_ERROR; 2090 } 2091 2092 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { 2093 nxt_conf_get_string(ext_conf, &str); 2094 2095 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2096 return NXT_ERROR; 2097 } 2098 2099 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2100 &exten, type); 2101 if (nxt_slow_path(ret != NXT_OK)) { 2102 return NXT_ERROR; 2103 } 2104 2105 continue; 2106 } 2107 2108 exts = nxt_conf_array_elements_count(ext_conf); 2109 2110 for (i = 0; i < exts; i++) { 2111 value = nxt_conf_get_array_element(ext_conf, i); 2112 2113 nxt_conf_get_string(value, &str); 2114 2115 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { 2116 return NXT_ERROR; 2117 } 2118 2119 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, 2120 &exten, type); 2121 if (nxt_slow_path(ret != NXT_OK)) { 2122 return NXT_ERROR; 2123 } 2124 } 2125 } 2126 } 2127 2128 return NXT_OK; 2129 } 2130 2131 2132 static nxt_int_t 2133 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2134 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) 2135 { 2136 char c; 2137 size_t i; 2138 nxt_mp_t *mp; 2139 uint32_t hash; 2140 nxt_str_t header; 2141 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; 2142 nxt_http_client_ip_t *client_ip; 2143 nxt_http_route_addr_rule_t *source; 2144 2145 static nxt_str_t header_path = nxt_string("/header"); 2146 static nxt_str_t source_path = nxt_string("/source"); 2147 static nxt_str_t recursive_path = nxt_string("/recursive"); 2148 2149 if (conf == NULL) { 2150 skcf->client_ip = NULL; 2151 2152 return NXT_OK; 2153 } 2154 2155 mp = tmcf->router_conf->mem_pool; 2156 2157 source_conf = nxt_conf_get_path(conf, &source_path); 2158 header_conf = nxt_conf_get_path(conf, &header_path); 2159 recursive_conf = nxt_conf_get_path(conf, &recursive_path); 2160 2161 if (source_conf == NULL || header_conf == NULL) { 2162 return NXT_ERROR; 2163 } 2164 2165 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); 2166 if (nxt_slow_path(client_ip == NULL)) { 2167 return NXT_ERROR; 2168 } 2169 2170 source = nxt_http_route_addr_rule_create(task, mp, source_conf); 2171 if (nxt_slow_path(source == NULL)) { 2172 return NXT_ERROR; 2173 } 2174 2175 client_ip->source = source; 2176 2177 nxt_conf_get_string(header_conf, &header); 2178 2179 if (recursive_conf != NULL) { 2180 client_ip->recursive = nxt_conf_get_boolean(recursive_conf); 2181 } 2182 2183 client_ip->header = nxt_str_dup(mp, NULL, &header); 2184 if (nxt_slow_path(client_ip->header == NULL)) { 2185 return NXT_ERROR; 2186 } 2187 2188 hash = NXT_HTTP_FIELD_HASH_INIT; 2189 2190 for (i = 0; i < client_ip->header->length; i++) { 2191 c = client_ip->header->start[i]; 2192 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); 2193 } 2194 2195 hash = nxt_http_field_hash_end(hash) & 0xFFFF; 2196 2197 client_ip->header_hash = hash; 2198 2199 skcf->client_ip = client_ip; 2200 2201 return NXT_OK; 2202 } 2203 2204 2205 static nxt_app_t * 2206 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) 2207 { 2208 nxt_app_t *app; 2209 2210 nxt_queue_each(app, queue, nxt_app_t, link) { 2211 2212 if (nxt_strstr_eq(name, &app->name)) { 2213 return app; 2214 } 2215 2216 } nxt_queue_loop; 2217 2218 return NULL; 2219 } 2220 2221 2222 static nxt_int_t 2223 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) 2224 { 2225 void *mem; 2226 nxt_int_t fd; 2227 2228 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); 2229 if (nxt_slow_path(fd == -1)) { 2230 return NXT_ERROR; 2231 } 2232 2233 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), 2234 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2235 if (nxt_slow_path(mem == MAP_FAILED)) { 2236 nxt_fd_close(fd); 2237 2238 return NXT_ERROR; 2239 } 2240 2241 nxt_app_queue_init(mem); 2242 2243 port->queue_fd = fd; 2244 port->queue = mem; 2245 2246 return NXT_OK; 2247 } 2248 2249 2250 static nxt_int_t 2251 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) 2252 { 2253 void *mem; 2254 nxt_int_t fd; 2255 2256 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); 2257 if (nxt_slow_path(fd == -1)) { 2258 return NXT_ERROR; 2259 } 2260 2261 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2262 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2263 if (nxt_slow_path(mem == MAP_FAILED)) { 2264 nxt_fd_close(fd); 2265 2266 return NXT_ERROR; 2267 } 2268 2269 nxt_port_queue_init(mem); 2270 2271 port->queue_fd = fd; 2272 port->queue = mem; 2273 2274 return NXT_OK; 2275 } 2276 2277 2278 static nxt_int_t 2279 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) 2280 { 2281 void *mem; 2282 2283 nxt_assert(fd != -1); 2284 2285 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 2286 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2287 if (nxt_slow_path(mem == MAP_FAILED)) { 2288 2289 return NXT_ERROR; 2290 } 2291 2292 port->queue = mem; 2293 2294 return NXT_OK; 2295 } 2296 2297 2298 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { 2299 NXT_LVLHSH_DEFAULT, 2300 nxt_router_apps_hash_test, 2301 nxt_mp_lvlhsh_alloc, 2302 nxt_mp_lvlhsh_free, 2303 }; 2304 2305 2306 static nxt_int_t 2307 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 2308 { 2309 nxt_app_t *app; 2310 2311 app = data; 2312 2313 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; 2314 } 2315 2316 2317 static nxt_int_t 2318 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) 2319 { 2320 nxt_lvlhsh_query_t lhq; 2321 2322 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); 2323 lhq.replace = 0; 2324 lhq.key = app->name; 2325 lhq.value = app; 2326 lhq.proto = &nxt_router_apps_hash_proto; 2327 lhq.pool = rtcf->mem_pool; 2328 2329 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { 2330 2331 case NXT_OK: 2332 return NXT_OK; 2333 2334 case NXT_DECLINED: 2335 nxt_thread_log_alert("router app hash adding failed: " 2336 "\"%V\" is already in hash", &lhq.key); 2337 /* Fall through. */ 2338 default: 2339 return NXT_ERROR; 2340 } 2341 } 2342 2343 2344 static nxt_app_t * 2345 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) 2346 { 2347 nxt_lvlhsh_query_t lhq; 2348 2349 lhq.key_hash = nxt_djb_hash(name->start, name->length); 2350 lhq.key = *name; 2351 lhq.proto = &nxt_router_apps_hash_proto; 2352 2353 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { 2354 return NULL; 2355 } 2356 2357 return lhq.value; 2358 } 2359 2360 2361 static void 2362 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) 2363 { 2364 nxt_app_t *app; 2365 nxt_lvlhsh_each_t lhe; 2366 2367 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); 2368 2369 for ( ;; ) { 2370 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); 2371 2372 if (app == NULL) { 2373 break; 2374 } 2375 2376 nxt_router_app_use(task, app, i); 2377 } 2378 } 2379 2380 2381 typedef struct { 2382 nxt_app_t *app; 2383 nxt_int_t target; 2384 } nxt_http_app_conf_t; 2385 2386 2387 nxt_int_t 2388 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, 2389 nxt_str_t *target, nxt_http_action_t *action) 2390 { 2391 nxt_app_t *app; 2392 nxt_str_t *targets; 2393 nxt_uint_t i; 2394 nxt_http_app_conf_t *conf; 2395 2396 app = nxt_router_apps_hash_get(rtcf, name); 2397 if (app == NULL) { 2398 return NXT_DECLINED; 2399 } 2400 2401 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); 2402 if (nxt_slow_path(conf == NULL)) { 2403 return NXT_ERROR; 2404 } 2405 2406 action->handler = nxt_http_application_handler; 2407 action->u.conf = conf; 2408 2409 conf->app = app; 2410 2411 if (target != NULL && target->length != 0) { 2412 targets = app->targets; 2413 2414 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); 2415 2416 conf->target = i; 2417 2418 } else { 2419 conf->target = 0; 2420 } 2421 2422 return NXT_OK; 2423 } 2424 2425 2426 static nxt_socket_conf_t * 2427 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 2428 nxt_str_t *name) 2429 { 2430 size_t size; 2431 nxt_int_t ret; 2432 nxt_bool_t wildcard; 2433 nxt_sockaddr_t *sa; 2434 nxt_socket_conf_t *skcf; 2435 nxt_listen_socket_t *ls; 2436 2437 sa = nxt_sockaddr_parse(tmcf->mem_pool, name); 2438 if (nxt_slow_path(sa == NULL)) { 2439 nxt_alert(task, "invalid listener \"%V\"", name); 2440 return NULL; 2441 } 2442 2443 sa->type = SOCK_STREAM; 2444 2445 nxt_debug(task, "router listener: \"%*s\"", 2446 (size_t) sa->length, nxt_sockaddr_start(sa)); 2447 2448 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t)); 2449 if (nxt_slow_path(skcf == NULL)) { 2450 return NULL; 2451 } 2452 2453 size = nxt_sockaddr_size(sa); 2454 2455 ret = nxt_router_listen_socket_find(tmcf, skcf, sa); 2456 2457 if (ret != NXT_OK) { 2458 2459 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size); 2460 if (nxt_slow_path(ls == NULL)) { 2461 return NULL; 2462 } 2463 2464 skcf->listen = ls; 2465 2466 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t)); 2467 nxt_memcpy(ls->sockaddr, sa, size); 2468 2469 nxt_listen_socket_remote_size(ls); 2470 2471 ls->socket = -1; 2472 ls->backlog = NXT_LISTEN_BACKLOG; 2473 ls->flags = NXT_NONBLOCK; 2474 ls->read_after_accept = 1; 2475 } 2476 2477 switch (sa->u.sockaddr.sa_family) { 2478 #if (NXT_HAVE_UNIX_DOMAIN) 2479 case AF_UNIX: 2480 wildcard = 0; 2481 break; 2482 #endif 2483 #if (NXT_INET6) 2484 case AF_INET6: 2485 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr); 2486 break; 2487 #endif 2488 case AF_INET: 2489 default: 2490 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY); 2491 break; 2492 } 2493 2494 if (!wildcard) { 2495 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size); 2496 if (nxt_slow_path(skcf->sockaddr == NULL)) { 2497 return NULL; 2498 } 2499 2500 nxt_memcpy(skcf->sockaddr, sa, size); 2501 } 2502 2503 return skcf; 2504 } 2505 2506 2507 static nxt_int_t 2508 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, 2509 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa) 2510 { 2511 nxt_router_t *router; 2512 nxt_queue_link_t *qlk; 2513 nxt_socket_conf_t *skcf; 2514 2515 router = tmcf->router_conf->router; 2516 2517 for (qlk = nxt_queue_first(&router->sockets); 2518 qlk != nxt_queue_tail(&router->sockets); 2519 qlk = nxt_queue_next(qlk)) 2520 { 2521 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 2522 2523 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) { 2524 nskcf->listen = skcf->listen; 2525 2526 nxt_queue_remove(qlk); 2527 nxt_queue_insert_tail(&keeping_sockets, qlk); 2528 2529 nxt_queue_insert_tail(&updating_sockets, &nskcf->link); 2530 2531 return NXT_OK; 2532 } 2533 } 2534 2535 nxt_queue_insert_tail(&pending_sockets, &nskcf->link); 2536 2537 return NXT_DECLINED; 2538 } 2539 2540 2541 static void 2542 nxt_router_listen_socket_rpc_create(nxt_task_t *task, 2543 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf) 2544 { 2545 size_t size; 2546 uint32_t stream; 2547 nxt_int_t ret; 2548 nxt_buf_t *b; 2549 nxt_port_t *main_port, *router_port; 2550 nxt_runtime_t *rt; 2551 nxt_socket_rpc_t *rpc; 2552 2553 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t)); 2554 if (rpc == NULL) { 2555 goto fail; 2556 } 2557 2558 rpc->socket_conf = skcf; 2559 rpc->temp_conf = tmcf; 2560 2561 size = nxt_sockaddr_size(skcf->listen->sockaddr); 2562 2563 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2564 if (b == NULL) { 2565 goto fail; 2566 } 2567 2568 b->completion_handler = nxt_buf_dummy_completion; 2569 2570 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); 2571 2572 rt = task->thread->runtime; 2573 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 2574 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2575 2576 stream = nxt_port_rpc_register_handler(task, router_port, 2577 nxt_router_listen_socket_ready, 2578 nxt_router_listen_socket_error, 2579 main_port->pid, rpc); 2580 if (nxt_slow_path(stream == 0)) { 2581 goto fail; 2582 } 2583 2584 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1, 2585 stream, router_port->id, b); 2586 2587 if (nxt_slow_path(ret != NXT_OK)) { 2588 nxt_port_rpc_cancel(task, router_port, stream); 2589 goto fail; 2590 } 2591 2592 return; 2593 2594 fail: 2595 2596 nxt_router_conf_error(task, tmcf); 2597 } 2598 2599 2600 static void 2601 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2602 void *data) 2603 { 2604 nxt_int_t ret; 2605 nxt_socket_t s; 2606 nxt_socket_rpc_t *rpc; 2607 2608 rpc = data; 2609 2610 s = msg->fd[0]; 2611 2612 ret = nxt_socket_nonblocking(task, s); 2613 if (nxt_slow_path(ret != NXT_OK)) { 2614 goto fail; 2615 } 2616 2617 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr); 2618 2619 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 2620 if (nxt_slow_path(ret != NXT_OK)) { 2621 goto fail; 2622 } 2623 2624 rpc->socket_conf->listen->socket = s; 2625 2626 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2627 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2628 2629 return; 2630 2631 fail: 2632 2633 nxt_socket_close(task, s); 2634 2635 nxt_router_conf_error(task, rpc->temp_conf); 2636 } 2637 2638 2639 static void 2640 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2641 void *data) 2642 { 2643 nxt_socket_rpc_t *rpc; 2644 nxt_router_temp_conf_t *tmcf; 2645 2646 rpc = data; 2647 tmcf = rpc->temp_conf; 2648 2649 #if 0 2650 u_char *p; 2651 size_t size; 2652 uint8_t error; 2653 nxt_buf_t *in, *out; 2654 nxt_sockaddr_t *sa; 2655 2656 static nxt_str_t socket_errors[] = { 2657 nxt_string("ListenerSystem"), 2658 nxt_string("ListenerNoIPv6"), 2659 nxt_string("ListenerPort"), 2660 nxt_string("ListenerInUse"), 2661 nxt_string("ListenerNoAddress"), 2662 nxt_string("ListenerNoAccess"), 2663 nxt_string("ListenerPath"), 2664 }; 2665 2666 sa = rpc->socket_conf->listen->sockaddr; 2667 2668 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); 2669 2670 if (nxt_slow_path(in == NULL)) { 2671 return; 2672 } 2673 2674 p = in->mem.pos; 2675 2676 error = *p++; 2677 2678 size = nxt_length("listen socket error: ") 2679 + nxt_length("{listener: \"\", code:\"\", message: \"\"}") 2680 + sa->length + socket_errors[error].length + (in->mem.free - p); 2681 2682 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2683 if (nxt_slow_path(out == NULL)) { 2684 return; 2685 } 2686 2687 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end, 2688 "listen socket error: " 2689 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}", 2690 (size_t) sa->length, nxt_sockaddr_start(sa), 2691 &socket_errors[error], in->mem.free - p, p); 2692 2693 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos); 2694 #endif 2695 2696 nxt_router_conf_error(task, tmcf); 2697 } 2698 2699 2700 #if (NXT_TLS) 2701 2702 static void 2703 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2704 void *data) 2705 { 2706 nxt_mp_t *mp; 2707 nxt_int_t ret; 2708 nxt_tls_conf_t *tlscf; 2709 nxt_router_tlssock_t *tls; 2710 nxt_tls_bundle_conf_t *bundle; 2711 nxt_router_temp_conf_t *tmcf; 2712 2713 nxt_debug(task, "tls rpc handler"); 2714 2715 tls = data; 2716 tmcf = tls->temp_conf; 2717 2718 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) { 2719 goto fail; 2720 } 2721 2722 mp = tmcf->router_conf->mem_pool; 2723 2724 if (tls->socket_conf->tls == NULL){ 2725 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t)); 2726 if (nxt_slow_path(tlscf == NULL)) { 2727 goto fail; 2728 } 2729 2730 tlscf->no_wait_shutdown = 1; 2731 tls->socket_conf->tls = tlscf; 2732 2733 } else { 2734 tlscf = tls->socket_conf->tls; 2735 } 2736 2737 tls->tls_init->conf = tlscf; 2738 2739 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); 2740 if (nxt_slow_path(bundle == NULL)) { 2741 goto fail; 2742 } 2743 2744 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) { 2745 goto fail; 2746 } 2747 2748 bundle->chain_file = msg->fd[0]; 2749 bundle->next = tlscf->bundle; 2750 tlscf->bundle = bundle; 2751 2752 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, 2753 tls->last); 2754 if (nxt_slow_path(ret != NXT_OK)) { 2755 goto fail; 2756 } 2757 2758 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2759 nxt_router_conf_apply, task, tmcf, NULL); 2760 return; 2761 2762 fail: 2763 2764 nxt_router_conf_error(task, tmcf); 2765 } 2766 2767 #endif 2768 2769 2770 static void 2771 nxt_router_app_rpc_create(nxt_task_t *task, 2772 nxt_router_temp_conf_t *tmcf, nxt_app_t *app) 2773 { 2774 size_t size; 2775 uint32_t stream; 2776 nxt_fd_t port_fd, queue_fd; 2777 nxt_int_t ret; 2778 nxt_buf_t *b; 2779 nxt_port_t *router_port, *dport; 2780 nxt_runtime_t *rt; 2781 nxt_app_rpc_t *rpc; 2782 2783 rt = task->thread->runtime; 2784 2785 dport = app->proto_port; 2786 2787 if (dport == NULL) { 2788 nxt_debug(task, "app '%V' prototype prefork", &app->name); 2789 2790 size = app->name.length + 1 + app->conf.length; 2791 2792 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); 2793 if (nxt_slow_path(b == NULL)) { 2794 goto fail; 2795 } 2796 2797 b->completion_handler = nxt_buf_dummy_completion; 2798 2799 nxt_buf_cpystr(b, &app->name); 2800 *b->mem.free++ = '\0'; 2801 nxt_buf_cpystr(b, &app->conf); 2802 2803 dport = rt->port_by_type[NXT_PROCESS_MAIN]; 2804 2805 port_fd = app->shared_port->pair[0]; 2806 queue_fd = app->shared_port->queue_fd; 2807 2808 } else { 2809 nxt_debug(task, "app '%V' prefork", &app->name); 2810 2811 b = NULL; 2812 port_fd = -1; 2813 queue_fd = -1; 2814 } 2815 2816 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 2817 2818 rpc = nxt_port_rpc_register_handler_ex(task, router_port, 2819 nxt_router_app_prefork_ready, 2820 nxt_router_app_prefork_error, 2821 sizeof(nxt_app_rpc_t)); 2822 if (nxt_slow_path(rpc == NULL)) { 2823 goto fail; 2824 } 2825 2826 rpc->app = app; 2827 rpc->temp_conf = tmcf; 2828 rpc->proto = (b != NULL); 2829 2830 stream = nxt_port_rpc_ex_stream(rpc); 2831 2832 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, 2833 port_fd, queue_fd, stream, router_port->id, b); 2834 if (nxt_slow_path(ret != NXT_OK)) { 2835 nxt_port_rpc_cancel(task, router_port, stream); 2836 goto fail; 2837 } 2838 2839 if (b == NULL) { 2840 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); 2841 2842 app->pending_processes++; 2843 } 2844 2845 return; 2846 2847 fail: 2848 2849 nxt_router_conf_error(task, tmcf); 2850 } 2851 2852 2853 static void 2854 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2855 void *data) 2856 { 2857 nxt_app_t *app; 2858 nxt_port_t *port; 2859 nxt_app_rpc_t *rpc; 2860 nxt_event_engine_t *engine; 2861 2862 rpc = data; 2863 app = rpc->app; 2864 2865 port = msg->u.new_port; 2866 2867 nxt_assert(port != NULL); 2868 nxt_assert(port->id == 0); 2869 2870 if (rpc->proto) { 2871 nxt_assert(app->proto_port == NULL); 2872 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); 2873 2874 nxt_port_inc_use(port); 2875 2876 app->proto_port = port; 2877 port->app = app; 2878 2879 nxt_router_app_rpc_create(task, rpc->temp_conf, app); 2880 2881 return; 2882 } 2883 2884 nxt_assert(port->type == NXT_PROCESS_APP); 2885 2886 port->app = app; 2887 port->main_app_port = port; 2888 2889 app->pending_processes--; 2890 app->processes++; 2891 app->idle_processes++; 2892 2893 engine = task->thread->engine; 2894 2895 nxt_queue_insert_tail(&app->ports, &port->app_link); 2896 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); 2897 2898 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", 2899 &app->name, port->pid, port->id); 2900 2901 nxt_port_hash_add(&app->port_hash, port); 2902 app->port_hash_count++; 2903 2904 port->idle_start = 0; 2905 2906 nxt_port_inc_use(port); 2907 2908 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); 2909 2910 nxt_work_queue_add(&engine->fast_work_queue, 2911 nxt_router_conf_apply, task, rpc->temp_conf, NULL); 2912 } 2913 2914 2915 static void 2916 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2917 void *data) 2918 { 2919 nxt_app_t *app; 2920 nxt_app_rpc_t *rpc; 2921 nxt_router_temp_conf_t *tmcf; 2922 2923 rpc = data; 2924 app = rpc->app; 2925 tmcf = rpc->temp_conf; 2926 2927 if (rpc->proto) { 2928 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", 2929 &app->name); 2930 2931 } else { 2932 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", 2933 &app->name); 2934 2935 app->pending_processes--; 2936 } 2937 2938 nxt_router_conf_error(task, tmcf); 2939 } 2940 2941 2942 static nxt_int_t 2943 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 2944 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 2945 { 2946 nxt_int_t ret; 2947 nxt_uint_t n, threads; 2948 nxt_queue_link_t *qlk; 2949 nxt_router_engine_conf_t *recf; 2950 2951 threads = tmcf->router_conf->threads; 2952 2953 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 2954 sizeof(nxt_router_engine_conf_t)); 2955 if (nxt_slow_path(tmcf->engines == NULL)) { 2956 return NXT_ERROR; 2957 } 2958 2959 n = 0; 2960 2961 for (qlk = nxt_queue_first(&router->engines); 2962 qlk != nxt_queue_tail(&router->engines); 2963 qlk = nxt_queue_next(qlk)) 2964 { 2965 recf = nxt_array_zero_add(tmcf->engines); 2966 if (nxt_slow_path(recf == NULL)) { 2967 return NXT_ERROR; 2968 } 2969 2970 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); 2971 2972 if (n < threads) { 2973 recf->action = NXT_ROUTER_ENGINE_KEEP; 2974 ret = nxt_router_engine_conf_update(tmcf, recf); 2975 2976 } else { 2977 recf->action = NXT_ROUTER_ENGINE_DELETE; 2978 ret = nxt_router_engine_conf_delete(tmcf, recf); 2979 } 2980 2981 if (nxt_slow_path(ret != NXT_OK)) { 2982 return ret; 2983 } 2984 2985 n++; 2986 } 2987 2988 tmcf->new_threads = n; 2989 2990 while (n < threads) { 2991 recf = nxt_array_zero_add(tmcf->engines); 2992 if (nxt_slow_path(recf == NULL)) { 2993 return NXT_ERROR; 2994 } 2995 2996 recf->action = NXT_ROUTER_ENGINE_ADD; 2997 2998 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 2999 if (nxt_slow_path(recf->engine == NULL)) { 3000 return NXT_ERROR; 3001 } 3002 3003 ret = nxt_router_engine_conf_create(tmcf, recf); 3004 if (nxt_slow_path(ret != NXT_OK)) { 3005 return ret; 3006 } 3007 3008 n++; 3009 } 3010 3011 return NXT_OK; 3012 } 3013 3014 3015 static nxt_int_t 3016 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 3017 nxt_router_engine_conf_t *recf) 3018 { 3019 nxt_int_t ret; 3020 3021 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3022 nxt_router_listen_socket_create); 3023 if (nxt_slow_path(ret != NXT_OK)) { 3024 return ret; 3025 } 3026 3027 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3028 nxt_router_listen_socket_create); 3029 if (nxt_slow_path(ret != NXT_OK)) { 3030 return ret; 3031 } 3032 3033 return ret; 3034 } 3035 3036 3037 static nxt_int_t 3038 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 3039 nxt_router_engine_conf_t *recf) 3040 { 3041 nxt_int_t ret; 3042 3043 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, 3044 nxt_router_listen_socket_create); 3045 if (nxt_slow_path(ret != NXT_OK)) { 3046 return ret; 3047 } 3048 3049 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, 3050 nxt_router_listen_socket_update); 3051 if (nxt_slow_path(ret != NXT_OK)) { 3052 return ret; 3053 } 3054 3055 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3056 if (nxt_slow_path(ret != NXT_OK)) { 3057 return ret; 3058 } 3059 3060 return ret; 3061 } 3062 3063 3064 static nxt_int_t 3065 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 3066 nxt_router_engine_conf_t *recf) 3067 { 3068 nxt_int_t ret; 3069 3070 ret = nxt_router_engine_quit(tmcf, recf); 3071 if (nxt_slow_path(ret != NXT_OK)) { 3072 return ret; 3073 } 3074 3075 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); 3076 if (nxt_slow_path(ret != NXT_OK)) { 3077 return ret; 3078 } 3079 3080 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); 3081 } 3082 3083 3084 static nxt_int_t 3085 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, 3086 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, 3087 nxt_work_handler_t handler) 3088 { 3089 nxt_int_t ret; 3090 nxt_joint_job_t *job; 3091 nxt_queue_link_t *qlk; 3092 nxt_socket_conf_t *skcf; 3093 nxt_socket_conf_joint_t *joint; 3094 3095 for (qlk = nxt_queue_first(sockets); 3096 qlk != nxt_queue_tail(sockets); 3097 qlk = nxt_queue_next(qlk)) 3098 { 3099 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3100 if (nxt_slow_path(job == NULL)) { 3101 return NXT_ERROR; 3102 } 3103 3104 job->work.next = recf->jobs; 3105 recf->jobs = &job->work; 3106 3107 job->task = tmcf->engine->task; 3108 job->work.handler = handler; 3109 job->work.task = &job->task; 3110 job->work.obj = job; 3111 job->tmcf = tmcf; 3112 3113 tmcf->count++; 3114 3115 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool, 3116 sizeof(nxt_socket_conf_joint_t)); 3117 if (nxt_slow_path(joint == NULL)) { 3118 return NXT_ERROR; 3119 } 3120 3121 job->work.data = joint; 3122 3123 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); 3124 if (nxt_slow_path(ret != NXT_OK)) { 3125 return ret; 3126 } 3127 3128 joint->count = 1; 3129 3130 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 3131 skcf->count++; 3132 joint->socket_conf = skcf; 3133 3134 joint->engine = recf->engine; 3135 } 3136 3137 return NXT_OK; 3138 } 3139 3140 3141 static nxt_int_t 3142 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf, 3143 nxt_router_engine_conf_t *recf) 3144 { 3145 nxt_joint_job_t *job; 3146 3147 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); 3148 if (nxt_slow_path(job == NULL)) { 3149 return NXT_ERROR; 3150 } 3151 3152 job->work.next = recf->jobs; 3153 recf->jobs = &job->work; 3154 3155 job->task = tmcf->engine->task; 3156 job->work.handler = nxt_router_worker_thread_quit; 3157 job->work.task = &job->task; 3158 job->work.obj = NULL; 3159 job->work.data = NULL; 3160 job->tmcf = NULL; 3161 3162 return NXT_OK; 3163 } 3164 3165 3166 static nxt_int_t 3167 nxt_router_engine_joints_delete(