162Sigor@sysoev.ru
262Sigor@sysoev.ru /*
362Sigor@sysoev.ru * Copyright (C) Igor Sysoev
462Sigor@sysoev.ru * Copyright (C) NGINX, Inc.
562Sigor@sysoev.ru */
662Sigor@sysoev.ru
762Sigor@sysoev.ru #include <nxt_main.h>
862Sigor@sysoev.ru
962Sigor@sysoev.ru
1062Sigor@sysoev.ru static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
1162Sigor@sysoev.ru void *data);
1262Sigor@sysoev.ru static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
1362Sigor@sysoev.ru void *data);
1462Sigor@sysoev.ru static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
1562Sigor@sysoev.ru static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
1662Sigor@sysoev.ru static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
1762Sigor@sysoev.ru void *data);
1862Sigor@sysoev.ru static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
1962Sigor@sysoev.ru void *data);
2062Sigor@sysoev.ru static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
2162Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink);
2262Sigor@sysoev.ru static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
2362Sigor@sysoev.ru static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
2462Sigor@sysoev.ru static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
2562Sigor@sysoev.ru void *data);
2662Sigor@sysoev.ru static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
2762Sigor@sysoev.ru void *data);
2862Sigor@sysoev.ru static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
2962Sigor@sysoev.ru nxt_conn_t *sink, nxt_conn_t *source);
3062Sigor@sysoev.ru static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
3162Sigor@sysoev.ru static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
3262Sigor@sysoev.ru static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
3362Sigor@sysoev.ru static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
3462Sigor@sysoev.ru void *data);
3562Sigor@sysoev.ru static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
3662Sigor@sysoev.ru void *data);
3762Sigor@sysoev.ru static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
3862Sigor@sysoev.ru static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
3962Sigor@sysoev.ru static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
4062Sigor@sysoev.ru void *data);
4162Sigor@sysoev.ru static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
4262Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink);
4362Sigor@sysoev.ru static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
4462Sigor@sysoev.ru static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
4562Sigor@sysoev.ru static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
4662Sigor@sysoev.ru static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
4762Sigor@sysoev.ru
4862Sigor@sysoev.ru
4962Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_wait_state;
5062Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state;
5162Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state;
5262Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state;
5362Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_read_state;
5462Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_read_state;
5562Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_write_state;
5662Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_write_state;
5762Sigor@sysoev.ru
5862Sigor@sysoev.ru
5962Sigor@sysoev.ru nxt_conn_proxy_t *
nxt_conn_proxy_create(nxt_conn_t * client)6062Sigor@sysoev.ru nxt_conn_proxy_create(nxt_conn_t *client)
6162Sigor@sysoev.ru {
6262Sigor@sysoev.ru nxt_conn_t *peer;
6362Sigor@sysoev.ru nxt_thread_t *thr;
6462Sigor@sysoev.ru nxt_conn_proxy_t *p;
6562Sigor@sysoev.ru
6665Sigor@sysoev.ru p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t));
6762Sigor@sysoev.ru if (nxt_slow_path(p == NULL)) {
6862Sigor@sysoev.ru return NULL;
6962Sigor@sysoev.ru }
7062Sigor@sysoev.ru
7162Sigor@sysoev.ru peer = nxt_conn_create(client->mem_pool, client->socket.task);
7262Sigor@sysoev.ru if (nxt_slow_path(peer == NULL)) {
7362Sigor@sysoev.ru return NULL;
7462Sigor@sysoev.ru }
7562Sigor@sysoev.ru
7662Sigor@sysoev.ru thr = nxt_thread();
7762Sigor@sysoev.ru
7862Sigor@sysoev.ru client->read_work_queue = &thr->engine->read_work_queue;
7962Sigor@sysoev.ru client->write_work_queue = &thr->engine->write_work_queue;
8062Sigor@sysoev.ru client->socket.read_work_queue = &thr->engine->read_work_queue;
8162Sigor@sysoev.ru client->socket.write_work_queue = &thr->engine->write_work_queue;
8262Sigor@sysoev.ru peer->socket.read_work_queue = &thr->engine->read_work_queue;
8362Sigor@sysoev.ru peer->socket.write_work_queue = &thr->engine->write_work_queue;
8462Sigor@sysoev.ru
8562Sigor@sysoev.ru peer->socket.data = client->socket.data;
8662Sigor@sysoev.ru
8762Sigor@sysoev.ru peer->read_work_queue = client->read_work_queue;
8862Sigor@sysoev.ru peer->write_work_queue = client->write_work_queue;
8962Sigor@sysoev.ru peer->read_timer.work_queue = client->read_work_queue;
9062Sigor@sysoev.ru peer->write_timer.work_queue = client->write_work_queue;
9162Sigor@sysoev.ru
9262Sigor@sysoev.ru p->client = client;
9362Sigor@sysoev.ru p->peer = peer;
9462Sigor@sysoev.ru
9562Sigor@sysoev.ru return p;
9662Sigor@sysoev.ru }
9762Sigor@sysoev.ru
9862Sigor@sysoev.ru
9962Sigor@sysoev.ru void
nxt_conn_proxy(nxt_task_t * task,nxt_conn_proxy_t * p)10062Sigor@sysoev.ru nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
10162Sigor@sysoev.ru {
10262Sigor@sysoev.ru nxt_conn_t *peer;
10362Sigor@sysoev.ru
10462Sigor@sysoev.ru /*
10562Sigor@sysoev.ru * Peer read event: not connected, disabled.
10662Sigor@sysoev.ru * Peer write event: not connected, disabled.
10762Sigor@sysoev.ru */
10862Sigor@sysoev.ru
10962Sigor@sysoev.ru if (p->client_wait_timeout == 0) {
11062Sigor@sysoev.ru /*
11162Sigor@sysoev.ru * Peer write event: waiting for connection
11262Sigor@sysoev.ru * to be established with connect_timeout.
11362Sigor@sysoev.ru */
11462Sigor@sysoev.ru peer = p->peer;
11562Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_connect_state;
11662Sigor@sysoev.ru
11762Sigor@sysoev.ru nxt_conn_connect(task->thread->engine, peer);
11862Sigor@sysoev.ru }
11962Sigor@sysoev.ru
12062Sigor@sysoev.ru /*
12162Sigor@sysoev.ru * Client read event: waiting for client data with
12262Sigor@sysoev.ru * client_wait_timeout before buffer allocation.
12362Sigor@sysoev.ru */
12462Sigor@sysoev.ru p->client->read_state = &nxt_conn_proxy_client_wait_state;
12562Sigor@sysoev.ru
12662Sigor@sysoev.ru nxt_conn_wait(p->client);
12762Sigor@sysoev.ru }
12862Sigor@sysoev.ru
12962Sigor@sysoev.ru
13062Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_wait_state
13162Sigor@sysoev.ru nxt_aligned(64) =
13262Sigor@sysoev.ru {
13362Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_client_buffer_alloc,
13462Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close,
13562Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error,
13662Sigor@sysoev.ru
13762Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_read_timeout,
13862Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value,
13962Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
14062Sigor@sysoev.ru };
14162Sigor@sysoev.ru
14262Sigor@sysoev.ru
14362Sigor@sysoev.ru static void
nxt_conn_proxy_client_buffer_alloc(nxt_task_t * task,void * obj,void * data)14462Sigor@sysoev.ru nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
14562Sigor@sysoev.ru {
14662Sigor@sysoev.ru nxt_buf_t *b;
14762Sigor@sysoev.ru nxt_conn_t *client;
14862Sigor@sysoev.ru nxt_conn_proxy_t *p;
14962Sigor@sysoev.ru
15062Sigor@sysoev.ru client = obj;
15162Sigor@sysoev.ru p = data;
15262Sigor@sysoev.ru
15362Sigor@sysoev.ru nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
15462Sigor@sysoev.ru
15565Sigor@sysoev.ru b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0);
15662Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
15762Sigor@sysoev.ru /* An error completion. */
15862Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
15962Sigor@sysoev.ru return;
16062Sigor@sysoev.ru }
16162Sigor@sysoev.ru
16262Sigor@sysoev.ru p->client_buffer = b;
16362Sigor@sysoev.ru client->read = b;
16462Sigor@sysoev.ru
16562Sigor@sysoev.ru if (p->peer->socket.fd != -1) {
16662Sigor@sysoev.ru /*
16762Sigor@sysoev.ru * Client read event: waiting, no timeout.
16862Sigor@sysoev.ru * Client write event: blocked.
16962Sigor@sysoev.ru * Peer read event: disabled.
17062Sigor@sysoev.ru * Peer write event: waiting for connection to be established
17162Sigor@sysoev.ru * or blocked after the connection has established.
17262Sigor@sysoev.ru */
17362Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_read_state;
17462Sigor@sysoev.ru
17562Sigor@sysoev.ru } else {
17662Sigor@sysoev.ru /*
17762Sigor@sysoev.ru * Client read event: waiting for data with client_wait_timeout
17862Sigor@sysoev.ru * before connecting to a peer.
17962Sigor@sysoev.ru * Client write event: blocked.
18062Sigor@sysoev.ru * Peer read event: not connected, disabled.
18162Sigor@sysoev.ru * Peer write event: not connected, disabled.
18262Sigor@sysoev.ru */
18362Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_first_read_state;
18462Sigor@sysoev.ru }
18562Sigor@sysoev.ru
18662Sigor@sysoev.ru nxt_conn_read(task->thread->engine, client);
18762Sigor@sysoev.ru }
18862Sigor@sysoev.ru
18962Sigor@sysoev.ru
19062Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state
19162Sigor@sysoev.ru nxt_aligned(64) =
19262Sigor@sysoev.ru {
19362Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_connect,
19462Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close,
19562Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error,
19662Sigor@sysoev.ru
19762Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_read_timeout,
19862Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value,
19962Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
20062Sigor@sysoev.ru .timer_autoreset = 1,
20162Sigor@sysoev.ru };
20262Sigor@sysoev.ru
20362Sigor@sysoev.ru
20462Sigor@sysoev.ru static void
nxt_conn_proxy_peer_connect(nxt_task_t * task,void * obj,void * data)20562Sigor@sysoev.ru nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
20662Sigor@sysoev.ru {
20762Sigor@sysoev.ru nxt_conn_t *client;
20862Sigor@sysoev.ru nxt_conn_proxy_t *p;
20962Sigor@sysoev.ru
21062Sigor@sysoev.ru client = obj;
21162Sigor@sysoev.ru p = data;
21262Sigor@sysoev.ru
21362Sigor@sysoev.ru /*
21462Sigor@sysoev.ru * Client read event: waiting, no timeout.
21562Sigor@sysoev.ru * Client write event: blocked.
21662Sigor@sysoev.ru * Peer read event: disabled.
21762Sigor@sysoev.ru * Peer write event: waiting for connection to be established
21862Sigor@sysoev.ru * with connect_timeout.
21962Sigor@sysoev.ru */
22062Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_read_state;
22162Sigor@sysoev.ru
22262Sigor@sysoev.ru p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
22362Sigor@sysoev.ru
22462Sigor@sysoev.ru nxt_conn_connect(task->thread->engine, p->peer);
22562Sigor@sysoev.ru }
22662Sigor@sysoev.ru
22762Sigor@sysoev.ru
22862Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state
22962Sigor@sysoev.ru nxt_aligned(64) =
23062Sigor@sysoev.ru {
23162Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_connected,
23262Sigor@sysoev.ru .close_handler = nxt_conn_proxy_refused,
23362Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error,
23462Sigor@sysoev.ru
23562Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_write_timeout,
23662Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value,
23762Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
23862Sigor@sysoev.ru .timer_autoreset = 1,
23962Sigor@sysoev.ru };
24062Sigor@sysoev.ru
24162Sigor@sysoev.ru
24262Sigor@sysoev.ru static void
nxt_conn_proxy_connected(nxt_task_t * task,void * obj,void * data)24362Sigor@sysoev.ru nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
24462Sigor@sysoev.ru {
24562Sigor@sysoev.ru nxt_conn_t *client, *peer;
24662Sigor@sysoev.ru nxt_conn_proxy_t *p;
24762Sigor@sysoev.ru
24862Sigor@sysoev.ru peer = obj;
24962Sigor@sysoev.ru p = data;
25062Sigor@sysoev.ru
25162Sigor@sysoev.ru nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
25262Sigor@sysoev.ru
25362Sigor@sysoev.ru p->connected = 1;
25462Sigor@sysoev.ru
25562Sigor@sysoev.ru nxt_conn_tcp_nodelay_on(task, peer);
25662Sigor@sysoev.ru nxt_conn_tcp_nodelay_on(task, p->client);
25762Sigor@sysoev.ru
25862Sigor@sysoev.ru /* Peer read event: waiting with peer_wait_timeout. */
25962Sigor@sysoev.ru
26062Sigor@sysoev.ru peer->read_state = &nxt_conn_proxy_peer_wait_state;
26162Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_write_state;
26262Sigor@sysoev.ru
26362Sigor@sysoev.ru nxt_conn_wait(peer);
26462Sigor@sysoev.ru
26562Sigor@sysoev.ru if (p->client_buffer != NULL) {
26662Sigor@sysoev.ru client = p->client;
26762Sigor@sysoev.ru
26862Sigor@sysoev.ru client->read_state = &nxt_conn_proxy_client_read_state;
26962Sigor@sysoev.ru client->write_state = &nxt_conn_proxy_client_write_state;
27062Sigor@sysoev.ru /*
27162Sigor@sysoev.ru * Send a client read data to the connected peer.
27262Sigor@sysoev.ru * Client write event: blocked.
27362Sigor@sysoev.ru */
27462Sigor@sysoev.ru nxt_conn_proxy_read_process(task, p, client, peer);
27562Sigor@sysoev.ru }
27662Sigor@sysoev.ru }
27762Sigor@sysoev.ru
27862Sigor@sysoev.ru
27962Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state
28062Sigor@sysoev.ru nxt_aligned(64) =
28162Sigor@sysoev.ru {
28262Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_read,
28362Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close,
28462Sigor@sysoev.ru .error_handler = nxt_conn_proxy_error,
28562Sigor@sysoev.ru
28662Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_read_timeout,
28762Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value,
28862Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
28962Sigor@sysoev.ru };
29062Sigor@sysoev.ru
29162Sigor@sysoev.ru
29262Sigor@sysoev.ru static void
nxt_conn_proxy_peer_read(nxt_task_t * task,void * obj,void * data)29362Sigor@sysoev.ru nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
29462Sigor@sysoev.ru {
29562Sigor@sysoev.ru nxt_buf_t *b;
29662Sigor@sysoev.ru nxt_conn_t *peer;
29762Sigor@sysoev.ru nxt_conn_proxy_t *p;
29862Sigor@sysoev.ru
29962Sigor@sysoev.ru peer = obj;
30062Sigor@sysoev.ru p = data;
30162Sigor@sysoev.ru
30262Sigor@sysoev.ru nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
30362Sigor@sysoev.ru
30465Sigor@sysoev.ru b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0);
30562Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
30662Sigor@sysoev.ru /* An error completion. */
30762Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
30862Sigor@sysoev.ru return;
30962Sigor@sysoev.ru }
31062Sigor@sysoev.ru
31162Sigor@sysoev.ru p->peer_buffer = b;
31262Sigor@sysoev.ru peer->read = b;
31362Sigor@sysoev.ru
31462Sigor@sysoev.ru p->client->write_state = &nxt_conn_proxy_client_write_state;
31562Sigor@sysoev.ru peer->read_state = &nxt_conn_proxy_peer_read_state;
31662Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_write_state;
31762Sigor@sysoev.ru
31862Sigor@sysoev.ru /*
31962Sigor@sysoev.ru * Client read event: waiting, no timeout.
32062Sigor@sysoev.ru * Client write event: blocked.
32162Sigor@sysoev.ru * Peer read event: waiting with possible peer_wait_timeout.
32262Sigor@sysoev.ru * Peer write event: blocked.
32362Sigor@sysoev.ru */
32462Sigor@sysoev.ru nxt_conn_read(task->thread->engine, peer);
32562Sigor@sysoev.ru }
32662Sigor@sysoev.ru
32762Sigor@sysoev.ru
32862Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_read_state
32962Sigor@sysoev.ru nxt_aligned(64) =
33062Sigor@sysoev.ru {
33162Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_client_read_ready,
33262Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close,
33362Sigor@sysoev.ru .error_handler = nxt_conn_proxy_read_error,
33462Sigor@sysoev.ru };
33562Sigor@sysoev.ru
33662Sigor@sysoev.ru
33762Sigor@sysoev.ru static void
nxt_conn_proxy_client_read_ready(nxt_task_t * task,void * obj,void * data)33862Sigor@sysoev.ru nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
33962Sigor@sysoev.ru {
34062Sigor@sysoev.ru nxt_conn_t *client;
34162Sigor@sysoev.ru nxt_conn_proxy_t *p;
34262Sigor@sysoev.ru
34362Sigor@sysoev.ru client = obj;
34462Sigor@sysoev.ru p = data;
34562Sigor@sysoev.ru
34662Sigor@sysoev.ru nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
34762Sigor@sysoev.ru
34862Sigor@sysoev.ru nxt_conn_proxy_read_process(task, p, client, p->peer);
34962Sigor@sysoev.ru }
35062Sigor@sysoev.ru
35162Sigor@sysoev.ru
35262Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_read_state
35362Sigor@sysoev.ru nxt_aligned(64) =
35462Sigor@sysoev.ru {
35562Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_read_ready,
35662Sigor@sysoev.ru .close_handler = nxt_conn_proxy_close,
35762Sigor@sysoev.ru .error_handler = nxt_conn_proxy_read_error,
35862Sigor@sysoev.ru };
35962Sigor@sysoev.ru
36062Sigor@sysoev.ru
36162Sigor@sysoev.ru static void
nxt_conn_proxy_peer_read_ready(nxt_task_t * task,void * obj,void * data)36262Sigor@sysoev.ru nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
36362Sigor@sysoev.ru {
36462Sigor@sysoev.ru nxt_conn_t *peer;
36562Sigor@sysoev.ru nxt_conn_proxy_t *p;
36662Sigor@sysoev.ru
36762Sigor@sysoev.ru peer = obj;
36862Sigor@sysoev.ru p = data;
36962Sigor@sysoev.ru
37062Sigor@sysoev.ru nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
37162Sigor@sysoev.ru
37262Sigor@sysoev.ru nxt_conn_proxy_read_process(task, p, peer, p->client);
37362Sigor@sysoev.ru }
37462Sigor@sysoev.ru
37562Sigor@sysoev.ru
37662Sigor@sysoev.ru static void
nxt_conn_proxy_read_process(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * source,nxt_conn_t * sink)37762Sigor@sysoev.ru nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
37862Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink)
37962Sigor@sysoev.ru {
38062Sigor@sysoev.ru nxt_buf_t *rb, *wb;
38162Sigor@sysoev.ru
38262Sigor@sysoev.ru if (sink->socket.error != 0) {
38362Sigor@sysoev.ru nxt_debug(task, "conn proxy sink fd:%d error:%d",
38462Sigor@sysoev.ru sink->socket.fd, sink->socket.error);
38562Sigor@sysoev.ru
38662Sigor@sysoev.ru nxt_conn_proxy_write_error(task, sink, sink->socket.data);
38762Sigor@sysoev.ru return;
38862Sigor@sysoev.ru }
38962Sigor@sysoev.ru
39062Sigor@sysoev.ru while (source->read != NULL) {
39162Sigor@sysoev.ru
39262Sigor@sysoev.ru rb = source->read;
39362Sigor@sysoev.ru
39462Sigor@sysoev.ru if (rb->mem.pos != rb->mem.free) {
39562Sigor@sysoev.ru
39662Sigor@sysoev.ru /* Add a read part to a write chain. */
39762Sigor@sysoev.ru
39862Sigor@sysoev.ru wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
39962Sigor@sysoev.ru if (wb == NULL) {
40062Sigor@sysoev.ru /* An error completion. */
40162Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
40262Sigor@sysoev.ru return;
40362Sigor@sysoev.ru }
40462Sigor@sysoev.ru
40562Sigor@sysoev.ru wb->mem.pos = rb->mem.pos;
40662Sigor@sysoev.ru wb->mem.free = rb->mem.free;
40762Sigor@sysoev.ru wb->mem.start = rb->mem.pos;
40862Sigor@sysoev.ru wb->mem.end = rb->mem.free;
40962Sigor@sysoev.ru
41062Sigor@sysoev.ru rb->mem.pos = rb->mem.free;
41162Sigor@sysoev.ru rb->mem.start = rb->mem.free;
41262Sigor@sysoev.ru
41362Sigor@sysoev.ru nxt_conn_proxy_write_add(sink, wb);
41462Sigor@sysoev.ru }
41562Sigor@sysoev.ru
41662Sigor@sysoev.ru if (rb->mem.start != rb->mem.end) {
41762Sigor@sysoev.ru nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
41862Sigor@sysoev.ru task, source, source->socket.data);
41962Sigor@sysoev.ru break;
42062Sigor@sysoev.ru }
42162Sigor@sysoev.ru
42262Sigor@sysoev.ru source->read = rb->next;
42362Sigor@sysoev.ru nxt_buf_free(source->mem_pool, rb);
42462Sigor@sysoev.ru }
42562Sigor@sysoev.ru
42662Sigor@sysoev.ru if (p->connected) {
42762Sigor@sysoev.ru nxt_conn_write(task->thread->engine, sink);
42862Sigor@sysoev.ru }
42962Sigor@sysoev.ru }
43062Sigor@sysoev.ru
43162Sigor@sysoev.ru
43262Sigor@sysoev.ru static void
nxt_conn_proxy_write_add(nxt_conn_t * c,nxt_buf_t * b)43362Sigor@sysoev.ru nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
43462Sigor@sysoev.ru {
43562Sigor@sysoev.ru nxt_buf_t *first, *second, *prev;
43662Sigor@sysoev.ru
43762Sigor@sysoev.ru first = c->write;
43862Sigor@sysoev.ru
43962Sigor@sysoev.ru if (first == NULL) {
44062Sigor@sysoev.ru c->write = b;
44162Sigor@sysoev.ru return;
44262Sigor@sysoev.ru }
44362Sigor@sysoev.ru
44462Sigor@sysoev.ru /*
44562Sigor@sysoev.ru * A event conn proxy maintains a buffer per each direction.
44662Sigor@sysoev.ru * The buffer is divided by read and write parts. These parts are
44762Sigor@sysoev.ru * linked in buffer chains. There can be no more than two buffers
44862Sigor@sysoev.ru * in write chain at any time, because an added buffer is coalesced
44962Sigor@sysoev.ru * with the last buffer if possible.
45062Sigor@sysoev.ru */
45162Sigor@sysoev.ru
45262Sigor@sysoev.ru second = first->next;
45362Sigor@sysoev.ru
45462Sigor@sysoev.ru if (second == NULL) {
45562Sigor@sysoev.ru
45662Sigor@sysoev.ru if (first->mem.end != b->mem.start) {
45762Sigor@sysoev.ru first->next = b;
45862Sigor@sysoev.ru return;
45962Sigor@sysoev.ru }
46062Sigor@sysoev.ru
46162Sigor@sysoev.ru /*
46262Sigor@sysoev.ru * The first buffer is just before the added buffer, so
46362Sigor@sysoev.ru * expand the first buffer to the end of the added buffer.
46462Sigor@sysoev.ru */
46562Sigor@sysoev.ru prev = first;
46662Sigor@sysoev.ru
46762Sigor@sysoev.ru } else {
46862Sigor@sysoev.ru if (second->mem.end != b->mem.start) {
46962Sigor@sysoev.ru nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
47062Sigor@sysoev.ru "is not equal to added buffer start:%p",
47162Sigor@sysoev.ru second->mem.end, b->mem.start);
47262Sigor@sysoev.ru return;
47362Sigor@sysoev.ru }
47462Sigor@sysoev.ru
47562Sigor@sysoev.ru /*
47662Sigor@sysoev.ru * "second->mem.end == b->mem.start" must be always true here,
47762Sigor@sysoev.ru * that is the second buffer is just before the added buffer,
47862Sigor@sysoev.ru * so expand the second buffer to the end of added buffer.
47962Sigor@sysoev.ru */
48062Sigor@sysoev.ru prev = second;
48162Sigor@sysoev.ru }
48262Sigor@sysoev.ru
48362Sigor@sysoev.ru prev->mem.free = b->mem.end;
48462Sigor@sysoev.ru prev->mem.end = b->mem.end;
48562Sigor@sysoev.ru
48662Sigor@sysoev.ru nxt_buf_free(c->mem_pool, b);
48762Sigor@sysoev.ru }
48862Sigor@sysoev.ru
48962Sigor@sysoev.ru
49062Sigor@sysoev.ru static void
nxt_conn_proxy_read(nxt_task_t * task,void * obj,void * data)49162Sigor@sysoev.ru nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
49262Sigor@sysoev.ru {
49362Sigor@sysoev.ru nxt_conn_t *source, *sink;
49462Sigor@sysoev.ru nxt_conn_proxy_t *p;
49562Sigor@sysoev.ru
49662Sigor@sysoev.ru source = obj;
49762Sigor@sysoev.ru p = data;
49862Sigor@sysoev.ru
49962Sigor@sysoev.ru nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
50062Sigor@sysoev.ru
50162Sigor@sysoev.ru if (!source->socket.closed) {
50262Sigor@sysoev.ru sink = (source == p->client) ? p->peer : p->client;
50362Sigor@sysoev.ru
50462Sigor@sysoev.ru if (sink->socket.error == 0) {
50562Sigor@sysoev.ru nxt_conn_read(task->thread->engine, source);
50662Sigor@sysoev.ru }
50762Sigor@sysoev.ru }
50862Sigor@sysoev.ru }
50962Sigor@sysoev.ru
51062Sigor@sysoev.ru
51162Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_client_write_state
51262Sigor@sysoev.ru nxt_aligned(64) =
51362Sigor@sysoev.ru {
51462Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_client_write_ready,
51562Sigor@sysoev.ru .error_handler = nxt_conn_proxy_write_error,
51662Sigor@sysoev.ru
51762Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_write_timeout,
51862Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value,
51962Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
52062Sigor@sysoev.ru .timer_autoreset = 1,
52162Sigor@sysoev.ru };
52262Sigor@sysoev.ru
52362Sigor@sysoev.ru
52462Sigor@sysoev.ru static void
nxt_conn_proxy_client_write_ready(nxt_task_t * task,void * obj,void * data)52562Sigor@sysoev.ru nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
52662Sigor@sysoev.ru {
52762Sigor@sysoev.ru nxt_conn_t *client;
52862Sigor@sysoev.ru nxt_conn_proxy_t *p;
52962Sigor@sysoev.ru
53062Sigor@sysoev.ru client = obj;
53162Sigor@sysoev.ru p = data;
53262Sigor@sysoev.ru
53362Sigor@sysoev.ru nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
53462Sigor@sysoev.ru
53562Sigor@sysoev.ru nxt_conn_proxy_write_process(task, p, client, p->peer);
53662Sigor@sysoev.ru }
53762Sigor@sysoev.ru
53862Sigor@sysoev.ru
53962Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_peer_write_state
54062Sigor@sysoev.ru nxt_aligned(64) =
54162Sigor@sysoev.ru {
54262Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_peer_write_ready,
54362Sigor@sysoev.ru .error_handler = nxt_conn_proxy_write_error,
54462Sigor@sysoev.ru
54562Sigor@sysoev.ru .timer_handler = nxt_conn_proxy_write_timeout,
54662Sigor@sysoev.ru .timer_value = nxt_conn_proxy_timeout_value,
54762Sigor@sysoev.ru .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
54862Sigor@sysoev.ru .timer_autoreset = 1,
54962Sigor@sysoev.ru };
55062Sigor@sysoev.ru
55162Sigor@sysoev.ru
55262Sigor@sysoev.ru static void
nxt_conn_proxy_peer_write_ready(nxt_task_t * task,void * obj,void * data)55362Sigor@sysoev.ru nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
55462Sigor@sysoev.ru {
55562Sigor@sysoev.ru nxt_conn_t *peer;
55662Sigor@sysoev.ru nxt_conn_proxy_t *p;
55762Sigor@sysoev.ru
55862Sigor@sysoev.ru peer = obj;
55962Sigor@sysoev.ru p = data;
56062Sigor@sysoev.ru
56162Sigor@sysoev.ru nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
56262Sigor@sysoev.ru
56362Sigor@sysoev.ru nxt_conn_proxy_write_process(task, p, peer, p->client);
56462Sigor@sysoev.ru }
56562Sigor@sysoev.ru
56662Sigor@sysoev.ru
56762Sigor@sysoev.ru static void
nxt_conn_proxy_write_process(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * sink,nxt_conn_t * source)56862Sigor@sysoev.ru nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
56962Sigor@sysoev.ru nxt_conn_t *sink, nxt_conn_t *source)
57062Sigor@sysoev.ru {
57162Sigor@sysoev.ru nxt_buf_t *rb, *wb;
57262Sigor@sysoev.ru
57362Sigor@sysoev.ru while (sink->write != NULL) {
57462Sigor@sysoev.ru
57562Sigor@sysoev.ru wb = sink->write;
57662Sigor@sysoev.ru
57762Sigor@sysoev.ru if (nxt_buf_is_sync(wb)) {
57862Sigor@sysoev.ru
57962Sigor@sysoev.ru /* A sync buffer marks the end of stream. */
58062Sigor@sysoev.ru
58162Sigor@sysoev.ru sink->write = NULL;
58262Sigor@sysoev.ru nxt_buf_free(sink->mem_pool, wb);
58362Sigor@sysoev.ru nxt_conn_proxy_shutdown(task, p, source, sink);
58462Sigor@sysoev.ru return;
58562Sigor@sysoev.ru }
58662Sigor@sysoev.ru
58762Sigor@sysoev.ru if (wb->mem.start != wb->mem.pos) {
58862Sigor@sysoev.ru
58962Sigor@sysoev.ru /* Add a written part to a read chain. */
59062Sigor@sysoev.ru
59162Sigor@sysoev.ru rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
59262Sigor@sysoev.ru if (rb == NULL) {
59362Sigor@sysoev.ru /* An error completion. */
59462Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
59562Sigor@sysoev.ru return;
59662Sigor@sysoev.ru }
59762Sigor@sysoev.ru
59862Sigor@sysoev.ru rb->mem.pos = wb->mem.start;
59962Sigor@sysoev.ru rb->mem.free = wb->mem.start;
60062Sigor@sysoev.ru rb->mem.start = wb->mem.start;
60162Sigor@sysoev.ru rb->mem.end = wb->mem.pos;
60262Sigor@sysoev.ru
60362Sigor@sysoev.ru wb->mem.start = wb->mem.pos;
60462Sigor@sysoev.ru
60562Sigor@sysoev.ru nxt_conn_proxy_read_add(source, rb);
60662Sigor@sysoev.ru }
60762Sigor@sysoev.ru
60862Sigor@sysoev.ru if (wb->mem.pos != wb->mem.free) {
60962Sigor@sysoev.ru nxt_conn_write(task->thread->engine, sink);
61062Sigor@sysoev.ru
61162Sigor@sysoev.ru break;
61262Sigor@sysoev.ru }
61362Sigor@sysoev.ru
61462Sigor@sysoev.ru sink->write = wb->next;
61562Sigor@sysoev.ru nxt_buf_free(sink->mem_pool, wb);
61662Sigor@sysoev.ru }
61762Sigor@sysoev.ru
61862Sigor@sysoev.ru nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
61962Sigor@sysoev.ru task, source, source->socket.data);
62062Sigor@sysoev.ru }
62162Sigor@sysoev.ru
62262Sigor@sysoev.ru
62362Sigor@sysoev.ru static void
nxt_conn_proxy_read_add(nxt_conn_t * c,nxt_buf_t * b)62462Sigor@sysoev.ru nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
62562Sigor@sysoev.ru {
62662Sigor@sysoev.ru nxt_buf_t *first, *second;
62762Sigor@sysoev.ru
62862Sigor@sysoev.ru first = c->read;
62962Sigor@sysoev.ru
63062Sigor@sysoev.ru if (first == NULL) {
63162Sigor@sysoev.ru c->read = b;
63262Sigor@sysoev.ru return;
63362Sigor@sysoev.ru }
63462Sigor@sysoev.ru
63562Sigor@sysoev.ru /*
63662Sigor@sysoev.ru * A event conn proxy maintains a buffer per each direction.
63762Sigor@sysoev.ru * The buffer is divided by read and write parts. These parts are
63862Sigor@sysoev.ru * linked in buffer chains. There can be no more than two buffers
63962Sigor@sysoev.ru * in read chain at any time, because an added buffer is coalesced
64062Sigor@sysoev.ru * with the last buffer if possible. The first and the second
64162Sigor@sysoev.ru * buffers are also coalesced if possible.
64262Sigor@sysoev.ru */
64362Sigor@sysoev.ru
64462Sigor@sysoev.ru second = first->next;
64562Sigor@sysoev.ru
64662Sigor@sysoev.ru if (second == NULL) {
64762Sigor@sysoev.ru
64862Sigor@sysoev.ru if (first->mem.start == b->mem.end) {
64962Sigor@sysoev.ru /*
65062Sigor@sysoev.ru * The added buffer is just before the first buffer, so expand
65162Sigor@sysoev.ru * the first buffer to the beginning of the added buffer.
65262Sigor@sysoev.ru */
65362Sigor@sysoev.ru first->mem.pos = b->mem.start;
65462Sigor@sysoev.ru first->mem.free = b->mem.start;
65562Sigor@sysoev.ru first->mem.start = b->mem.start;
65662Sigor@sysoev.ru
65762Sigor@sysoev.ru } else if (first->mem.end == b->mem.start) {
65862Sigor@sysoev.ru /*
65962Sigor@sysoev.ru * The added buffer is just after the first buffer, so
66062Sigor@sysoev.ru * expand the first buffer to the end of the added buffer.
66162Sigor@sysoev.ru */
66262Sigor@sysoev.ru first->mem.end = b->mem.end;
66362Sigor@sysoev.ru
66462Sigor@sysoev.ru } else {
66562Sigor@sysoev.ru first->next = b;
66662Sigor@sysoev.ru return;
66762Sigor@sysoev.ru }
66862Sigor@sysoev.ru
66962Sigor@sysoev.ru } else {
67062Sigor@sysoev.ru if (second->mem.end != b->mem.start) {
67162Sigor@sysoev.ru nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
67262Sigor@sysoev.ru "is not equal to added buffer start:%p",
67362Sigor@sysoev.ru second->mem.end, b->mem.start);
67462Sigor@sysoev.ru return;
67562Sigor@sysoev.ru }
67662Sigor@sysoev.ru
67762Sigor@sysoev.ru /*
67862Sigor@sysoev.ru * The added buffer is just after the second buffer, so
67962Sigor@sysoev.ru * expand the second buffer to the end of the added buffer.
68062Sigor@sysoev.ru */
68162Sigor@sysoev.ru second->mem.end = b->mem.end;
68262Sigor@sysoev.ru
68362Sigor@sysoev.ru if (first->mem.start == second->mem.end) {
68462Sigor@sysoev.ru /*
68562Sigor@sysoev.ru * The second buffer is just before the first buffer, so expand
68662Sigor@sysoev.ru * the first buffer to the beginning of the second buffer.
68762Sigor@sysoev.ru */
68862Sigor@sysoev.ru first->mem.pos = second->mem.start;
68962Sigor@sysoev.ru first->mem.free = second->mem.start;
69062Sigor@sysoev.ru first->mem.start = second->mem.start;
69162Sigor@sysoev.ru first->next = NULL;
69262Sigor@sysoev.ru
69362Sigor@sysoev.ru nxt_buf_free(c->mem_pool, second);
69462Sigor@sysoev.ru }
69562Sigor@sysoev.ru }
69662Sigor@sysoev.ru
69762Sigor@sysoev.ru nxt_buf_free(c->mem_pool, b);
69862Sigor@sysoev.ru }
69962Sigor@sysoev.ru
70062Sigor@sysoev.ru
70162Sigor@sysoev.ru static void
nxt_conn_proxy_close(nxt_task_t * task,void * obj,void * data)70262Sigor@sysoev.ru nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
70362Sigor@sysoev.ru {
70462Sigor@sysoev.ru nxt_buf_t *b;
70562Sigor@sysoev.ru nxt_conn_t *source, *sink;
70662Sigor@sysoev.ru nxt_conn_proxy_t *p;
70762Sigor@sysoev.ru
70862Sigor@sysoev.ru source = obj;
70962Sigor@sysoev.ru p = data;
71062Sigor@sysoev.ru
71162Sigor@sysoev.ru nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
71262Sigor@sysoev.ru
71362Sigor@sysoev.ru sink = (source == p->client) ? p->peer : p->client;
71462Sigor@sysoev.ru
71562Sigor@sysoev.ru if (sink->write == NULL) {
71662Sigor@sysoev.ru nxt_conn_proxy_shutdown(task, p, source, sink);
71762Sigor@sysoev.ru return;
71862Sigor@sysoev.ru }
71962Sigor@sysoev.ru
72062Sigor@sysoev.ru b = nxt_buf_sync_alloc(source->mem_pool, 0);
72162Sigor@sysoev.ru if (b == NULL) {
72262Sigor@sysoev.ru /* An error completion. */
72362Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
72462Sigor@sysoev.ru return;
72562Sigor@sysoev.ru }
72662Sigor@sysoev.ru
72762Sigor@sysoev.ru nxt_buf_chain_add(&sink->write, b);
72862Sigor@sysoev.ru }
72962Sigor@sysoev.ru
73062Sigor@sysoev.ru
73162Sigor@sysoev.ru static void
nxt_conn_proxy_error(nxt_task_t * task,void * obj,void * data)73262Sigor@sysoev.ru nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
73362Sigor@sysoev.ru {
73462Sigor@sysoev.ru nxt_conn_t *c;
73562Sigor@sysoev.ru nxt_conn_proxy_t *p;
73662Sigor@sysoev.ru
73762Sigor@sysoev.ru c = obj;
73862Sigor@sysoev.ru p = data;
73962Sigor@sysoev.ru
74062Sigor@sysoev.ru nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
74162Sigor@sysoev.ru
74262Sigor@sysoev.ru nxt_conn_proxy_close(task, c, p);
74362Sigor@sysoev.ru }
74462Sigor@sysoev.ru
74562Sigor@sysoev.ru
74662Sigor@sysoev.ru static void
nxt_conn_proxy_read_timeout(nxt_task_t * task,void * obj,void * data)74762Sigor@sysoev.ru nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
74862Sigor@sysoev.ru {
74962Sigor@sysoev.ru nxt_conn_t *c;
75062Sigor@sysoev.ru nxt_timer_t *timer;
75162Sigor@sysoev.ru
75262Sigor@sysoev.ru timer = obj;
75362Sigor@sysoev.ru
75462Sigor@sysoev.ru c = nxt_read_timer_conn(timer);
75562Sigor@sysoev.ru c->socket.timedout = 1;
75662Sigor@sysoev.ru c->socket.closed = 1;
75762Sigor@sysoev.ru
75862Sigor@sysoev.ru nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
75962Sigor@sysoev.ru
76062Sigor@sysoev.ru nxt_conn_proxy_close(task, c, c->socket.data);
76162Sigor@sysoev.ru }
76262Sigor@sysoev.ru
76362Sigor@sysoev.ru
76462Sigor@sysoev.ru static void
nxt_conn_proxy_write_timeout(nxt_task_t * task,void * obj,void * data)76562Sigor@sysoev.ru nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
76662Sigor@sysoev.ru {
76762Sigor@sysoev.ru nxt_conn_t *c;
76862Sigor@sysoev.ru nxt_timer_t *timer;
76962Sigor@sysoev.ru
77062Sigor@sysoev.ru timer = obj;
77162Sigor@sysoev.ru
77262Sigor@sysoev.ru c = nxt_write_timer_conn(timer);
77362Sigor@sysoev.ru c->socket.timedout = 1;
77462Sigor@sysoev.ru c->socket.closed = 1;
77562Sigor@sysoev.ru
77662Sigor@sysoev.ru nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
77762Sigor@sysoev.ru
77862Sigor@sysoev.ru nxt_conn_proxy_close(task, c, c->socket.data);
77962Sigor@sysoev.ru }
78062Sigor@sysoev.ru
78162Sigor@sysoev.ru
78262Sigor@sysoev.ru static nxt_msec_t
nxt_conn_proxy_timeout_value(nxt_conn_t * c,uintptr_t data)78362Sigor@sysoev.ru nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
78462Sigor@sysoev.ru {
785*98Svbart@nginx.com return nxt_value_at(nxt_msec_t, c->socket.data, data);
78662Sigor@sysoev.ru }
78762Sigor@sysoev.ru
78862Sigor@sysoev.ru
78962Sigor@sysoev.ru static void
nxt_conn_proxy_refused(nxt_task_t * task,void * obj,void * data)79062Sigor@sysoev.ru nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
79162Sigor@sysoev.ru {
79262Sigor@sysoev.ru nxt_conn_t *peer;
79362Sigor@sysoev.ru nxt_conn_proxy_t *p;
79462Sigor@sysoev.ru
79562Sigor@sysoev.ru peer = obj;
79662Sigor@sysoev.ru p = data;
79762Sigor@sysoev.ru
79862Sigor@sysoev.ru nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
79962Sigor@sysoev.ru
80062Sigor@sysoev.ru if (p->retries == 0) {
80162Sigor@sysoev.ru /* An error completion. */
80262Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
80362Sigor@sysoev.ru return;
80462Sigor@sysoev.ru }
80562Sigor@sysoev.ru
80662Sigor@sysoev.ru p->retries--;
80762Sigor@sysoev.ru
80862Sigor@sysoev.ru nxt_socket_close(task, peer->socket.fd);
80962Sigor@sysoev.ru peer->socket.fd = -1;
81062Sigor@sysoev.ru peer->socket.error = 0;
81162Sigor@sysoev.ru
81262Sigor@sysoev.ru p->delayed = 1;
81362Sigor@sysoev.ru
81462Sigor@sysoev.ru peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
81562Sigor@sysoev.ru nxt_timer_add(task->thread->engine, &peer->write_timer,
81662Sigor@sysoev.ru p->reconnect_timeout);
81762Sigor@sysoev.ru }
81862Sigor@sysoev.ru
81962Sigor@sysoev.ru
82062Sigor@sysoev.ru static void
nxt_conn_proxy_reconnect_handler(nxt_task_t * task,void * obj,void * data)82162Sigor@sysoev.ru nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
82262Sigor@sysoev.ru {
82362Sigor@sysoev.ru nxt_conn_t *peer;
82462Sigor@sysoev.ru nxt_timer_t *timer;
82562Sigor@sysoev.ru nxt_conn_proxy_t *p;
82662Sigor@sysoev.ru
82762Sigor@sysoev.ru timer = obj;
82862Sigor@sysoev.ru
82962Sigor@sysoev.ru nxt_debug(task, "conn proxy reconnect timer");
83062Sigor@sysoev.ru
83162Sigor@sysoev.ru peer = nxt_write_timer_conn(timer);
83262Sigor@sysoev.ru p = peer->socket.data;
83362Sigor@sysoev.ru
83462Sigor@sysoev.ru if (p->client->socket.closed) {
83562Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
83662Sigor@sysoev.ru return;
83762Sigor@sysoev.ru }
83862Sigor@sysoev.ru
83962Sigor@sysoev.ru p->delayed = 0;
84062Sigor@sysoev.ru
84162Sigor@sysoev.ru peer->write_state = &nxt_conn_proxy_peer_connect_state;
84262Sigor@sysoev.ru /*
84362Sigor@sysoev.ru * Peer read event: disabled.
84462Sigor@sysoev.ru * Peer write event: waiting for connection with connect_timeout.
84562Sigor@sysoev.ru */
84662Sigor@sysoev.ru nxt_conn_connect(task->thread->engine, peer);
84762Sigor@sysoev.ru }
84862Sigor@sysoev.ru
84962Sigor@sysoev.ru
85062Sigor@sysoev.ru static void
nxt_conn_proxy_shutdown(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * source,nxt_conn_t * sink)85162Sigor@sysoev.ru nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
85262Sigor@sysoev.ru nxt_conn_t *source, nxt_conn_t *sink)
85362Sigor@sysoev.ru {
85462Sigor@sysoev.ru nxt_buf_t *b;
85562Sigor@sysoev.ru
85662Sigor@sysoev.ru nxt_debug(source->socket.task,
85762Sigor@sysoev.ru "conn proxy shutdown source fd:%d cl:%d err:%d",
85862Sigor@sysoev.ru source->socket.fd, source->socket.closed, source->socket.error);
85962Sigor@sysoev.ru
86062Sigor@sysoev.ru nxt_debug(sink->socket.task,
86162Sigor@sysoev.ru "conn proxy shutdown sink fd:%d cl:%d err:%d",
86262Sigor@sysoev.ru sink->socket.fd, sink->socket.closed, sink->socket.error);
86362Sigor@sysoev.ru
86462Sigor@sysoev.ru if (!p->connected || p->delayed) {
86562Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
86662Sigor@sysoev.ru return;
86762Sigor@sysoev.ru }
86862Sigor@sysoev.ru
86962Sigor@sysoev.ru if (sink->socket.error == 0 && !sink->socket.closed) {
87062Sigor@sysoev.ru sink->socket.shutdown = 1;
87162Sigor@sysoev.ru nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
87262Sigor@sysoev.ru }
87362Sigor@sysoev.ru
87462Sigor@sysoev.ru if (sink->socket.error != 0
87562Sigor@sysoev.ru || (sink->socket.closed && source->write == NULL))
87662Sigor@sysoev.ru {
87762Sigor@sysoev.ru /* The opposite direction also has been already closed. */
87862Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
87962Sigor@sysoev.ru return;
88062Sigor@sysoev.ru }
88162Sigor@sysoev.ru
88262Sigor@sysoev.ru nxt_debug(source->socket.task, "free source buffer");
88362Sigor@sysoev.ru
88462Sigor@sysoev.ru /* Free the direction's buffer. */
88562Sigor@sysoev.ru b = (source == p->client) ? p->client_buffer : p->peer_buffer;
88665Sigor@sysoev.ru nxt_mp_free(source->mem_pool, b);
88762Sigor@sysoev.ru }
88862Sigor@sysoev.ru
88962Sigor@sysoev.ru
89062Sigor@sysoev.ru static void
nxt_conn_proxy_read_error(nxt_task_t * task,void * obj,void * data)89162Sigor@sysoev.ru nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
89262Sigor@sysoev.ru {
89362Sigor@sysoev.ru nxt_conn_t *c;
89462Sigor@sysoev.ru nxt_conn_proxy_t *p;
89562Sigor@sysoev.ru
89662Sigor@sysoev.ru c = obj;
89762Sigor@sysoev.ru p = data;
89862Sigor@sysoev.ru
89962Sigor@sysoev.ru nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
90062Sigor@sysoev.ru
90162Sigor@sysoev.ru nxt_conn_proxy_close(task, c, p);
90262Sigor@sysoev.ru }
90362Sigor@sysoev.ru
90462Sigor@sysoev.ru
90562Sigor@sysoev.ru static void
nxt_conn_proxy_write_error(nxt_task_t * task,void * obj,void * data)90662Sigor@sysoev.ru nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
90762Sigor@sysoev.ru {
90862Sigor@sysoev.ru nxt_conn_t *source, *sink;
90962Sigor@sysoev.ru nxt_conn_proxy_t *p;
91062Sigor@sysoev.ru
91162Sigor@sysoev.ru sink = obj;
91262Sigor@sysoev.ru p = data;
91362Sigor@sysoev.ru
91462Sigor@sysoev.ru nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
91562Sigor@sysoev.ru
91662Sigor@sysoev.ru /* Clear data for the direction sink. */
91762Sigor@sysoev.ru sink->write = NULL;
91862Sigor@sysoev.ru
91962Sigor@sysoev.ru /* Block the direction source. */
92062Sigor@sysoev.ru source = (sink == p->client) ? p->peer : p->client;
92162Sigor@sysoev.ru nxt_fd_event_block_read(task->thread->engine, &source->socket);
92262Sigor@sysoev.ru
92362Sigor@sysoev.ru if (source->write == NULL) {
92462Sigor@sysoev.ru /*
92562Sigor@sysoev.ru * There is no data for the opposite direction and
92662Sigor@sysoev.ru * the next read from the sink will most probably fail.
92762Sigor@sysoev.ru */
92862Sigor@sysoev.ru nxt_conn_proxy_complete(task, p);
92962Sigor@sysoev.ru }
93062Sigor@sysoev.ru }
93162Sigor@sysoev.ru
93262Sigor@sysoev.ru
93362Sigor@sysoev.ru static const nxt_conn_state_t nxt_conn_proxy_close_state
93462Sigor@sysoev.ru nxt_aligned(64) =
93562Sigor@sysoev.ru {
93662Sigor@sysoev.ru .ready_handler = nxt_conn_proxy_completion,
93762Sigor@sysoev.ru };
93862Sigor@sysoev.ru
93962Sigor@sysoev.ru
94062Sigor@sysoev.ru static void
nxt_conn_proxy_complete(nxt_task_t * task,nxt_conn_proxy_t * p)94162Sigor@sysoev.ru nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
94262Sigor@sysoev.ru {
94362Sigor@sysoev.ru nxt_event_engine_t *engine;
94462Sigor@sysoev.ru
94562Sigor@sysoev.ru engine = task->thread->engine;
94662Sigor@sysoev.ru
94762Sigor@sysoev.ru nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
94862Sigor@sysoev.ru p->client->socket.fd, p->peer->socket.fd);
94962Sigor@sysoev.ru
95062Sigor@sysoev.ru if (p->delayed) {
95162Sigor@sysoev.ru p->delayed = 0;
95262Sigor@sysoev.ru nxt_queue_remove(&p->peer->link);
95362Sigor@sysoev.ru }
95462Sigor@sysoev.ru
95562Sigor@sysoev.ru if (p->client->socket.fd != -1) {
95662Sigor@sysoev.ru p->retain = 1;
95762Sigor@sysoev.ru p->client->write_state = &nxt_conn_proxy_close_state;
95862Sigor@sysoev.ru nxt_conn_close(engine, p->client);
95962Sigor@sysoev.ru }
96062Sigor@sysoev.ru
96162Sigor@sysoev.ru if (p->peer->socket.fd != -1) {
96262Sigor@sysoev.ru p->retain++;
96362Sigor@sysoev.ru p->peer->write_state = &nxt_conn_proxy_close_state;
96462Sigor@sysoev.ru nxt_conn_close(engine, p->peer);
96562Sigor@sysoev.ru }
96662Sigor@sysoev.ru }
96762Sigor@sysoev.ru
96862Sigor@sysoev.ru
96962Sigor@sysoev.ru static void
nxt_conn_proxy_completion(nxt_task_t * task,void * obj,void * data)97062Sigor@sysoev.ru nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
97162Sigor@sysoev.ru {
97262Sigor@sysoev.ru nxt_conn_proxy_t *p;
97362Sigor@sysoev.ru
97462Sigor@sysoev.ru p = data;
97562Sigor@sysoev.ru
97662Sigor@sysoev.ru nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
97762Sigor@sysoev.ru p->retain, p->client->socket.fd, p->peer->socket.fd);
97862Sigor@sysoev.ru
97962Sigor@sysoev.ru p->retain--;
98062Sigor@sysoev.ru
98162Sigor@sysoev.ru if (p->retain == 0) {
98265Sigor@sysoev.ru nxt_mp_free(p->client->mem_pool, p->client_buffer);
98365Sigor@sysoev.ru nxt_mp_free(p->client->mem_pool, p->peer_buffer);
98462Sigor@sysoev.ru
98562Sigor@sysoev.ru p->completion_handler(task, p, NULL);
98662Sigor@sysoev.ru }
98762Sigor@sysoev.ru }
988