1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, 11 void *data); 12 static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, 13 void *data); 14 static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); 15 static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); 16 static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, 17 void *data); 18 static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, 19 void *data); 20 static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, 21 nxt_conn_t *source, nxt_conn_t *sink); 22 static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b); 23 static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data); 24 static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, 25 void *data); 26 static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, 27 void *data); 28 static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, 29 nxt_conn_t *sink, nxt_conn_t *source); 30 static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b); 31 static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data); 32 static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data); 33 static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, 34 void *data); 35 static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, 36 void *data); 37 static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data); 38 static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); 39 static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, 40 void *data); 41 static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, 42 nxt_conn_t *source, nxt_conn_t *sink); 43 static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); 44 static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); 45 static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p); 46 static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data); 47 48 49 static const nxt_conn_state_t nxt_conn_proxy_client_wait_state; 50 static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state; 51 static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state; 52 static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state; 53 static const nxt_conn_state_t nxt_conn_proxy_client_read_state; 54 static const nxt_conn_state_t nxt_conn_proxy_peer_read_state; 55 static const nxt_conn_state_t nxt_conn_proxy_client_write_state; 56 static const nxt_conn_state_t nxt_conn_proxy_peer_write_state; 57 58 59 nxt_conn_proxy_t * 60 nxt_conn_proxy_create(nxt_conn_t *client) 61 { 62 nxt_conn_t *peer; 63 nxt_thread_t *thr; 64 nxt_conn_proxy_t *p; 65 66 p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t)); 67 if (nxt_slow_path(p == NULL)) { 68 return NULL; 69 } 70 71 peer = nxt_conn_create(client->mem_pool, client->socket.task); 72 if (nxt_slow_path(peer == NULL)) { 73 return NULL; 74 } 75 76 thr = nxt_thread(); 77 78 client->read_work_queue = &thr->engine->read_work_queue; 79 client->write_work_queue = &thr->engine->write_work_queue; 80 client->socket.read_work_queue = &thr->engine->read_work_queue; 81 client->socket.write_work_queue = &thr->engine->write_work_queue; 82 peer->socket.read_work_queue = &thr->engine->read_work_queue; 83 peer->socket.write_work_queue = &thr->engine->write_work_queue; 84 85 peer->socket.data = client->socket.data; 86 87 peer->read_work_queue = client->read_work_queue; 88 peer->write_work_queue = client->write_work_queue; 89 peer->read_timer.work_queue = client->read_work_queue; 90 peer->write_timer.work_queue = client->write_work_queue; 91 92 p->client = client; 93 p->peer = peer; 94 95 return p; 96 } 97 98 99 void 100 nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p) 101 { 102 nxt_conn_t *peer; 103 104 /* 105 * Peer read event: not connected, disabled. 106 * Peer write event: not connected, disabled. 107 */ 108 109 if (p->client_wait_timeout == 0) { 110 /* 111 * Peer write event: waiting for connection 112 * to be established with connect_timeout. 113 */ 114 peer = p->peer; 115 peer->write_state = &nxt_conn_proxy_peer_connect_state; 116 117 nxt_conn_connect(task->thread->engine, peer); 118 } 119 120 /* 121 * Client read event: waiting for client data with 122 * client_wait_timeout before buffer allocation. 123 */ 124 p->client->read_state = &nxt_conn_proxy_client_wait_state; 125 126 nxt_conn_wait(p->client); 127 } 128 129 130 static const nxt_conn_state_t nxt_conn_proxy_client_wait_state 131 nxt_aligned(64) = 132 { 133 .ready_handler = nxt_conn_proxy_client_buffer_alloc, 134 .close_handler = nxt_conn_proxy_close, 135 .error_handler = nxt_conn_proxy_error, 136 137 .timer_handler = nxt_conn_proxy_read_timeout, 138 .timer_value = nxt_conn_proxy_timeout_value, 139 .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), 140 }; 141 142 143 static void 144 nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) 145 { 146 nxt_buf_t *b; 147 nxt_conn_t *client; 148 nxt_conn_proxy_t *p; 149 150 client = obj; 151 p = data; 152 153 nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd); 154 155 b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 156 NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); 157 158 if (nxt_slow_path(b == NULL)) { 159 /* An error completion. */ 160 nxt_conn_proxy_complete(task, p); 161 return; 162 } 163 164 p->client_buffer = b; 165 client->read = b; 166 167 if (p->peer->socket.fd != -1) { 168 /* 169 * Client read event: waiting, no timeout. 170 * Client write event: blocked. 171 * Peer read event: disabled. 172 * Peer write event: waiting for connection to be established 173 * or blocked after the connection has established. 174 */ 175 client->read_state = &nxt_conn_proxy_client_read_state; 176 177 } else { 178 /* 179 * Client read event: waiting for data with client_wait_timeout 180 * before connecting to a peer. 181 * Client write event: blocked. 182 * Peer read event: not connected, disabled. 183 * Peer write event: not connected, disabled. 184 */ 185 client->read_state = &nxt_conn_proxy_client_first_read_state; 186 } 187 188 nxt_conn_read(task->thread->engine, client); 189 } 190 191 192 static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state 193 nxt_aligned(64) = 194 { 195 .ready_handler = nxt_conn_proxy_peer_connect, 196 .close_handler = nxt_conn_proxy_close, 197 .error_handler = nxt_conn_proxy_error, 198 199 .timer_handler = nxt_conn_proxy_read_timeout, 200 .timer_value = nxt_conn_proxy_timeout_value, 201 .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), 202 .timer_autoreset = 1, 203 }; 204 205 206 static void 207 nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) 208 { 209 nxt_conn_t *client; 210 nxt_conn_proxy_t *p; 211 212 client = obj; 213 p = data; 214 215 /* 216 * Client read event: waiting, no timeout. 217 * Client write event: blocked. 218 * Peer read event: disabled. 219 * Peer write event: waiting for connection to be established 220 * with connect_timeout. 221 */ 222 client->read_state = &nxt_conn_proxy_client_read_state; 223 224 p->peer->write_state = &nxt_conn_proxy_peer_connect_state; 225 226 nxt_conn_connect(task->thread->engine, p->peer); 227 } 228 229 230 static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state 231 nxt_aligned(64) = 232 { 233 .ready_handler = nxt_conn_proxy_connected, 234 .close_handler = nxt_conn_proxy_refused, 235 .error_handler = nxt_conn_proxy_error, 236 237 .timer_handler = nxt_conn_proxy_write_timeout, 238 .timer_value = nxt_conn_proxy_timeout_value, 239 .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout), 240 .timer_autoreset = 1, 241 }; 242 243 244 static void 245 nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) 246 { 247 nxt_conn_t *client, *peer; 248 nxt_conn_proxy_t *p; 249 250 peer = obj; 251 p = data; 252 253 nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd); 254 255 p->connected = 1; 256 257 nxt_conn_tcp_nodelay_on(task, peer); 258 nxt_conn_tcp_nodelay_on(task, p->client); 259 260 /* Peer read event: waiting with peer_wait_timeout. */ 261 262 peer->read_state = &nxt_conn_proxy_peer_wait_state; 263 peer->write_state = &nxt_conn_proxy_peer_write_state; 264 265 nxt_conn_wait(peer); 266 267 if (p->client_buffer != NULL) { 268 client = p->client; 269 270 client->read_state = &nxt_conn_proxy_client_read_state; 271 client->write_state = &nxt_conn_proxy_client_write_state; 272 /* 273 * Send a client read data to the connected peer. 274 * Client write event: blocked. 275 */ 276 nxt_conn_proxy_read_process(task, p, client, peer); 277 } 278 } 279 280 281 static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state 282 nxt_aligned(64) = 283 { 284 .ready_handler = nxt_conn_proxy_peer_read, 285 .close_handler = nxt_conn_proxy_close, 286 .error_handler = nxt_conn_proxy_error, 287 288 .timer_handler = nxt_conn_proxy_read_timeout, 289 .timer_value = nxt_conn_proxy_timeout_value, 290 .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout), 291 }; 292 293 294 static void 295 nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) 296 { 297 nxt_buf_t *b; 298 nxt_conn_t *peer; 299 nxt_conn_proxy_t *p; 300 301 peer = obj; 302 p = data; 303 304 nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd); 305 306 b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 307 NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); 308 309 if (nxt_slow_path(b == NULL)) { 310 /* An error completion. */ 311 nxt_conn_proxy_complete(task, p); 312 return; 313 } 314 315 p->peer_buffer = b; 316 peer->read = b; 317 318 p->client->write_state = &nxt_conn_proxy_client_write_state; 319 peer->read_state = &nxt_conn_proxy_peer_read_state; 320 peer->write_state = &nxt_conn_proxy_peer_write_state; 321 322 /* 323 * Client read event: waiting, no timeout. 324 * Client write event: blocked. 325 * Peer read event: waiting with possible peer_wait_timeout. 326 * Peer write event: blocked. 327 */ 328 nxt_conn_read(task->thread->engine, peer); 329 } 330 331 332 static const nxt_conn_state_t nxt_conn_proxy_client_read_state 333 nxt_aligned(64) = 334 { 335 .ready_handler = nxt_conn_proxy_client_read_ready, 336 .close_handler = nxt_conn_proxy_close, 337 .error_handler = nxt_conn_proxy_read_error, 338 }; 339 340 341 static void 342 nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) 343 { 344 nxt_conn_t *client; 345 nxt_conn_proxy_t *p; 346 347 client = obj; 348 p = data; 349 350 nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd); 351 352 nxt_conn_proxy_read_process(task, p, client, p->peer); 353 } 354 355 356 static const nxt_conn_state_t nxt_conn_proxy_peer_read_state 357 nxt_aligned(64) = 358 { 359 .ready_handler = nxt_conn_proxy_peer_read_ready, 360 .close_handler = nxt_conn_proxy_close, 361 .error_handler = nxt_conn_proxy_read_error, 362 }; 363 364 365 static void 366 nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) 367 { 368 nxt_conn_t *peer; 369 nxt_conn_proxy_t *p; 370 371 peer = obj; 372 p = data; 373 374 nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd); 375 376 nxt_conn_proxy_read_process(task, p, peer, p->client); 377 } 378 379 380 static void 381 nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, 382 nxt_conn_t *source, nxt_conn_t *sink) 383 { 384 nxt_buf_t *rb, *wb; 385 386 if (sink->socket.error != 0) { 387 nxt_debug(task, "conn proxy sink fd:%d error:%d", 388 sink->socket.fd, sink->socket.error); 389 390 nxt_conn_proxy_write_error(task, sink, sink->socket.data); 391 return; 392 } 393 394 while (source->read != NULL) { 395 396 rb = source->read; 397 398 if (rb->mem.pos != rb->mem.free) { 399 400 /* Add a read part to a write chain. */ 401 402 wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); 403 if (wb == NULL) { 404 /* An error completion. */ 405 nxt_conn_proxy_complete(task, p); 406 return; 407 } 408 409 wb->mem.pos = rb->mem.pos; 410 wb->mem.free = rb->mem.free; 411 wb->mem.start = rb->mem.pos; 412 wb->mem.end = rb->mem.free; 413 414 rb->mem.pos = rb->mem.free; 415 rb->mem.start = rb->mem.free; 416 417 nxt_conn_proxy_write_add(sink, wb); 418 } 419 420 if (rb->mem.start != rb->mem.end) { 421 nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, 422 task, source, source->socket.data); 423 break; 424 } 425 426 source->read = rb->next; 427 nxt_buf_free(source->mem_pool, rb); 428 } 429 430 if (p->connected) { 431 nxt_conn_write(task->thread->engine, sink); 432 } 433 } 434 435 436 static void 437 nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b) 438 { 439 nxt_buf_t *first, *second, *prev; 440 441 first = c->write; 442 443 if (first == NULL) { 444 c->write = b; 445 return; 446 } 447 448 /* 449 * A event conn proxy maintains a buffer per each direction. 450 * The buffer is divided by read and write parts. These parts are 451 * linked in buffer chains. There can be no more than two buffers 452 * in write chain at any time, because an added buffer is coalesced 453 * with the last buffer if possible. 454 */ 455 456 second = first->next; 457 458 if (second == NULL) { 459 460 if (first->mem.end != b->mem.start) { 461 first->next = b; 462 return; 463 } 464 465 /* 466 * The first buffer is just before the added buffer, so 467 * expand the first buffer to the end of the added buffer. 468 */ 469 prev = first; 470 471 } else { 472 if (second->mem.end != b->mem.start) { 473 nxt_thread_log_alert("event conn proxy write: second buffer end:%p " 474 "is not equal to added buffer start:%p", 475 second->mem.end, b->mem.start); 476 return; 477 } 478 479 /* 480 * "second->mem.end == b->mem.start" must be always true here, 481 * that is the second buffer is just before the added buffer, 482 * so expand the second buffer to the end of added buffer. 483 */ 484 prev = second; 485 } 486 487 prev->mem.free = b->mem.end; 488 prev->mem.end = b->mem.end; 489 490 nxt_buf_free(c->mem_pool, b); 491 } 492 493 494 static void 495 nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data) 496 { 497 nxt_conn_t *source, *sink; 498 nxt_conn_proxy_t *p; 499 500 source = obj; 501 p = data; 502 503 nxt_debug(task, "conn proxy read fd:%d", source->socket.fd); 504 505 if (!source->socket.closed) { 506 sink = (source == p->client) ? p->peer : p->client; 507 508 if (sink->socket.error == 0) { 509 nxt_conn_read(task->thread->engine, source); 510 } 511 } 512 } 513 514 515 static const nxt_conn_state_t nxt_conn_proxy_client_write_state 516 nxt_aligned(64) = 517 { 518 .ready_handler = nxt_conn_proxy_client_write_ready, 519 .error_handler = nxt_conn_proxy_write_error, 520 521 .timer_handler = nxt_conn_proxy_write_timeout, 522 .timer_value = nxt_conn_proxy_timeout_value, 523 .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout), 524 .timer_autoreset = 1, 525 }; 526 527 528 static void 529 nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) 530 { 531 nxt_conn_t *client; 532 nxt_conn_proxy_t *p; 533 534 client = obj; 535 p = data; 536 537 nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd); 538 539 nxt_conn_proxy_write_process(task, p, client, p->peer); 540 } 541 542 543 static const nxt_conn_state_t nxt_conn_proxy_peer_write_state 544 nxt_aligned(64) = 545 { 546 .ready_handler = nxt_conn_proxy_peer_write_ready, 547 .error_handler = nxt_conn_proxy_write_error, 548 549 .timer_handler = nxt_conn_proxy_write_timeout, 550 .timer_value = nxt_conn_proxy_timeout_value, 551 .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout), 552 .timer_autoreset = 1, 553 }; 554 555 556 static void 557 nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) 558 { 559 nxt_conn_t *peer; 560 nxt_conn_proxy_t *p; 561 562 peer = obj; 563 p = data; 564 565 nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd); 566 567 nxt_conn_proxy_write_process(task, p, peer, p->client); 568 } 569 570 571 static void 572 nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, 573 nxt_conn_t *sink, nxt_conn_t *source) 574 { 575 nxt_buf_t *rb, *wb; 576 577 while (sink->write != NULL) { 578 579 wb = sink->write; 580 581 if (nxt_buf_is_sync(wb)) { 582 583 /* A sync buffer marks the end of stream. */ 584 585 sink->write = NULL; 586 nxt_buf_free(sink->mem_pool, wb); 587 nxt_conn_proxy_shutdown(task, p, source, sink); 588 return; 589 } 590 591 if (wb->mem.start != wb->mem.pos) { 592 593 /* Add a written part to a read chain. */ 594 595 rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); 596 if (rb == NULL) { 597 /* An error completion. */ 598 nxt_conn_proxy_complete(task, p); 599 return; 600 } 601 602 rb->mem.pos = wb->mem.start; 603 rb->mem.free = wb->mem.start; 604 rb->mem.start = wb->mem.start; 605 rb->mem.end = wb->mem.pos; 606 607 wb->mem.start = wb->mem.pos; 608 609 nxt_conn_proxy_read_add(source, rb); 610 } 611 612 if (wb->mem.pos != wb->mem.free) { 613 nxt_conn_write(task->thread->engine, sink); 614 615 break; 616 } 617 618 sink->write = wb->next; 619 nxt_buf_free(sink->mem_pool, wb); 620 } 621 622 nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, 623 task, source, source->socket.data); 624 } 625 626 627 static void 628 nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b) 629 { 630 nxt_buf_t *first, *second; 631 632 first = c->read; 633 634 if (first == NULL) { 635 c->read = b; 636 return; 637 } 638 639 /* 640 * A event conn proxy maintains a buffer per each direction. 641 * The buffer is divided by read and write parts. These parts are 642 * linked in buffer chains. There can be no more than two buffers 643 * in read chain at any time, because an added buffer is coalesced 644 * with the last buffer if possible. The first and the second 645 * buffers are also coalesced if possible. 646 */ 647 648 second = first->next; 649 650 if (second == NULL) { 651 652 if (first->mem.start == b->mem.end) { 653 /* 654 * The added buffer is just before the first buffer, so expand 655 * the first buffer to the beginning of the added buffer. 656 */ 657 first->mem.pos = b->mem.start; 658 first->mem.free = b->mem.start; 659 first->mem.start = b->mem.start; 660 661 } else if (first->mem.end == b->mem.start) { 662 /* 663 * The added buffer is just after the first buffer, so 664 * expand the first buffer to the end of the added buffer. 665 */ 666 first->mem.end = b->mem.end; 667 668 } else { 669 first->next = b; 670 return; 671 } 672 673 } else { 674 if (second->mem.end != b->mem.start) { 675 nxt_thread_log_alert("event conn proxy read: second buffer end:%p " 676 "is not equal to added buffer start:%p", 677 second->mem.end, b->mem.start); 678 return; 679 } 680 681 /* 682 * The added buffer is just after the second buffer, so 683 * expand the second buffer to the end of the added buffer. 684 */ 685 second->mem.end = b->mem.end; 686 687 if (first->mem.start == second->mem.end) { 688 /* 689 * The second buffer is just before the first buffer, so expand 690 * the first buffer to the beginning of the second buffer. 691 */ 692 first->mem.pos = second->mem.start; 693 first->mem.free = second->mem.start; 694 first->mem.start = second->mem.start; 695 first->next = NULL; 696 697 nxt_buf_free(c->mem_pool, second); 698 } 699 } 700 701 nxt_buf_free(c->mem_pool, b); 702 } 703 704 705 static void 706 nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data) 707 { 708 nxt_buf_t *b; 709 nxt_conn_t *source, *sink; 710 nxt_conn_proxy_t *p; 711 712 source = obj; 713 p = data; 714 715 nxt_debug(task, "conn proxy close fd:%d", source->socket.fd); 716 717 sink = (source == p->client) ? p->peer : p->client; 718 719 if (sink->write == NULL) { 720 nxt_conn_proxy_shutdown(task, p, source, sink); 721 return; 722 } 723 724 b = nxt_buf_sync_alloc(source->mem_pool, 0); 725 if (b == NULL) { 726 /* An error completion. */ 727 nxt_conn_proxy_complete(task, p); 728 return; 729 } 730 731 nxt_buf_chain_add(&sink->write, b); 732 } 733 734 735 static void 736 nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data) 737 { 738 nxt_conn_t *c; 739 nxt_conn_proxy_t *p; 740 741 c = obj; 742 p = data; 743 744 nxt_debug(task, "conn proxy error fd:%d", c->socket.fd); 745 746 nxt_conn_proxy_close(task, c, p); 747 } 748 749 750 static void 751 nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) 752 { 753 nxt_conn_t *c; 754 nxt_timer_t *timer; 755 756 timer = obj; 757 758 c = nxt_read_timer_conn(timer); 759 c->socket.timedout = 1; 760 c->socket.closed = 1; 761 762 nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd); 763 764 nxt_conn_proxy_close(task, c, c->socket.data); 765 } 766 767 768 static void 769 nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) 770 { 771 nxt_conn_t *c; 772 nxt_timer_t *timer; 773 774 timer = obj; 775 776 c = nxt_write_timer_conn(timer); 777 c->socket.timedout = 1; 778 c->socket.closed = 1; 779 780 nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd); 781 782 nxt_conn_proxy_close(task, c, c->socket.data); 783 } 784 785 786 static nxt_msec_t 787 nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data) 788 { 789 nxt_msec_t *timer; 790 nxt_conn_proxy_t *p; 791 792 p = c->socket.data; 793 794 timer = (nxt_msec_t *) ((char *) p + data); 795 796 return *timer; 797 } 798 799 800 static void 801 nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) 802 { 803 nxt_conn_t *peer; 804 nxt_conn_proxy_t *p; 805 806 peer = obj; 807 p = data; 808 809 nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd); 810 811 if (p->retries == 0) { 812 /* An error completion. */ 813 nxt_conn_proxy_complete(task, p); 814 return; 815 } 816 817 p->retries--; 818 819 nxt_socket_close(task, peer->socket.fd); 820 peer->socket.fd = -1; 821 peer->socket.error = 0; 822 823 p->delayed = 1; 824 825 peer->write_timer.handler = nxt_conn_proxy_reconnect_handler; 826 nxt_timer_add(task->thread->engine, &peer->write_timer, 827 p->reconnect_timeout); 828 } 829 830 831 static void 832 nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) 833 { 834 nxt_conn_t *peer; 835 nxt_timer_t *timer; 836 nxt_conn_proxy_t *p; 837 838 timer = obj; 839 840 nxt_debug(task, "conn proxy reconnect timer"); 841 842 peer = nxt_write_timer_conn(timer); 843 p = peer->socket.data; 844 845 if (p->client->socket.closed) { 846 nxt_conn_proxy_complete(task, p); 847 return; 848 } 849 850 p->delayed = 0; 851 852 peer->write_state = &nxt_conn_proxy_peer_connect_state; 853 /* 854 * Peer read event: disabled. 855 * Peer write event: waiting for connection with connect_timeout. 856 */ 857 nxt_conn_connect(task->thread->engine, peer); 858 } 859 860 861 static void 862 nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, 863 nxt_conn_t *source, nxt_conn_t *sink) 864 { 865 nxt_buf_t *b; 866 867 nxt_debug(source->socket.task, 868 "conn proxy shutdown source fd:%d cl:%d err:%d", 869 source->socket.fd, source->socket.closed, source->socket.error); 870 871 nxt_debug(sink->socket.task, 872 "conn proxy shutdown sink fd:%d cl:%d err:%d", 873 sink->socket.fd, sink->socket.closed, sink->socket.error); 874 875 if (!p->connected || p->delayed) { 876 nxt_conn_proxy_complete(task, p); 877 return; 878 } 879 880 if (sink->socket.error == 0 && !sink->socket.closed) { 881 sink->socket.shutdown = 1; 882 nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); 883 } 884 885 if (sink->socket.error != 0 886 || (sink->socket.closed && source->write == NULL)) 887 { 888 /* The opposite direction also has been already closed. */ 889 nxt_conn_proxy_complete(task, p); 890 return; 891 } 892 893 nxt_debug(source->socket.task, "free source buffer"); 894 895 /* Free the direction's buffer. */ 896 b = (source == p->client) ? p->client_buffer : p->peer_buffer; 897 nxt_mem_free(source->mem_pool, b); 898 } 899 900 901 static void 902 nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) 903 { 904 nxt_conn_t *c; 905 nxt_conn_proxy_t *p; 906 907 c = obj; 908 p = data; 909 910 nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd); 911 912 nxt_conn_proxy_close(task, c, p); 913 } 914 915 916 static void 917 nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) 918 { 919 nxt_conn_t *source, *sink; 920 nxt_conn_proxy_t *p; 921 922 sink = obj; 923 p = data; 924 925 nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd); 926 927 /* Clear data for the direction sink. */ 928 sink->write = NULL; 929 930 /* Block the direction source. */ 931 source = (sink == p->client) ? p->peer : p->client; 932 nxt_fd_event_block_read(task->thread->engine, &source->socket); 933 934 if (source->write == NULL) { 935 /* 936 * There is no data for the opposite direction and 937 * the next read from the sink will most probably fail. 938 */ 939 nxt_conn_proxy_complete(task, p); 940 } 941 } 942 943 944 static const nxt_conn_state_t nxt_conn_proxy_close_state 945 nxt_aligned(64) = 946 { 947 .ready_handler = nxt_conn_proxy_completion, 948 }; 949 950 951 static void 952 nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p) 953 { 954 nxt_event_engine_t *engine; 955 956 engine = task->thread->engine; 957 958 nxt_debug(p->client->socket.task, "conn proxy complete %d:%d", 959 p->client->socket.fd, p->peer->socket.fd); 960 961 if (p->delayed) { 962 p->delayed = 0; 963 nxt_queue_remove(&p->peer->link); 964 } 965 966 if (p->client->socket.fd != -1) { 967 p->retain = 1; 968 p->client->write_state = &nxt_conn_proxy_close_state; 969 nxt_conn_close(engine, p->client); 970 } 971 972 if (p->peer->socket.fd != -1) { 973 p->retain++; 974 p->peer->write_state = &nxt_conn_proxy_close_state; 975 nxt_conn_close(engine, p->peer); 976 } 977 } 978 979 980 static void 981 nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) 982 { 983 nxt_conn_proxy_t *p; 984 985 p = data; 986 987 nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d", 988 p->retain, p->client->socket.fd, p->peer->socket.fd); 989 990 p->retain--; 991 992 if (p->retain == 0) { 993 nxt_mem_free(p->client->mem_pool, p->client_buffer); 994 nxt_mem_free(p->client->mem_pool, p->peer_buffer); 995 996 p->completion_handler(task, p, NULL); 997 } 998 } 999