1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, 11 void *data); 12 static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, 13 void *data); 14 static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); 15 static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); 16 static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, 17 void *data); 18 static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, 19 void *data); 20 static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, 21 nxt_conn_t *source, nxt_conn_t *sink); 22 static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b); 23 static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data); 24 static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, 25 void *data); 26 static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, 27 void *data); 28 static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, 29 nxt_conn_t *sink, nxt_conn_t *source); 30 static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b); 31 static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data); 32 static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data); 33 static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, 34 void *data); 35 static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, 36 void *data); 37 static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data); 38 static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); 39 static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, 40 void *data); 41 static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, 42 nxt_conn_t *source, nxt_conn_t *sink); 43 static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); 44 static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); 45 static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p); 46 static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data); 47 48 49 static const nxt_conn_state_t nxt_conn_proxy_client_wait_state; 50 static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state; 51 static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state; 52 static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state; 53 static const nxt_conn_state_t nxt_conn_proxy_client_read_state; 54 static const nxt_conn_state_t nxt_conn_proxy_peer_read_state; 55 static const nxt_conn_state_t nxt_conn_proxy_client_write_state; 56 static const nxt_conn_state_t nxt_conn_proxy_peer_write_state; 57 58 59 nxt_conn_proxy_t * 60 nxt_conn_proxy_create(nxt_conn_t *client) 61 { 62 nxt_conn_t *peer; 63 nxt_thread_t *thr; 64 nxt_conn_proxy_t *p; 65 66 p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t)); 67 if (nxt_slow_path(p == NULL)) { 68 return NULL; 69 } 70 71 peer = nxt_conn_create(client->mem_pool, client->socket.task); 72 if (nxt_slow_path(peer == NULL)) { 73 return NULL; 74 } 75 76 thr = nxt_thread(); 77 78 client->read_work_queue = &thr->engine->read_work_queue; 79 client->write_work_queue = &thr->engine->write_work_queue; 80 client->socket.read_work_queue = &thr->engine->read_work_queue; 81 client->socket.write_work_queue = &thr->engine->write_work_queue; 82 peer->socket.read_work_queue = &thr->engine->read_work_queue; 83 peer->socket.write_work_queue = &thr->engine->write_work_queue; 84 85 peer->socket.data = client->socket.data; 86 87 peer->read_work_queue = client->read_work_queue; 88 peer->write_work_queue = client->write_work_queue; 89 peer->read_timer.work_queue = client->read_work_queue; 90 peer->write_timer.work_queue = client->write_work_queue; 91 92 p->client = client; 93 p->peer = peer; 94 95 return p; 96 } 97 98 99 void 100 nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p) 101 { 102 nxt_conn_t *peer; 103 104 /* 105 * Peer read event: not connected, disabled. 106 * Peer write event: not connected, disabled. 107 */ 108 109 if (p->client_wait_timeout == 0) { 110 /* 111 * Peer write event: waiting for connection 112 * to be established with connect_timeout. 113 */ 114 peer = p->peer; 115 peer->write_state = &nxt_conn_proxy_peer_connect_state; 116 117 nxt_conn_connect(task->thread->engine, peer); 118 } 119 120 /* 121 * Client read event: waiting for client data with 122 * client_wait_timeout before buffer allocation. 123 */ 124 p->client->read_state = &nxt_conn_proxy_client_wait_state; 125 126 nxt_conn_wait(p->client); 127 } 128 129 130 static const nxt_conn_state_t nxt_conn_proxy_client_wait_state 131 nxt_aligned(64) = 132 { 133 .ready_handler = nxt_conn_proxy_client_buffer_alloc, 134 .close_handler = nxt_conn_proxy_close, 135 .error_handler = nxt_conn_proxy_error, 136 137 .timer_handler = nxt_conn_proxy_read_timeout, 138 .timer_value = nxt_conn_proxy_timeout_value, 139 .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), 140 }; 141 142 143 static void 144 nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) 145 { 146 nxt_buf_t *b; 147 nxt_conn_t *client; 148 nxt_conn_proxy_t *p; 149 150 client = obj; 151 p = data; 152 153 nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd); 154 155 b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0); 156 if (nxt_slow_path(b == NULL)) { 157 /* An error completion. */ 158 nxt_conn_proxy_complete(task, p); 159 return; 160 } 161 162 p->client_buffer = b; 163 client->read = b; 164 165 if (p->peer->socket.fd != -1) { 166 /* 167 * Client read event: waiting, no timeout. 168 * Client write event: blocked. 169 * Peer read event: disabled. 170 * Peer write event: waiting for connection to be established 171 * or blocked after the connection has established. 172 */ 173 client->read_state = &nxt_conn_proxy_client_read_state; 174 175 } else { 176 /* 177 * Client read event: waiting for data with client_wait_timeout 178 * before connecting to a peer. 179 * Client write event: blocked. 180 * Peer read event: not connected, disabled. 181 * Peer write event: not connected, disabled. 182 */ 183 client->read_state = &nxt_conn_proxy_client_first_read_state; 184 } 185 186 nxt_conn_read(task->thread->engine, client); 187 } 188 189 190 static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state 191 nxt_aligned(64) = 192 { 193 .ready_handler = nxt_conn_proxy_peer_connect, 194 .close_handler = nxt_conn_proxy_close, 195 .error_handler = nxt_conn_proxy_error, 196 197 .timer_handler = nxt_conn_proxy_read_timeout, 198 .timer_value = nxt_conn_proxy_timeout_value, 199 .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), 200 .timer_autoreset = 1, 201 }; 202 203 204 static void 205 nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) 206 { 207 nxt_conn_t *client; 208 nxt_conn_proxy_t *p; 209 210 client = obj; 211 p = data; 212 213 /* 214 * Client read event: waiting, no timeout. 215 * Client write event: blocked. 216 * Peer read event: disabled. 217 * Peer write event: waiting for connection to be established 218 * with connect_timeout. 219 */ 220 client->read_state = &nxt_conn_proxy_client_read_state; 221 222 p->peer->write_state = &nxt_conn_proxy_peer_connect_state; 223 224 nxt_conn_connect(task->thread->engine, p->peer); 225 } 226 227 228 static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state 229 nxt_aligned(64) = 230 { 231 .ready_handler = nxt_conn_proxy_connected, 232 .close_handler = nxt_conn_proxy_refused, 233 .error_handler = nxt_conn_proxy_error, 234 235 .timer_handler = nxt_conn_proxy_write_timeout, 236 .timer_value = nxt_conn_proxy_timeout_value, 237 .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout), 238 .timer_autoreset = 1, 239 }; 240 241 242 static void 243 nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) 244 { 245 nxt_conn_t *client, *peer; 246 nxt_conn_proxy_t *p; 247 248 peer = obj; 249 p = data; 250 251 nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd); 252 253 p->connected = 1; 254 255 nxt_conn_tcp_nodelay_on(task, peer); 256 nxt_conn_tcp_nodelay_on(task, p->client); 257 258 /* Peer read event: waiting with peer_wait_timeout. */ 259 260 peer->read_state = &nxt_conn_proxy_peer_wait_state; 261 peer->write_state = &nxt_conn_proxy_peer_write_state; 262 263 nxt_conn_wait(peer); 264 265 if (p->client_buffer != NULL) { 266 client = p->client; 267 268 client->read_state = &nxt_conn_proxy_client_read_state; 269 client->write_state = &nxt_conn_proxy_client_write_state; 270 /* 271 * Send a client read data to the connected peer. 272 * Client write event: blocked. 273 */ 274 nxt_conn_proxy_read_process(task, p, client, peer); 275 } 276 } 277 278 279 static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state 280 nxt_aligned(64) = 281 { 282 .ready_handler = nxt_conn_proxy_peer_read, 283 .close_handler = nxt_conn_proxy_close, 284 .error_handler = nxt_conn_proxy_error, 285 286 .timer_handler = nxt_conn_proxy_read_timeout, 287 .timer_value = nxt_conn_proxy_timeout_value, 288 .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout), 289 }; 290 291 292 static void 293 nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) 294 { 295 nxt_buf_t *b; 296 nxt_conn_t *peer; 297 nxt_conn_proxy_t *p; 298 299 peer = obj; 300 p = data; 301 302 nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd); 303 304 b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0); 305 if (nxt_slow_path(b == NULL)) { 306 /* An error completion. */ 307 nxt_conn_proxy_complete(task, p); 308 return; 309 } 310 311 p->peer_buffer = b; 312 peer->read = b; 313 314 p->client->write_state = &nxt_conn_proxy_client_write_state; 315 peer->read_state = &nxt_conn_proxy_peer_read_state; 316 peer->write_state = &nxt_conn_proxy_peer_write_state; 317 318 /* 319 * Client read event: waiting, no timeout. 320 * Client write event: blocked. 321 * Peer read event: waiting with possible peer_wait_timeout. 322 * Peer write event: blocked. 323 */ 324 nxt_conn_read(task->thread->engine, peer); 325 } 326 327 328 static const nxt_conn_state_t nxt_conn_proxy_client_read_state 329 nxt_aligned(64) = 330 { 331 .ready_handler = nxt_conn_proxy_client_read_ready, 332 .close_handler = nxt_conn_proxy_close, 333 .error_handler = nxt_conn_proxy_read_error, 334 }; 335 336 337 static void 338 nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) 339 { 340 nxt_conn_t *client; 341 nxt_conn_proxy_t *p; 342 343 client = obj; 344 p = data; 345 346 nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd); 347 348 nxt_conn_proxy_read_process(task, p, client, p->peer); 349 } 350 351 352 static const nxt_conn_state_t nxt_conn_proxy_peer_read_state 353 nxt_aligned(64) = 354 { 355 .ready_handler = nxt_conn_proxy_peer_read_ready, 356 .close_handler = nxt_conn_proxy_close, 357 .error_handler = nxt_conn_proxy_read_error, 358 }; 359 360 361 static void 362 nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) 363 { 364 nxt_conn_t *peer; 365 nxt_conn_proxy_t *p; 366 367 peer = obj; 368 p = data; 369 370 nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd); 371 372 nxt_conn_proxy_read_process(task, p, peer, p->client); 373 } 374 375 376 static void 377 nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, 378 nxt_conn_t *source, nxt_conn_t *sink) 379 { 380 nxt_buf_t *rb, *wb; 381 382 if (sink->socket.error != 0) { 383 nxt_debug(task, "conn proxy sink fd:%d error:%d", 384 sink->socket.fd, sink->socket.error); 385 386 nxt_conn_proxy_write_error(task, sink, sink->socket.data); 387 return; 388 } 389 390 while (source->read != NULL) { 391 392 rb = source->read; 393 394 if (rb->mem.pos != rb->mem.free) { 395 396 /* Add a read part to a write chain. */ 397 398 wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); 399 if (wb == NULL) { 400 /* An error completion. */ 401 nxt_conn_proxy_complete(task, p); 402 return; 403 } 404 405 wb->mem.pos = rb->mem.pos; 406 wb->mem.free = rb->mem.free; 407 wb->mem.start = rb->mem.pos; 408 wb->mem.end = rb->mem.free; 409 410 rb->mem.pos = rb->mem.free; 411 rb->mem.start = rb->mem.free; 412 413 nxt_conn_proxy_write_add(sink, wb); 414 } 415 416 if (rb->mem.start != rb->mem.end) { 417 nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, 418 task, source, source->socket.data); 419 break; 420 } 421 422 source->read = rb->next; 423 nxt_buf_free(source->mem_pool, rb); 424 } 425 426 if (p->connected) { 427 nxt_conn_write(task->thread->engine, sink); 428 } 429 } 430 431 432 static void 433 nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b) 434 { 435 nxt_buf_t *first, *second, *prev; 436 437 first = c->write; 438 439 if (first == NULL) { 440 c->write = b; 441 return; 442 } 443 444 /* 445 * A event conn proxy maintains a buffer per each direction. 446 * The buffer is divided by read and write parts. These parts are 447 * linked in buffer chains. There can be no more than two buffers 448 * in write chain at any time, because an added buffer is coalesced 449 * with the last buffer if possible. 450 */ 451 452 second = first->next; 453 454 if (second == NULL) { 455 456 if (first->mem.end != b->mem.start) { 457 first->next = b; 458 return; 459 } 460 461 /* 462 * The first buffer is just before the added buffer, so 463 * expand the first buffer to the end of the added buffer. 464 */ 465 prev = first; 466 467 } else { 468 if (second->mem.end != b->mem.start) { 469 nxt_thread_log_alert("event conn proxy write: second buffer end:%p " 470 "is not equal to added buffer start:%p", 471 second->mem.end, b->mem.start); 472 return; 473 } 474 475 /* 476 * "second->mem.end == b->mem.start" must be always true here, 477 * that is the second buffer is just before the added buffer, 478 * so expand the second buffer to the end of added buffer. 479 */ 480 prev = second; 481 } 482 483 prev->mem.free = b->mem.end; 484 prev->mem.end = b->mem.end; 485 486 nxt_buf_free(c->mem_pool, b); 487 } 488 489 490 static void 491 nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data) 492 { 493 nxt_conn_t *source, *sink; 494 nxt_conn_proxy_t *p; 495 496 source = obj; 497 p = data; 498 499 nxt_debug(task, "conn proxy read fd:%d", source->socket.fd); 500 501 if (!source->socket.closed) { 502 sink = (source == p->client) ? p->peer : p->client; 503 504 if (sink->socket.error == 0) { 505 nxt_conn_read(task->thread->engine, source); 506 } 507 } 508 } 509 510 511 static const nxt_conn_state_t nxt_conn_proxy_client_write_state 512 nxt_aligned(64) = 513 { 514 .ready_handler = nxt_conn_proxy_client_write_ready, 515 .error_handler = nxt_conn_proxy_write_error, 516 517 .timer_handler = nxt_conn_proxy_write_timeout, 518 .timer_value = nxt_conn_proxy_timeout_value, 519 .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout), 520 .timer_autoreset = 1, 521 }; 522 523 524 static void 525 nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) 526 { 527 nxt_conn_t *client; 528 nxt_conn_proxy_t *p; 529 530 client = obj; 531 p = data; 532 533 nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd); 534 535 nxt_conn_proxy_write_process(task, p, client, p->peer); 536 } 537 538 539 static const nxt_conn_state_t nxt_conn_proxy_peer_write_state 540 nxt_aligned(64) = 541 { 542 .ready_handler = nxt_conn_proxy_peer_write_ready, 543 .error_handler = nxt_conn_proxy_write_error, 544 545 .timer_handler = nxt_conn_proxy_write_timeout, 546 .timer_value = nxt_conn_proxy_timeout_value, 547 .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout), 548 .timer_autoreset = 1, 549 }; 550 551 552 static void 553 nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) 554 { 555 nxt_conn_t *peer; 556 nxt_conn_proxy_t *p; 557 558 peer = obj; 559 p = data; 560 561 nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd); 562 563 nxt_conn_proxy_write_process(task, p, peer, p->client); 564 } 565 566 567 static void 568 nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, 569 nxt_conn_t *sink, nxt_conn_t *source) 570 { 571 nxt_buf_t *rb, *wb; 572 573 while (sink->write != NULL) { 574 575 wb = sink->write; 576 577 if (nxt_buf_is_sync(wb)) { 578 579 /* A sync buffer marks the end of stream. */ 580 581 sink->write = NULL; 582 nxt_buf_free(sink->mem_pool, wb); 583 nxt_conn_proxy_shutdown(task, p, source, sink); 584 return; 585 } 586 587 if (wb->mem.start != wb->mem.pos) { 588 589 /* Add a written part to a read chain. */ 590 591 rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); 592 if (rb == NULL) { 593 /* An error completion. */ 594 nxt_conn_proxy_complete(task, p); 595 return; 596 } 597 598 rb->mem.pos = wb->mem.start; 599 rb->mem.free = wb->mem.start; 600 rb->mem.start = wb->mem.start; 601 rb->mem.end = wb->mem.pos; 602 603 wb->mem.start = wb->mem.pos; 604 605 nxt_conn_proxy_read_add(source, rb); 606 } 607 608 if (wb->mem.pos != wb->mem.free) { 609 nxt_conn_write(task->thread->engine, sink); 610 611 break; 612 } 613 614 sink->write = wb->next; 615 nxt_buf_free(sink->mem_pool, wb); 616 } 617 618 nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, 619 task, source, source->socket.data); 620 } 621 622 623 static void 624 nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b) 625 { 626 nxt_buf_t *first, *second; 627 628 first = c->read; 629 630 if (first == NULL) { 631 c->read = b; 632 return; 633 } 634 635 /* 636 * A event conn proxy maintains a buffer per each direction. 637 * The buffer is divided by read and write parts. These parts are 638 * linked in buffer chains. There can be no more than two buffers 639 * in read chain at any time, because an added buffer is coalesced 640 * with the last buffer if possible. The first and the second 641 * buffers are also coalesced if possible. 642 */ 643 644 second = first->next; 645 646 if (second == NULL) { 647 648 if (first->mem.start == b->mem.end) { 649 /* 650 * The added buffer is just before the first buffer, so expand 651 * the first buffer to the beginning of the added buffer. 652 */ 653 first->mem.pos = b->mem.start; 654 first->mem.free = b->mem.start; 655 first->mem.start = b->mem.start; 656 657 } else if (first->mem.end == b->mem.start) { 658 /* 659 * The added buffer is just after the first buffer, so 660 * expand the first buffer to the end of the added buffer. 661 */ 662 first->mem.end = b->mem.end; 663 664 } else { 665 first->next = b; 666 return; 667 } 668 669 } else { 670 if (second->mem.end != b->mem.start) { 671 nxt_thread_log_alert("event conn proxy read: second buffer end:%p " 672 "is not equal to added buffer start:%p", 673 second->mem.end, b->mem.start); 674 return; 675 } 676 677 /* 678 * The added buffer is just after the second buffer, so 679 * expand the second buffer to the end of the added buffer. 680 */ 681 second->mem.end = b->mem.end; 682 683 if (first->mem.start == second->mem.end) { 684 /* 685 * The second buffer is just before the first buffer, so expand 686 * the first buffer to the beginning of the second buffer. 687 */ 688 first->mem.pos = second->mem.start; 689 first->mem.free = second->mem.start; 690 first->mem.start = second->mem.start; 691 first->next = NULL; 692 693 nxt_buf_free(c->mem_pool, second); 694 } 695 } 696 697 nxt_buf_free(c->mem_pool, b); 698 } 699 700 701 static void 702 nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data) 703 { 704 nxt_buf_t *b; 705 nxt_conn_t *source, *sink; 706 nxt_conn_proxy_t *p; 707 708 source = obj; 709 p = data; 710 711 nxt_debug(task, "conn proxy close fd:%d", source->socket.fd); 712 713 sink = (source == p->client) ? p->peer : p->client; 714 715 if (sink->write == NULL) { 716 nxt_conn_proxy_shutdown(task, p, source, sink); 717 return; 718 } 719 720 b = nxt_buf_sync_alloc(source->mem_pool, 0); 721 if (b == NULL) { 722 /* An error completion. */ 723 nxt_conn_proxy_complete(task, p); 724 return; 725 } 726 727 nxt_buf_chain_add(&sink->write, b); 728 } 729 730 731 static void 732 nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data) 733 { 734 nxt_conn_t *c; 735 nxt_conn_proxy_t *p; 736 737 c = obj; 738 p = data; 739 740 nxt_debug(task, "conn proxy error fd:%d", c->socket.fd); 741 742 nxt_conn_proxy_close(task, c, p); 743 } 744 745 746 static void 747 nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) 748 { 749 nxt_conn_t *c; 750 nxt_timer_t *timer; 751 752 timer = obj; 753 754 c = nxt_read_timer_conn(timer); 755 c->socket.timedout = 1; 756 c->socket.closed = 1; 757 758 nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd); 759 760 nxt_conn_proxy_close(task, c, c->socket.data); 761 } 762 763 764 static void 765 nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) 766 { 767 nxt_conn_t *c; 768 nxt_timer_t *timer; 769 770 timer = obj; 771 772 c = nxt_write_timer_conn(timer); 773 c->socket.timedout = 1; 774 c->socket.closed = 1; 775 776 nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd); 777 778 nxt_conn_proxy_close(task, c, c->socket.data); 779 } 780 781 782 static nxt_msec_t 783 nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data) 784 { 785 nxt_msec_t *timer; 786 nxt_conn_proxy_t *p; 787 788 p = c->socket.data; 789 790 timer = (nxt_msec_t *) ((char *) p + data); 791 792 return *timer; 793 } 794 795 796 static void 797 nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) 798 { 799 nxt_conn_t *peer; 800 nxt_conn_proxy_t *p; 801 802 peer = obj; 803 p = data; 804 805 nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd); 806 807 if (p->retries == 0) { 808 /* An error completion. */ 809 nxt_conn_proxy_complete(task, p); 810 return; 811 } 812 813 p->retries--; 814 815 nxt_socket_close(task, peer->socket.fd); 816 peer->socket.fd = -1; 817 peer->socket.error = 0; 818 819 p->delayed = 1; 820 821 peer->write_timer.handler = nxt_conn_proxy_reconnect_handler; 822 nxt_timer_add(task->thread->engine, &peer->write_timer, 823 p->reconnect_timeout); 824 } 825 826 827 static void 828 nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) 829 { 830 nxt_conn_t *peer; 831 nxt_timer_t *timer; 832 nxt_conn_proxy_t *p; 833 834 timer = obj; 835 836 nxt_debug(task, "conn proxy reconnect timer"); 837 838 peer = nxt_write_timer_conn(timer); 839 p = peer->socket.data; 840 841 if (p->client->socket.closed) { 842 nxt_conn_proxy_complete(task, p); 843 return; 844 } 845 846 p->delayed = 0; 847 848 peer->write_state = &nxt_conn_proxy_peer_connect_state; 849 /* 850 * Peer read event: disabled. 851 * Peer write event: waiting for connection with connect_timeout. 852 */ 853 nxt_conn_connect(task->thread->engine, peer); 854 } 855 856 857 static void 858 nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, 859 nxt_conn_t *source, nxt_conn_t *sink) 860 { 861 nxt_buf_t *b; 862 863 nxt_debug(source->socket.task, 864 "conn proxy shutdown source fd:%d cl:%d err:%d", 865 source->socket.fd, source->socket.closed, source->socket.error); 866 867 nxt_debug(sink->socket.task, 868 "conn proxy shutdown sink fd:%d cl:%d err:%d", 869 sink->socket.fd, sink->socket.closed, sink->socket.error); 870 871 if (!p->connected || p->delayed) { 872 nxt_conn_proxy_complete(task, p); 873 return; 874 } 875 876 if (sink->socket.error == 0 && !sink->socket.closed) { 877 sink->socket.shutdown = 1; 878 nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); 879 } 880 881 if (sink->socket.error != 0 882 || (sink->socket.closed && source->write == NULL)) 883 { 884 /* The opposite direction also has been already closed. */ 885 nxt_conn_proxy_complete(task, p); 886 return; 887 } 888 889 nxt_debug(source->socket.task, "free source buffer"); 890 891 /* Free the direction's buffer. */ 892 b = (source == p->client) ? p->client_buffer : p->peer_buffer; 893 nxt_mp_free(source->mem_pool, b); 894 } 895 896 897 static void 898 nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) 899 { 900 nxt_conn_t *c; 901 nxt_conn_proxy_t *p; 902 903 c = obj; 904 p = data; 905 906 nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd); 907 908 nxt_conn_proxy_close(task, c, p); 909 } 910 911 912 static void 913 nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) 914 { 915 nxt_conn_t *source, *sink; 916 nxt_conn_proxy_t *p; 917 918 sink = obj; 919 p = data; 920 921 nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd); 922 923 /* Clear data for the direction sink. */ 924 sink->write = NULL; 925 926 /* Block the direction source. */ 927 source = (sink == p->client) ? p->peer : p->client; 928 nxt_fd_event_block_read(task->thread->engine, &source->socket); 929 930 if (source->write == NULL) { 931 /* 932 * There is no data for the opposite direction and 933 * the next read from the sink will most probably fail. 934 */ 935 nxt_conn_proxy_complete(task, p); 936 } 937 } 938 939 940 static const nxt_conn_state_t nxt_conn_proxy_close_state 941 nxt_aligned(64) = 942 { 943 .ready_handler = nxt_conn_proxy_completion, 944 }; 945 946 947 static void 948 nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p) 949 { 950 nxt_event_engine_t *engine; 951 952 engine = task->thread->engine; 953 954 nxt_debug(p->client->socket.task, "conn proxy complete %d:%d", 955 p->client->socket.fd, p->peer->socket.fd); 956 957 if (p->delayed) { 958 p->delayed = 0; 959 nxt_queue_remove(&p->peer->link); 960 } 961 962 if (p->client->socket.fd != -1) { 963 p->retain = 1; 964 p->client->write_state = &nxt_conn_proxy_close_state; 965 nxt_conn_close(engine, p->client); 966 } 967 968 if (p->peer->socket.fd != -1) { 969 p->retain++; 970 p->peer->write_state = &nxt_conn_proxy_close_state; 971 nxt_conn_close(engine, p->peer); 972 } 973 } 974 975 976 static void 977 nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) 978 { 979 nxt_conn_proxy_t *p; 980 981 p = data; 982 983 nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d", 984 p->retain, p->client->socket.fd, p->peer->socket.fd); 985 986 p->retain--; 987 988 if (p->retain == 0) { 989 nxt_mp_free(p->client->mem_pool, p->client_buffer); 990 nxt_mp_free(p->client->mem_pool, p->peer_buffer); 991 992 p->completion_handler(task, p, NULL); 993 } 994 } 995