162Sigor@sysoev.ru 262Sigor@sysoev.ru /* 362Sigor@sysoev.ru * Copyright (C) Igor Sysoev 462Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 562Sigor@sysoev.ru */ 662Sigor@sysoev.ru 762Sigor@sysoev.ru #include <nxt_main.h> 862Sigor@sysoev.ru 962Sigor@sysoev.ru 1062Sigor@sysoev.ru static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, 1162Sigor@sysoev.ru void *data); 1262Sigor@sysoev.ru static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, 1362Sigor@sysoev.ru void *data); 1462Sigor@sysoev.ru static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); 1562Sigor@sysoev.ru static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); 1662Sigor@sysoev.ru static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, 1762Sigor@sysoev.ru void *data); 1862Sigor@sysoev.ru static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, 1962Sigor@sysoev.ru void *data); 2062Sigor@sysoev.ru static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, 2162Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink); 2262Sigor@sysoev.ru static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b); 2362Sigor@sysoev.ru static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data); 2462Sigor@sysoev.ru static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, 2562Sigor@sysoev.ru void *data); 2662Sigor@sysoev.ru static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, 2762Sigor@sysoev.ru void *data); 2862Sigor@sysoev.ru static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, 2962Sigor@sysoev.ru nxt_conn_t *sink, nxt_conn_t *source); 3062Sigor@sysoev.ru static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b); 3162Sigor@sysoev.ru static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data); 3262Sigor@sysoev.ru static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data); 3362Sigor@sysoev.ru static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, 3462Sigor@sysoev.ru void *data); 3562Sigor@sysoev.ru static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, 3662Sigor@sysoev.ru void *data); 3762Sigor@sysoev.ru static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data); 3862Sigor@sysoev.ru static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); 3962Sigor@sysoev.ru static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, 4062Sigor@sysoev.ru void *data); 4162Sigor@sysoev.ru static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, 4262Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink); 4362Sigor@sysoev.ru static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); 4462Sigor@sysoev.ru static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); 4562Sigor@sysoev.ru static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p); 4662Sigor@sysoev.ru static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data); 4762Sigor@sysoev.ru 4862Sigor@sysoev.ru 4962Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_wait_state; 5062Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state; 5162Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state; 5262Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state; 5362Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_read_state; 5462Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_read_state; 5562Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_write_state; 5662Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_write_state; 5762Sigor@sysoev.ru 5862Sigor@sysoev.ru 5962Sigor@sysoev.ru nxt_conn_proxy_t * 6062Sigor@sysoev.ru nxt_conn_proxy_create(nxt_conn_t *client) 6162Sigor@sysoev.ru { 6262Sigor@sysoev.ru nxt_conn_t *peer; 6362Sigor@sysoev.ru nxt_thread_t *thr; 6462Sigor@sysoev.ru nxt_conn_proxy_t *p; 6562Sigor@sysoev.ru 6665Sigor@sysoev.ru p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t)); 6762Sigor@sysoev.ru if (nxt_slow_path(p == NULL)) { 6862Sigor@sysoev.ru return NULL; 6962Sigor@sysoev.ru } 7062Sigor@sysoev.ru 7162Sigor@sysoev.ru peer = nxt_conn_create(client->mem_pool, client->socket.task); 7262Sigor@sysoev.ru if (nxt_slow_path(peer == NULL)) { 7362Sigor@sysoev.ru return NULL; 7462Sigor@sysoev.ru } 7562Sigor@sysoev.ru 7662Sigor@sysoev.ru thr = nxt_thread(); 7762Sigor@sysoev.ru 7862Sigor@sysoev.ru client->read_work_queue = &thr->engine->read_work_queue; 7962Sigor@sysoev.ru client->write_work_queue = &thr->engine->write_work_queue; 8062Sigor@sysoev.ru client->socket.read_work_queue = &thr->engine->read_work_queue; 8162Sigor@sysoev.ru client->socket.write_work_queue = &thr->engine->write_work_queue; 8262Sigor@sysoev.ru peer->socket.read_work_queue = &thr->engine->read_work_queue; 8362Sigor@sysoev.ru peer->socket.write_work_queue = &thr->engine->write_work_queue; 8462Sigor@sysoev.ru 8562Sigor@sysoev.ru peer->socket.data = client->socket.data; 8662Sigor@sysoev.ru 8762Sigor@sysoev.ru peer->read_work_queue = client->read_work_queue; 8862Sigor@sysoev.ru peer->write_work_queue = client->write_work_queue; 8962Sigor@sysoev.ru peer->read_timer.work_queue = client->read_work_queue; 9062Sigor@sysoev.ru peer->write_timer.work_queue = client->write_work_queue; 9162Sigor@sysoev.ru 9262Sigor@sysoev.ru p->client = client; 9362Sigor@sysoev.ru p->peer = peer; 9462Sigor@sysoev.ru 9562Sigor@sysoev.ru return p; 9662Sigor@sysoev.ru } 9762Sigor@sysoev.ru 9862Sigor@sysoev.ru 9962Sigor@sysoev.ru void 10062Sigor@sysoev.ru nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p) 10162Sigor@sysoev.ru { 10262Sigor@sysoev.ru nxt_conn_t *peer; 10362Sigor@sysoev.ru 10462Sigor@sysoev.ru /* 10562Sigor@sysoev.ru * Peer read event: not connected, disabled. 10662Sigor@sysoev.ru * Peer write event: not connected, disabled. 10762Sigor@sysoev.ru */ 10862Sigor@sysoev.ru 10962Sigor@sysoev.ru if (p->client_wait_timeout == 0) { 11062Sigor@sysoev.ru /* 11162Sigor@sysoev.ru * Peer write event: waiting for connection 11262Sigor@sysoev.ru * to be established with connect_timeout. 11362Sigor@sysoev.ru */ 11462Sigor@sysoev.ru peer = p->peer; 11562Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_connect_state; 11662Sigor@sysoev.ru 11762Sigor@sysoev.ru nxt_conn_connect(task->thread->engine, peer); 11862Sigor@sysoev.ru } 11962Sigor@sysoev.ru 12062Sigor@sysoev.ru /* 12162Sigor@sysoev.ru * Client read event: waiting for client data with 12262Sigor@sysoev.ru * client_wait_timeout before buffer allocation. 12362Sigor@sysoev.ru */ 12462Sigor@sysoev.ru p->client->read_state = &nxt_conn_proxy_client_wait_state; 12562Sigor@sysoev.ru 12662Sigor@sysoev.ru nxt_conn_wait(p->client); 12762Sigor@sysoev.ru } 12862Sigor@sysoev.ru 12962Sigor@sysoev.ru 13062Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_wait_state 13162Sigor@sysoev.ru nxt_aligned(64) = 13262Sigor@sysoev.ru { 13362Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_client_buffer_alloc, 13462Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close, 13562Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error, 13662Sigor@sysoev.ru 13762Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_read_timeout, 13862Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value, 13962Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), 14062Sigor@sysoev.ru }; 14162Sigor@sysoev.ru 14262Sigor@sysoev.ru 14362Sigor@sysoev.ru static void 14462Sigor@sysoev.ru nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) 14562Sigor@sysoev.ru { 14662Sigor@sysoev.ru nxt_buf_t *b; 14762Sigor@sysoev.ru nxt_conn_t *client; 14862Sigor@sysoev.ru nxt_conn_proxy_t *p; 14962Sigor@sysoev.ru 15062Sigor@sysoev.ru client = obj; 15162Sigor@sysoev.ru p = data; 15262Sigor@sysoev.ru 15362Sigor@sysoev.ru nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd); 15462Sigor@sysoev.ru 15565Sigor@sysoev.ru b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0); 15662Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 15762Sigor@sysoev.ru /* An error completion. */ 15862Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 15962Sigor@sysoev.ru return; 16062Sigor@sysoev.ru } 16162Sigor@sysoev.ru 16262Sigor@sysoev.ru p->client_buffer = b; 16362Sigor@sysoev.ru client->read = b; 16462Sigor@sysoev.ru 16562Sigor@sysoev.ru if (p->peer->socket.fd != -1) { 16662Sigor@sysoev.ru /* 16762Sigor@sysoev.ru * Client read event: waiting, no timeout. 16862Sigor@sysoev.ru * Client write event: blocked. 16962Sigor@sysoev.ru * Peer read event: disabled. 17062Sigor@sysoev.ru * Peer write event: waiting for connection to be established 17162Sigor@sysoev.ru * or blocked after the connection has established. 17262Sigor@sysoev.ru */ 17362Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_read_state; 17462Sigor@sysoev.ru 17562Sigor@sysoev.ru } else { 17662Sigor@sysoev.ru /* 17762Sigor@sysoev.ru * Client read event: waiting for data with client_wait_timeout 17862Sigor@sysoev.ru * before connecting to a peer. 17962Sigor@sysoev.ru * Client write event: blocked. 18062Sigor@sysoev.ru * Peer read event: not connected, disabled. 18162Sigor@sysoev.ru * Peer write event: not connected, disabled. 18262Sigor@sysoev.ru */ 18362Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_first_read_state; 18462Sigor@sysoev.ru } 18562Sigor@sysoev.ru 18662Sigor@sysoev.ru nxt_conn_read(task->thread->engine, client); 18762Sigor@sysoev.ru } 18862Sigor@sysoev.ru 18962Sigor@sysoev.ru 19062Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state 19162Sigor@sysoev.ru nxt_aligned(64) = 19262Sigor@sysoev.ru { 19362Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_connect, 19462Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close, 19562Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error, 19662Sigor@sysoev.ru 19762Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_read_timeout, 19862Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value, 19962Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), 20062Sigor@sysoev.ru .timer_autoreset = 1, 20162Sigor@sysoev.ru }; 20262Sigor@sysoev.ru 20362Sigor@sysoev.ru 20462Sigor@sysoev.ru static void 20562Sigor@sysoev.ru nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) 20662Sigor@sysoev.ru { 20762Sigor@sysoev.ru nxt_conn_t *client; 20862Sigor@sysoev.ru nxt_conn_proxy_t *p; 20962Sigor@sysoev.ru 21062Sigor@sysoev.ru client = obj; 21162Sigor@sysoev.ru p = data; 21262Sigor@sysoev.ru 21362Sigor@sysoev.ru /* 21462Sigor@sysoev.ru * Client read event: waiting, no timeout. 21562Sigor@sysoev.ru * Client write event: blocked. 21662Sigor@sysoev.ru * Peer read event: disabled. 21762Sigor@sysoev.ru * Peer write event: waiting for connection to be established 21862Sigor@sysoev.ru * with connect_timeout. 21962Sigor@sysoev.ru */ 22062Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_read_state; 22162Sigor@sysoev.ru 22262Sigor@sysoev.ru p->peer->write_state = &nxt_conn_proxy_peer_connect_state; 22362Sigor@sysoev.ru 22462Sigor@sysoev.ru nxt_conn_connect(task->thread->engine, p->peer); 22562Sigor@sysoev.ru } 22662Sigor@sysoev.ru 22762Sigor@sysoev.ru 22862Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state 22962Sigor@sysoev.ru nxt_aligned(64) = 23062Sigor@sysoev.ru { 23162Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_connected, 23262Sigor@sysoev.ru .close_handler = nxt_conn_proxy_refused, 23362Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error, 23462Sigor@sysoev.ru 23562Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_write_timeout, 23662Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value, 23762Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout), 23862Sigor@sysoev.ru .timer_autoreset = 1, 23962Sigor@sysoev.ru }; 24062Sigor@sysoev.ru 24162Sigor@sysoev.ru 24262Sigor@sysoev.ru static void 24362Sigor@sysoev.ru nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) 24462Sigor@sysoev.ru { 24562Sigor@sysoev.ru nxt_conn_t *client, *peer; 24662Sigor@sysoev.ru nxt_conn_proxy_t *p; 24762Sigor@sysoev.ru 24862Sigor@sysoev.ru peer = obj; 24962Sigor@sysoev.ru p = data; 25062Sigor@sysoev.ru 25162Sigor@sysoev.ru nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd); 25262Sigor@sysoev.ru 25362Sigor@sysoev.ru p->connected = 1; 25462Sigor@sysoev.ru 25562Sigor@sysoev.ru nxt_conn_tcp_nodelay_on(task, peer); 25662Sigor@sysoev.ru nxt_conn_tcp_nodelay_on(task, p->client); 25762Sigor@sysoev.ru 25862Sigor@sysoev.ru /* Peer read event: waiting with peer_wait_timeout. */ 25962Sigor@sysoev.ru 26062Sigor@sysoev.ru peer->read_state = &nxt_conn_proxy_peer_wait_state; 26162Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_write_state; 26262Sigor@sysoev.ru 26362Sigor@sysoev.ru nxt_conn_wait(peer); 26462Sigor@sysoev.ru 26562Sigor@sysoev.ru if (p->client_buffer != NULL) { 26662Sigor@sysoev.ru client = p->client; 26762Sigor@sysoev.ru 26862Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_read_state; 26962Sigor@sysoev.ru client->write_state = &nxt_conn_proxy_client_write_state; 27062Sigor@sysoev.ru /* 27162Sigor@sysoev.ru * Send a client read data to the connected peer. 27262Sigor@sysoev.ru * Client write event: blocked. 27362Sigor@sysoev.ru */ 27462Sigor@sysoev.ru nxt_conn_proxy_read_process(task, p, client, peer); 27562Sigor@sysoev.ru } 27662Sigor@sysoev.ru } 27762Sigor@sysoev.ru 27862Sigor@sysoev.ru 27962Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state 28062Sigor@sysoev.ru nxt_aligned(64) = 28162Sigor@sysoev.ru { 28262Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_read, 28362Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close, 28462Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error, 28562Sigor@sysoev.ru 28662Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_read_timeout, 28762Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value, 28862Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout), 28962Sigor@sysoev.ru }; 29062Sigor@sysoev.ru 29162Sigor@sysoev.ru 29262Sigor@sysoev.ru static void 29362Sigor@sysoev.ru nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) 29462Sigor@sysoev.ru { 29562Sigor@sysoev.ru nxt_buf_t *b; 29662Sigor@sysoev.ru nxt_conn_t *peer; 29762Sigor@sysoev.ru nxt_conn_proxy_t *p; 29862Sigor@sysoev.ru 29962Sigor@sysoev.ru peer = obj; 30062Sigor@sysoev.ru p = data; 30162Sigor@sysoev.ru 30262Sigor@sysoev.ru nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd); 30362Sigor@sysoev.ru 30465Sigor@sysoev.ru b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0); 30562Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 30662Sigor@sysoev.ru /* An error completion. */ 30762Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 30862Sigor@sysoev.ru return; 30962Sigor@sysoev.ru } 31062Sigor@sysoev.ru 31162Sigor@sysoev.ru p->peer_buffer = b; 31262Sigor@sysoev.ru peer->read = b; 31362Sigor@sysoev.ru 31462Sigor@sysoev.ru p->client->write_state = &nxt_conn_proxy_client_write_state; 31562Sigor@sysoev.ru peer->read_state = &nxt_conn_proxy_peer_read_state; 31662Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_write_state; 31762Sigor@sysoev.ru 31862Sigor@sysoev.ru /* 31962Sigor@sysoev.ru * Client read event: waiting, no timeout. 32062Sigor@sysoev.ru * Client write event: blocked. 32162Sigor@sysoev.ru * Peer read event: waiting with possible peer_wait_timeout. 32262Sigor@sysoev.ru * Peer write event: blocked. 32362Sigor@sysoev.ru */ 32462Sigor@sysoev.ru nxt_conn_read(task->thread->engine, peer); 32562Sigor@sysoev.ru } 32662Sigor@sysoev.ru 32762Sigor@sysoev.ru 32862Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_read_state 32962Sigor@sysoev.ru nxt_aligned(64) = 33062Sigor@sysoev.ru { 33162Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_client_read_ready, 33262Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close, 33362Sigor@sysoev.ru .error_handler = nxt_conn_proxy_read_error, 33462Sigor@sysoev.ru }; 33562Sigor@sysoev.ru 33662Sigor@sysoev.ru 33762Sigor@sysoev.ru static void 33862Sigor@sysoev.ru nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) 33962Sigor@sysoev.ru { 34062Sigor@sysoev.ru nxt_conn_t *client; 34162Sigor@sysoev.ru nxt_conn_proxy_t *p; 34262Sigor@sysoev.ru 34362Sigor@sysoev.ru client = obj; 34462Sigor@sysoev.ru p = data; 34562Sigor@sysoev.ru 34662Sigor@sysoev.ru nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd); 34762Sigor@sysoev.ru 34862Sigor@sysoev.ru nxt_conn_proxy_read_process(task, p, client, p->peer); 34962Sigor@sysoev.ru } 35062Sigor@sysoev.ru 35162Sigor@sysoev.ru 35262Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_read_state 35362Sigor@sysoev.ru nxt_aligned(64) = 35462Sigor@sysoev.ru { 35562Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_read_ready, 35662Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close, 35762Sigor@sysoev.ru .error_handler = nxt_conn_proxy_read_error, 35862Sigor@sysoev.ru }; 35962Sigor@sysoev.ru 36062Sigor@sysoev.ru 36162Sigor@sysoev.ru static void 36262Sigor@sysoev.ru nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) 36362Sigor@sysoev.ru { 36462Sigor@sysoev.ru nxt_conn_t *peer; 36562Sigor@sysoev.ru nxt_conn_proxy_t *p; 36662Sigor@sysoev.ru 36762Sigor@sysoev.ru peer = obj; 36862Sigor@sysoev.ru p = data; 36962Sigor@sysoev.ru 37062Sigor@sysoev.ru nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd); 37162Sigor@sysoev.ru 37262Sigor@sysoev.ru nxt_conn_proxy_read_process(task, p, peer, p->client); 37362Sigor@sysoev.ru } 37462Sigor@sysoev.ru 37562Sigor@sysoev.ru 37662Sigor@sysoev.ru static void 37762Sigor@sysoev.ru nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, 37862Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink) 37962Sigor@sysoev.ru { 38062Sigor@sysoev.ru nxt_buf_t *rb, *wb; 38162Sigor@sysoev.ru 38262Sigor@sysoev.ru if (sink->socket.error != 0) { 38362Sigor@sysoev.ru nxt_debug(task, "conn proxy sink fd:%d error:%d", 38462Sigor@sysoev.ru sink->socket.fd, sink->socket.error); 38562Sigor@sysoev.ru 38662Sigor@sysoev.ru nxt_conn_proxy_write_error(task, sink, sink->socket.data); 38762Sigor@sysoev.ru return; 38862Sigor@sysoev.ru } 38962Sigor@sysoev.ru 39062Sigor@sysoev.ru while (source->read != NULL) { 39162Sigor@sysoev.ru 39262Sigor@sysoev.ru rb = source->read; 39362Sigor@sysoev.ru 39462Sigor@sysoev.ru if (rb->mem.pos != rb->mem.free) { 39562Sigor@sysoev.ru 39662Sigor@sysoev.ru /* Add a read part to a write chain. */ 39762Sigor@sysoev.ru 39862Sigor@sysoev.ru wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); 39962Sigor@sysoev.ru if (wb == NULL) { 40062Sigor@sysoev.ru /* An error completion. */ 40162Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 40262Sigor@sysoev.ru return; 40362Sigor@sysoev.ru } 40462Sigor@sysoev.ru 40562Sigor@sysoev.ru wb->mem.pos = rb->mem.pos; 40662Sigor@sysoev.ru wb->mem.free = rb->mem.free; 40762Sigor@sysoev.ru wb->mem.start = rb->mem.pos; 40862Sigor@sysoev.ru wb->mem.end = rb->mem.free; 40962Sigor@sysoev.ru 41062Sigor@sysoev.ru rb->mem.pos = rb->mem.free; 41162Sigor@sysoev.ru rb->mem.start = rb->mem.free; 41262Sigor@sysoev.ru 41362Sigor@sysoev.ru nxt_conn_proxy_write_add(sink, wb); 41462Sigor@sysoev.ru } 41562Sigor@sysoev.ru 41662Sigor@sysoev.ru if (rb->mem.start != rb->mem.end) { 41762Sigor@sysoev.ru nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, 41862Sigor@sysoev.ru task, source, source->socket.data); 41962Sigor@sysoev.ru break; 42062Sigor@sysoev.ru } 42162Sigor@sysoev.ru 42262Sigor@sysoev.ru source->read = rb->next; 42362Sigor@sysoev.ru nxt_buf_free(source->mem_pool, rb); 42462Sigor@sysoev.ru } 42562Sigor@sysoev.ru 42662Sigor@sysoev.ru if (p->connected) { 42762Sigor@sysoev.ru nxt_conn_write(task->thread->engine, sink); 42862Sigor@sysoev.ru } 42962Sigor@sysoev.ru } 43062Sigor@sysoev.ru 43162Sigor@sysoev.ru 43262Sigor@sysoev.ru static void 43362Sigor@sysoev.ru nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b) 43462Sigor@sysoev.ru { 43562Sigor@sysoev.ru nxt_buf_t *first, *second, *prev; 43662Sigor@sysoev.ru 43762Sigor@sysoev.ru first = c->write; 43862Sigor@sysoev.ru 43962Sigor@sysoev.ru if (first == NULL) { 44062Sigor@sysoev.ru c->write = b; 44162Sigor@sysoev.ru return; 44262Sigor@sysoev.ru } 44362Sigor@sysoev.ru 44462Sigor@sysoev.ru /* 44562Sigor@sysoev.ru * A event conn proxy maintains a buffer per each direction. 44662Sigor@sysoev.ru * The buffer is divided by read and write parts. These parts are 44762Sigor@sysoev.ru * linked in buffer chains. There can be no more than two buffers 44862Sigor@sysoev.ru * in write chain at any time, because an added buffer is coalesced 44962Sigor@sysoev.ru * with the last buffer if possible. 45062Sigor@sysoev.ru */ 45162Sigor@sysoev.ru 45262Sigor@sysoev.ru second = first->next; 45362Sigor@sysoev.ru 45462Sigor@sysoev.ru if (second == NULL) { 45562Sigor@sysoev.ru 45662Sigor@sysoev.ru if (first->mem.end != b->mem.start) { 45762Sigor@sysoev.ru first->next = b; 45862Sigor@sysoev.ru return; 45962Sigor@sysoev.ru } 46062Sigor@sysoev.ru 46162Sigor@sysoev.ru /* 46262Sigor@sysoev.ru * The first buffer is just before the added buffer, so 46362Sigor@sysoev.ru * expand the first buffer to the end of the added buffer. 46462Sigor@sysoev.ru */ 46562Sigor@sysoev.ru prev = first; 46662Sigor@sysoev.ru 46762Sigor@sysoev.ru } else { 46862Sigor@sysoev.ru if (second->mem.end != b->mem.start) { 46962Sigor@sysoev.ru nxt_thread_log_alert("event conn proxy write: second buffer end:%p " 47062Sigor@sysoev.ru "is not equal to added buffer start:%p", 47162Sigor@sysoev.ru second->mem.end, b->mem.start); 47262Sigor@sysoev.ru return; 47362Sigor@sysoev.ru } 47462Sigor@sysoev.ru 47562Sigor@sysoev.ru /* 47662Sigor@sysoev.ru * "second->mem.end == b->mem.start" must be always true here, 47762Sigor@sysoev.ru * that is the second buffer is just before the added buffer, 47862Sigor@sysoev.ru * so expand the second buffer to the end of added buffer. 47962Sigor@sysoev.ru */ 48062Sigor@sysoev.ru prev = second; 48162Sigor@sysoev.ru } 48262Sigor@sysoev.ru 48362Sigor@sysoev.ru prev->mem.free = b->mem.end; 48462Sigor@sysoev.ru prev->mem.end = b->mem.end; 48562Sigor@sysoev.ru 48662Sigor@sysoev.ru nxt_buf_free(c->mem_pool, b); 48762Sigor@sysoev.ru } 48862Sigor@sysoev.ru 48962Sigor@sysoev.ru 49062Sigor@sysoev.ru static void 49162Sigor@sysoev.ru nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data) 49262Sigor@sysoev.ru { 49362Sigor@sysoev.ru nxt_conn_t *source, *sink; 49462Sigor@sysoev.ru nxt_conn_proxy_t *p; 49562Sigor@sysoev.ru 49662Sigor@sysoev.ru source = obj; 49762Sigor@sysoev.ru p = data; 49862Sigor@sysoev.ru 49962Sigor@sysoev.ru nxt_debug(task, "conn proxy read fd:%d", source->socket.fd); 50062Sigor@sysoev.ru 50162Sigor@sysoev.ru if (!source->socket.closed) { 50262Sigor@sysoev.ru sink = (source == p->client) ? p->peer : p->client; 50362Sigor@sysoev.ru 50462Sigor@sysoev.ru if (sink->socket.error == 0) { 50562Sigor@sysoev.ru nxt_conn_read(task->thread->engine, source); 50662Sigor@sysoev.ru } 50762Sigor@sysoev.ru } 50862Sigor@sysoev.ru } 50962Sigor@sysoev.ru 51062Sigor@sysoev.ru 51162Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_write_state 51262Sigor@sysoev.ru nxt_aligned(64) = 51362Sigor@sysoev.ru { 51462Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_client_write_ready, 51562Sigor@sysoev.ru .error_handler = nxt_conn_proxy_write_error, 51662Sigor@sysoev.ru 51762Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_write_timeout, 51862Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value, 51962Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout), 52062Sigor@sysoev.ru .timer_autoreset = 1, 52162Sigor@sysoev.ru }; 52262Sigor@sysoev.ru 52362Sigor@sysoev.ru 52462Sigor@sysoev.ru static void 52562Sigor@sysoev.ru nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) 52662Sigor@sysoev.ru { 52762Sigor@sysoev.ru nxt_conn_t *client; 52862Sigor@sysoev.ru nxt_conn_proxy_t *p; 52962Sigor@sysoev.ru 53062Sigor@sysoev.ru client = obj; 53162Sigor@sysoev.ru p = data; 53262Sigor@sysoev.ru 53362Sigor@sysoev.ru nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd); 53462Sigor@sysoev.ru 53562Sigor@sysoev.ru nxt_conn_proxy_write_process(task, p, client, p->peer); 53662Sigor@sysoev.ru } 53762Sigor@sysoev.ru 53862Sigor@sysoev.ru 53962Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_write_state 54062Sigor@sysoev.ru nxt_aligned(64) = 54162Sigor@sysoev.ru { 54262Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_write_ready, 54362Sigor@sysoev.ru .error_handler = nxt_conn_proxy_write_error, 54462Sigor@sysoev.ru 54562Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_write_timeout, 54662Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value, 54762Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout), 54862Sigor@sysoev.ru .timer_autoreset = 1, 54962Sigor@sysoev.ru }; 55062Sigor@sysoev.ru 55162Sigor@sysoev.ru 55262Sigor@sysoev.ru static void 55362Sigor@sysoev.ru nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) 55462Sigor@sysoev.ru { 55562Sigor@sysoev.ru nxt_conn_t *peer; 55662Sigor@sysoev.ru nxt_conn_proxy_t *p; 55762Sigor@sysoev.ru 55862Sigor@sysoev.ru peer = obj; 55962Sigor@sysoev.ru p = data; 56062Sigor@sysoev.ru 56162Sigor@sysoev.ru nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd); 56262Sigor@sysoev.ru 56362Sigor@sysoev.ru nxt_conn_proxy_write_process(task, p, peer, p->client); 56462Sigor@sysoev.ru } 56562Sigor@sysoev.ru 56662Sigor@sysoev.ru 56762Sigor@sysoev.ru static void 56862Sigor@sysoev.ru nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, 56962Sigor@sysoev.ru nxt_conn_t *sink, nxt_conn_t *source) 57062Sigor@sysoev.ru { 57162Sigor@sysoev.ru nxt_buf_t *rb, *wb; 57262Sigor@sysoev.ru 57362Sigor@sysoev.ru while (sink->write != NULL) { 57462Sigor@sysoev.ru 57562Sigor@sysoev.ru wb = sink->write; 57662Sigor@sysoev.ru 57762Sigor@sysoev.ru if (nxt_buf_is_sync(wb)) { 57862Sigor@sysoev.ru 57962Sigor@sysoev.ru /* A sync buffer marks the end of stream. */ 58062Sigor@sysoev.ru 58162Sigor@sysoev.ru sink->write = NULL; 58262Sigor@sysoev.ru nxt_buf_free(sink->mem_pool, wb); 58362Sigor@sysoev.ru nxt_conn_proxy_shutdown(task, p, source, sink); 58462Sigor@sysoev.ru return; 58562Sigor@sysoev.ru } 58662Sigor@sysoev.ru 58762Sigor@sysoev.ru if (wb->mem.start != wb->mem.pos) { 58862Sigor@sysoev.ru 58962Sigor@sysoev.ru /* Add a written part to a read chain. */ 59062Sigor@sysoev.ru 59162Sigor@sysoev.ru rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); 59262Sigor@sysoev.ru if (rb == NULL) { 59362Sigor@sysoev.ru /* An error completion. */ 59462Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 59562Sigor@sysoev.ru return; 59662Sigor@sysoev.ru } 59762Sigor@sysoev.ru 59862Sigor@sysoev.ru rb->mem.pos = wb->mem.start; 59962Sigor@sysoev.ru rb->mem.free = wb->mem.start; 60062Sigor@sysoev.ru rb->mem.start = wb->mem.start; 60162Sigor@sysoev.ru rb->mem.end = wb->mem.pos; 60262Sigor@sysoev.ru 60362Sigor@sysoev.ru wb->mem.start = wb->mem.pos; 60462Sigor@sysoev.ru 60562Sigor@sysoev.ru nxt_conn_proxy_read_add(source, rb); 60662Sigor@sysoev.ru } 60762Sigor@sysoev.ru 60862Sigor@sysoev.ru if (wb->mem.pos != wb->mem.free) { 60962Sigor@sysoev.ru nxt_conn_write(task->thread->engine, sink); 61062Sigor@sysoev.ru 61162Sigor@sysoev.ru break; 61262Sigor@sysoev.ru } 61362Sigor@sysoev.ru 61462Sigor@sysoev.ru sink->write = wb->next; 61562Sigor@sysoev.ru nxt_buf_free(sink->mem_pool, wb); 61662Sigor@sysoev.ru } 61762Sigor@sysoev.ru 61862Sigor@sysoev.ru nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, 61962Sigor@sysoev.ru task, source, source->socket.data); 62062Sigor@sysoev.ru } 62162Sigor@sysoev.ru 62262Sigor@sysoev.ru 62362Sigor@sysoev.ru static void 62462Sigor@sysoev.ru nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b) 62562Sigor@sysoev.ru { 62662Sigor@sysoev.ru nxt_buf_t *first, *second; 62762Sigor@sysoev.ru 62862Sigor@sysoev.ru first = c->read; 62962Sigor@sysoev.ru 63062Sigor@sysoev.ru if (first == NULL) { 63162Sigor@sysoev.ru c->read = b; 63262Sigor@sysoev.ru return; 63362Sigor@sysoev.ru } 63462Sigor@sysoev.ru 63562Sigor@sysoev.ru /* 63662Sigor@sysoev.ru * A event conn proxy maintains a buffer per each direction. 63762Sigor@sysoev.ru * The buffer is divided by read and write parts. These parts are 63862Sigor@sysoev.ru * linked in buffer chains. There can be no more than two buffers 63962Sigor@sysoev.ru * in read chain at any time, because an added buffer is coalesced 64062Sigor@sysoev.ru * with the last buffer if possible. The first and the second 64162Sigor@sysoev.ru * buffers are also coalesced if possible. 64262Sigor@sysoev.ru */ 64362Sigor@sysoev.ru 64462Sigor@sysoev.ru second = first->next; 64562Sigor@sysoev.ru 64662Sigor@sysoev.ru if (second == NULL) { 64762Sigor@sysoev.ru 64862Sigor@sysoev.ru if (first->mem.start == b->mem.end) { 64962Sigor@sysoev.ru /* 65062Sigor@sysoev.ru * The added buffer is just before the first buffer, so expand 65162Sigor@sysoev.ru * the first buffer to the beginning of the added buffer. 65262Sigor@sysoev.ru */ 65362Sigor@sysoev.ru first->mem.pos = b->mem.start; 65462Sigor@sysoev.ru first->mem.free = b->mem.start; 65562Sigor@sysoev.ru first->mem.start = b->mem.start; 65662Sigor@sysoev.ru 65762Sigor@sysoev.ru } else if (first->mem.end == b->mem.start) { 65862Sigor@sysoev.ru /* 65962Sigor@sysoev.ru * The added buffer is just after the first buffer, so 66062Sigor@sysoev.ru * expand the first buffer to the end of the added buffer. 66162Sigor@sysoev.ru */ 66262Sigor@sysoev.ru first->mem.end = b->mem.end; 66362Sigor@sysoev.ru 66462Sigor@sysoev.ru } else { 66562Sigor@sysoev.ru first->next = b; 66662Sigor@sysoev.ru return; 66762Sigor@sysoev.ru } 66862Sigor@sysoev.ru 66962Sigor@sysoev.ru } else { 67062Sigor@sysoev.ru if (second->mem.end != b->mem.start) { 67162Sigor@sysoev.ru nxt_thread_log_alert("event conn proxy read: second buffer end:%p " 67262Sigor@sysoev.ru "is not equal to added buffer start:%p", 67362Sigor@sysoev.ru second->mem.end, b->mem.start); 67462Sigor@sysoev.ru return; 67562Sigor@sysoev.ru } 67662Sigor@sysoev.ru 67762Sigor@sysoev.ru /* 67862Sigor@sysoev.ru * The added buffer is just after the second buffer, so 67962Sigor@sysoev.ru * expand the second buffer to the end of the added buffer. 68062Sigor@sysoev.ru */ 68162Sigor@sysoev.ru second->mem.end = b->mem.end; 68262Sigor@sysoev.ru 68362Sigor@sysoev.ru if (first->mem.start == second->mem.end) { 68462Sigor@sysoev.ru /* 68562Sigor@sysoev.ru * The second buffer is just before the first buffer, so expand 68662Sigor@sysoev.ru * the first buffer to the beginning of the second buffer. 68762Sigor@sysoev.ru */ 68862Sigor@sysoev.ru first->mem.pos = second->mem.start; 68962Sigor@sysoev.ru first->mem.free = second->mem.start; 69062Sigor@sysoev.ru first->mem.start = second->mem.start; 69162Sigor@sysoev.ru first->next = NULL; 69262Sigor@sysoev.ru 69362Sigor@sysoev.ru nxt_buf_free(c->mem_pool, second); 69462Sigor@sysoev.ru } 69562Sigor@sysoev.ru } 69662Sigor@sysoev.ru 69762Sigor@sysoev.ru nxt_buf_free(c->mem_pool, b); 69862Sigor@sysoev.ru } 69962Sigor@sysoev.ru 70062Sigor@sysoev.ru 70162Sigor@sysoev.ru static void 70262Sigor@sysoev.ru nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data) 70362Sigor@sysoev.ru { 70462Sigor@sysoev.ru nxt_buf_t *b; 70562Sigor@sysoev.ru nxt_conn_t *source, *sink; 70662Sigor@sysoev.ru nxt_conn_proxy_t *p; 70762Sigor@sysoev.ru 70862Sigor@sysoev.ru source = obj; 70962Sigor@sysoev.ru p = data; 71062Sigor@sysoev.ru 71162Sigor@sysoev.ru nxt_debug(task, "conn proxy close fd:%d", source->socket.fd); 71262Sigor@sysoev.ru 71362Sigor@sysoev.ru sink = (source == p->client) ? p->peer : p->client; 71462Sigor@sysoev.ru 71562Sigor@sysoev.ru if (sink->write == NULL) { 71662Sigor@sysoev.ru nxt_conn_proxy_shutdown(task, p, source, sink); 71762Sigor@sysoev.ru return; 71862Sigor@sysoev.ru } 71962Sigor@sysoev.ru 72062Sigor@sysoev.ru b = nxt_buf_sync_alloc(source->mem_pool, 0); 72162Sigor@sysoev.ru if (b == NULL) { 72262Sigor@sysoev.ru /* An error completion. */ 72362Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 72462Sigor@sysoev.ru return; 72562Sigor@sysoev.ru } 72662Sigor@sysoev.ru 72762Sigor@sysoev.ru nxt_buf_chain_add(&sink->write, b); 72862Sigor@sysoev.ru } 72962Sigor@sysoev.ru 73062Sigor@sysoev.ru 73162Sigor@sysoev.ru static void 73262Sigor@sysoev.ru nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data) 73362Sigor@sysoev.ru { 73462Sigor@sysoev.ru nxt_conn_t *c; 73562Sigor@sysoev.ru nxt_conn_proxy_t *p; 73662Sigor@sysoev.ru 73762Sigor@sysoev.ru c = obj; 73862Sigor@sysoev.ru p = data; 73962Sigor@sysoev.ru 74062Sigor@sysoev.ru nxt_debug(task, "conn proxy error fd:%d", c->socket.fd); 74162Sigor@sysoev.ru 74262Sigor@sysoev.ru nxt_conn_proxy_close(task, c, p); 74362Sigor@sysoev.ru } 74462Sigor@sysoev.ru 74562Sigor@sysoev.ru 74662Sigor@sysoev.ru static void 74762Sigor@sysoev.ru nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) 74862Sigor@sysoev.ru { 74962Sigor@sysoev.ru nxt_conn_t *c; 75062Sigor@sysoev.ru nxt_timer_t *timer; 75162Sigor@sysoev.ru 75262Sigor@sysoev.ru timer = obj; 75362Sigor@sysoev.ru 75462Sigor@sysoev.ru c = nxt_read_timer_conn(timer); 75562Sigor@sysoev.ru c->socket.timedout = 1; 75662Sigor@sysoev.ru c->socket.closed = 1; 75762Sigor@sysoev.ru 75862Sigor@sysoev.ru nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd); 75962Sigor@sysoev.ru 76062Sigor@sysoev.ru nxt_conn_proxy_close(task, c, c->socket.data); 76162Sigor@sysoev.ru } 76262Sigor@sysoev.ru 76362Sigor@sysoev.ru 76462Sigor@sysoev.ru static void 76562Sigor@sysoev.ru nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) 76662Sigor@sysoev.ru { 76762Sigor@sysoev.ru nxt_conn_t *c; 76862Sigor@sysoev.ru nxt_timer_t *timer; 76962Sigor@sysoev.ru 77062Sigor@sysoev.ru timer = obj; 77162Sigor@sysoev.ru 77262Sigor@sysoev.ru c = nxt_write_timer_conn(timer); 77362Sigor@sysoev.ru c->socket.timedout = 1; 77462Sigor@sysoev.ru c->socket.closed = 1; 77562Sigor@sysoev.ru 77662Sigor@sysoev.ru nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd); 77762Sigor@sysoev.ru 77862Sigor@sysoev.ru nxt_conn_proxy_close(task, c, c->socket.data); 77962Sigor@sysoev.ru } 78062Sigor@sysoev.ru 78162Sigor@sysoev.ru 78262Sigor@sysoev.ru static nxt_msec_t 78362Sigor@sysoev.ru nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data) 78462Sigor@sysoev.ru { 785*98Svbart@nginx.com return nxt_value_at(nxt_msec_t, c->socket.data, data); 78662Sigor@sysoev.ru } 78762Sigor@sysoev.ru 78862Sigor@sysoev.ru 78962Sigor@sysoev.ru static void 79062Sigor@sysoev.ru nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) 79162Sigor@sysoev.ru { 79262Sigor@sysoev.ru nxt_conn_t *peer; 79362Sigor@sysoev.ru nxt_conn_proxy_t *p; 79462Sigor@sysoev.ru 79562Sigor@sysoev.ru peer = obj; 79662Sigor@sysoev.ru p = data; 79762Sigor@sysoev.ru 79862Sigor@sysoev.ru nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd); 79962Sigor@sysoev.ru 80062Sigor@sysoev.ru if (p->retries == 0) { 80162Sigor@sysoev.ru /* An error completion. */ 80262Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 80362Sigor@sysoev.ru return; 80462Sigor@sysoev.ru } 80562Sigor@sysoev.ru 80662Sigor@sysoev.ru p->retries--; 80762Sigor@sysoev.ru 80862Sigor@sysoev.ru nxt_socket_close(task, peer->socket.fd); 80962Sigor@sysoev.ru peer->socket.fd = -1; 81062Sigor@sysoev.ru peer->socket.error = 0; 81162Sigor@sysoev.ru 81262Sigor@sysoev.ru p->delayed = 1; 81362Sigor@sysoev.ru 81462Sigor@sysoev.ru peer->write_timer.handler = nxt_conn_proxy_reconnect_handler; 81562Sigor@sysoev.ru nxt_timer_add(task->thread->engine, &peer->write_timer, 81662Sigor@sysoev.ru p->reconnect_timeout); 81762Sigor@sysoev.ru } 81862Sigor@sysoev.ru 81962Sigor@sysoev.ru 82062Sigor@sysoev.ru static void 82162Sigor@sysoev.ru nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) 82262Sigor@sysoev.ru { 82362Sigor@sysoev.ru nxt_conn_t *peer; 82462Sigor@sysoev.ru nxt_timer_t *timer; 82562Sigor@sysoev.ru nxt_conn_proxy_t *p; 82662Sigor@sysoev.ru 82762Sigor@sysoev.ru timer = obj; 82862Sigor@sysoev.ru 82962Sigor@sysoev.ru nxt_debug(task, "conn proxy reconnect timer"); 83062Sigor@sysoev.ru 83162Sigor@sysoev.ru peer = nxt_write_timer_conn(timer); 83262Sigor@sysoev.ru p = peer->socket.data; 83362Sigor@sysoev.ru 83462Sigor@sysoev.ru if (p->client->socket.closed) { 83562Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 83662Sigor@sysoev.ru return; 83762Sigor@sysoev.ru } 83862Sigor@sysoev.ru 83962Sigor@sysoev.ru p->delayed = 0; 84062Sigor@sysoev.ru 84162Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_connect_state; 84262Sigor@sysoev.ru /* 84362Sigor@sysoev.ru * Peer read event: disabled. 84462Sigor@sysoev.ru * Peer write event: waiting for connection with connect_timeout. 84562Sigor@sysoev.ru */ 84662Sigor@sysoev.ru nxt_conn_connect(task->thread->engine, peer); 84762Sigor@sysoev.ru } 84862Sigor@sysoev.ru 84962Sigor@sysoev.ru 85062Sigor@sysoev.ru static void 85162Sigor@sysoev.ru nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, 85262Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink) 85362Sigor@sysoev.ru { 85462Sigor@sysoev.ru nxt_buf_t *b; 85562Sigor@sysoev.ru 85662Sigor@sysoev.ru nxt_debug(source->socket.task, 85762Sigor@sysoev.ru "conn proxy shutdown source fd:%d cl:%d err:%d", 85862Sigor@sysoev.ru source->socket.fd, source->socket.closed, source->socket.error); 85962Sigor@sysoev.ru 86062Sigor@sysoev.ru nxt_debug(sink->socket.task, 86162Sigor@sysoev.ru "conn proxy shutdown sink fd:%d cl:%d err:%d", 86262Sigor@sysoev.ru sink->socket.fd, sink->socket.closed, sink->socket.error); 86362Sigor@sysoev.ru 86462Sigor@sysoev.ru if (!p->connected || p->delayed) { 86562Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 86662Sigor@sysoev.ru return; 86762Sigor@sysoev.ru } 86862Sigor@sysoev.ru 86962Sigor@sysoev.ru if (sink->socket.error == 0 && !sink->socket.closed) { 87062Sigor@sysoev.ru sink->socket.shutdown = 1; 87162Sigor@sysoev.ru nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); 87262Sigor@sysoev.ru } 87362Sigor@sysoev.ru 87462Sigor@sysoev.ru if (sink->socket.error != 0 87562Sigor@sysoev.ru || (sink->socket.closed && source->write == NULL)) 87662Sigor@sysoev.ru { 87762Sigor@sysoev.ru /* The opposite direction also has been already closed. */ 87862Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 87962Sigor@sysoev.ru return; 88062Sigor@sysoev.ru } 88162Sigor@sysoev.ru 88262Sigor@sysoev.ru nxt_debug(source->socket.task, "free source buffer"); 88362Sigor@sysoev.ru 88462Sigor@sysoev.ru /* Free the direction's buffer. */ 88562Sigor@sysoev.ru b = (source == p->client) ? p->client_buffer : p->peer_buffer; 88665Sigor@sysoev.ru nxt_mp_free(source->mem_pool, b); 88762Sigor@sysoev.ru } 88862Sigor@sysoev.ru 88962Sigor@sysoev.ru 89062Sigor@sysoev.ru static void 89162Sigor@sysoev.ru nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) 89262Sigor@sysoev.ru { 89362Sigor@sysoev.ru nxt_conn_t *c; 89462Sigor@sysoev.ru nxt_conn_proxy_t *p; 89562Sigor@sysoev.ru 89662Sigor@sysoev.ru c = obj; 89762Sigor@sysoev.ru p = data; 89862Sigor@sysoev.ru 89962Sigor@sysoev.ru nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd); 90062Sigor@sysoev.ru 90162Sigor@sysoev.ru nxt_conn_proxy_close(task, c, p); 90262Sigor@sysoev.ru } 90362Sigor@sysoev.ru 90462Sigor@sysoev.ru 90562Sigor@sysoev.ru static void 90662Sigor@sysoev.ru nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) 90762Sigor@sysoev.ru { 90862Sigor@sysoev.ru nxt_conn_t *source, *sink; 90962Sigor@sysoev.ru nxt_conn_proxy_t *p; 91062Sigor@sysoev.ru 91162Sigor@sysoev.ru sink = obj; 91262Sigor@sysoev.ru p = data; 91362Sigor@sysoev.ru 91462Sigor@sysoev.ru nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd); 91562Sigor@sysoev.ru 91662Sigor@sysoev.ru /* Clear data for the direction sink. */ 91762Sigor@sysoev.ru sink->write = NULL; 91862Sigor@sysoev.ru 91962Sigor@sysoev.ru /* Block the direction source. */ 92062Sigor@sysoev.ru source = (sink == p->client) ? p->peer : p->client; 92162Sigor@sysoev.ru nxt_fd_event_block_read(task->thread->engine, &source->socket); 92262Sigor@sysoev.ru 92362Sigor@sysoev.ru if (source->write == NULL) { 92462Sigor@sysoev.ru /* 92562Sigor@sysoev.ru * There is no data for the opposite direction and 92662Sigor@sysoev.ru * the next read from the sink will most probably fail. 92762Sigor@sysoev.ru */ 92862Sigor@sysoev.ru nxt_conn_proxy_complete(task, p); 92962Sigor@sysoev.ru } 93062Sigor@sysoev.ru } 93162Sigor@sysoev.ru 93262Sigor@sysoev.ru 93362Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_close_state 93462Sigor@sysoev.ru nxt_aligned(64) = 93562Sigor@sysoev.ru { 93662Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_completion, 93762Sigor@sysoev.ru }; 93862Sigor@sysoev.ru 93962Sigor@sysoev.ru 94062Sigor@sysoev.ru static void 94162Sigor@sysoev.ru nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p) 94262Sigor@sysoev.ru { 94362Sigor@sysoev.ru nxt_event_engine_t *engine; 94462Sigor@sysoev.ru 94562Sigor@sysoev.ru engine = task->thread->engine; 94662Sigor@sysoev.ru 94762Sigor@sysoev.ru nxt_debug(p->client->socket.task, "conn proxy complete %d:%d", 94862Sigor@sysoev.ru p->client->socket.fd, p->peer->socket.fd); 94962Sigor@sysoev.ru 95062Sigor@sysoev.ru if (p->delayed) { 95162Sigor@sysoev.ru p->delayed = 0; 95262Sigor@sysoev.ru nxt_queue_remove(&p->peer->link); 95362Sigor@sysoev.ru } 95462Sigor@sysoev.ru 95562Sigor@sysoev.ru if (p->client->socket.fd != -1) { 95662Sigor@sysoev.ru p->retain = 1; 95762Sigor@sysoev.ru p->client->write_state = &nxt_conn_proxy_close_state; 95862Sigor@sysoev.ru nxt_conn_close(engine, p->client); 95962Sigor@sysoev.ru } 96062Sigor@sysoev.ru 96162Sigor@sysoev.ru if (p->peer->socket.fd != -1) { 96262Sigor@sysoev.ru p->retain++; 96362Sigor@sysoev.ru p->peer->write_state = &nxt_conn_proxy_close_state; 96462Sigor@sysoev.ru nxt_conn_close(engine, p->peer); 96562Sigor@sysoev.ru } 96662Sigor@sysoev.ru } 96762Sigor@sysoev.ru 96862Sigor@sysoev.ru 96962Sigor@sysoev.ru static void 97062Sigor@sysoev.ru nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) 97162Sigor@sysoev.ru { 97262Sigor@sysoev.ru nxt_conn_proxy_t *p; 97362Sigor@sysoev.ru 97462Sigor@sysoev.ru p = data; 97562Sigor@sysoev.ru 97662Sigor@sysoev.ru nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d", 97762Sigor@sysoev.ru p->retain, p->client->socket.fd, p->peer->socket.fd); 97862Sigor@sysoev.ru 97962Sigor@sysoev.ru p->retain--; 98062Sigor@sysoev.ru 98162Sigor@sysoev.ru if (p->retain == 0) { 98265Sigor@sysoev.ru nxt_mp_free(p->client->mem_pool, p->client_buffer); 98365Sigor@sysoev.ru nxt_mp_free(p->client->mem_pool, p->peer_buffer); 98462Sigor@sysoev.ru 98562Sigor@sysoev.ru p->completion_handler(task, p, NULL); 98662Sigor@sysoev.ru } 98762Sigor@sysoev.ru } 988