xref: /unit/src/nxt_conn_proxy.c (revision 98:4077decf847b)
162Sigor@sysoev.ru 
262Sigor@sysoev.ru /*
362Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
462Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
562Sigor@sysoev.ru  */
662Sigor@sysoev.ru 
762Sigor@sysoev.ru #include <nxt_main.h>
862Sigor@sysoev.ru 
962Sigor@sysoev.ru 
1062Sigor@sysoev.ru static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
1162Sigor@sysoev.ru     void *data);
1262Sigor@sysoev.ru static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
1362Sigor@sysoev.ru     void *data);
1462Sigor@sysoev.ru static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
1562Sigor@sysoev.ru static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
1662Sigor@sysoev.ru static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
1762Sigor@sysoev.ru     void *data);
1862Sigor@sysoev.ru static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
1962Sigor@sysoev.ru     void *data);
2062Sigor@sysoev.ru static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
2162Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink);
2262Sigor@sysoev.ru static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
2362Sigor@sysoev.ru static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
2462Sigor@sysoev.ru static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
2562Sigor@sysoev.ru     void *data);
2662Sigor@sysoev.ru static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
2762Sigor@sysoev.ru     void *data);
2862Sigor@sysoev.ru static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
2962Sigor@sysoev.ru     nxt_conn_t *sink, nxt_conn_t *source);
3062Sigor@sysoev.ru static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
3162Sigor@sysoev.ru static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
3262Sigor@sysoev.ru static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
3362Sigor@sysoev.ru static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
3462Sigor@sysoev.ru     void *data);
3562Sigor@sysoev.ru static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
3662Sigor@sysoev.ru     void *data);
3762Sigor@sysoev.ru static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
3862Sigor@sysoev.ru static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
3962Sigor@sysoev.ru static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
4062Sigor@sysoev.ru     void *data);
4162Sigor@sysoev.ru static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
4262Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink);
4362Sigor@sysoev.ru static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
4462Sigor@sysoev.ru static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
4562Sigor@sysoev.ru static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
4662Sigor@sysoev.ru static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
4762Sigor@sysoev.ru 
4862Sigor@sysoev.ru 
4962Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state;
5062Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state;
5162Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state;
5262Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state;
5362Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_read_state;
5462Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state;
5562Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_write_state;
5662Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state;
5762Sigor@sysoev.ru 
5862Sigor@sysoev.ru 
5962Sigor@sysoev.ru nxt_conn_proxy_t *
nxt_conn_proxy_create(nxt_conn_t * client)6062Sigor@sysoev.ru nxt_conn_proxy_create(nxt_conn_t *client)
6162Sigor@sysoev.ru {
6262Sigor@sysoev.ru     nxt_conn_t        *peer;
6362Sigor@sysoev.ru     nxt_thread_t      *thr;
6462Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
6562Sigor@sysoev.ru 
6665Sigor@sysoev.ru     p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t));
6762Sigor@sysoev.ru     if (nxt_slow_path(p == NULL)) {
6862Sigor@sysoev.ru         return NULL;
6962Sigor@sysoev.ru     }
7062Sigor@sysoev.ru 
7162Sigor@sysoev.ru     peer = nxt_conn_create(client->mem_pool, client->socket.task);
7262Sigor@sysoev.ru     if (nxt_slow_path(peer == NULL)) {
7362Sigor@sysoev.ru         return NULL;
7462Sigor@sysoev.ru     }
7562Sigor@sysoev.ru 
7662Sigor@sysoev.ru     thr = nxt_thread();
7762Sigor@sysoev.ru 
7862Sigor@sysoev.ru     client->read_work_queue = &thr->engine->read_work_queue;
7962Sigor@sysoev.ru     client->write_work_queue = &thr->engine->write_work_queue;
8062Sigor@sysoev.ru     client->socket.read_work_queue = &thr->engine->read_work_queue;
8162Sigor@sysoev.ru     client->socket.write_work_queue = &thr->engine->write_work_queue;
8262Sigor@sysoev.ru     peer->socket.read_work_queue = &thr->engine->read_work_queue;
8362Sigor@sysoev.ru     peer->socket.write_work_queue = &thr->engine->write_work_queue;
8462Sigor@sysoev.ru 
8562Sigor@sysoev.ru     peer->socket.data = client->socket.data;
8662Sigor@sysoev.ru 
8762Sigor@sysoev.ru     peer->read_work_queue = client->read_work_queue;
8862Sigor@sysoev.ru     peer->write_work_queue = client->write_work_queue;
8962Sigor@sysoev.ru     peer->read_timer.work_queue = client->read_work_queue;
9062Sigor@sysoev.ru     peer->write_timer.work_queue = client->write_work_queue;
9162Sigor@sysoev.ru 
9262Sigor@sysoev.ru     p->client = client;
9362Sigor@sysoev.ru     p->peer = peer;
9462Sigor@sysoev.ru 
9562Sigor@sysoev.ru     return p;
9662Sigor@sysoev.ru }
9762Sigor@sysoev.ru 
9862Sigor@sysoev.ru 
9962Sigor@sysoev.ru void
nxt_conn_proxy(nxt_task_t * task,nxt_conn_proxy_t * p)10062Sigor@sysoev.ru nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
10162Sigor@sysoev.ru {
10262Sigor@sysoev.ru     nxt_conn_t  *peer;
10362Sigor@sysoev.ru 
10462Sigor@sysoev.ru     /*
10562Sigor@sysoev.ru      * Peer read event: not connected, disabled.
10662Sigor@sysoev.ru      * Peer write event: not connected, disabled.
10762Sigor@sysoev.ru      */
10862Sigor@sysoev.ru 
10962Sigor@sysoev.ru     if (p->client_wait_timeout == 0) {
11062Sigor@sysoev.ru         /*
11162Sigor@sysoev.ru          * Peer write event: waiting for connection
11262Sigor@sysoev.ru          * to be established with connect_timeout.
11362Sigor@sysoev.ru          */
11462Sigor@sysoev.ru         peer = p->peer;
11562Sigor@sysoev.ru         peer->write_state = &nxt_conn_proxy_peer_connect_state;
11662Sigor@sysoev.ru 
11762Sigor@sysoev.ru         nxt_conn_connect(task->thread->engine, peer);
11862Sigor@sysoev.ru     }
11962Sigor@sysoev.ru 
12062Sigor@sysoev.ru     /*
12162Sigor@sysoev.ru      * Client read event: waiting for client data with
12262Sigor@sysoev.ru      * client_wait_timeout before buffer allocation.
12362Sigor@sysoev.ru      */
12462Sigor@sysoev.ru     p->client->read_state = &nxt_conn_proxy_client_wait_state;
12562Sigor@sysoev.ru 
12662Sigor@sysoev.ru     nxt_conn_wait(p->client);
12762Sigor@sysoev.ru }
12862Sigor@sysoev.ru 
12962Sigor@sysoev.ru 
13062Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state
13162Sigor@sysoev.ru     nxt_aligned(64) =
13262Sigor@sysoev.ru {
13362Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_client_buffer_alloc,
13462Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
13562Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
13662Sigor@sysoev.ru 
13762Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_read_timeout,
13862Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
13962Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
14062Sigor@sysoev.ru };
14162Sigor@sysoev.ru 
14262Sigor@sysoev.ru 
14362Sigor@sysoev.ru static void
nxt_conn_proxy_client_buffer_alloc(nxt_task_t * task,void * obj,void * data)14462Sigor@sysoev.ru nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
14562Sigor@sysoev.ru {
14662Sigor@sysoev.ru     nxt_buf_t         *b;
14762Sigor@sysoev.ru     nxt_conn_t        *client;
14862Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
14962Sigor@sysoev.ru 
15062Sigor@sysoev.ru     client = obj;
15162Sigor@sysoev.ru     p = data;
15262Sigor@sysoev.ru 
15362Sigor@sysoev.ru     nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
15462Sigor@sysoev.ru 
15565Sigor@sysoev.ru     b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0);
15662Sigor@sysoev.ru     if (nxt_slow_path(b == NULL)) {
15762Sigor@sysoev.ru         /* An error completion. */
15862Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
15962Sigor@sysoev.ru         return;
16062Sigor@sysoev.ru     }
16162Sigor@sysoev.ru 
16262Sigor@sysoev.ru     p->client_buffer = b;
16362Sigor@sysoev.ru     client->read = b;
16462Sigor@sysoev.ru 
16562Sigor@sysoev.ru     if (p->peer->socket.fd != -1) {
16662Sigor@sysoev.ru         /*
16762Sigor@sysoev.ru          * Client read event: waiting, no timeout.
16862Sigor@sysoev.ru          * Client write event: blocked.
16962Sigor@sysoev.ru          * Peer read event: disabled.
17062Sigor@sysoev.ru          * Peer write event: waiting for connection to be established
17162Sigor@sysoev.ru          * or blocked after the connection has established.
17262Sigor@sysoev.ru          */
17362Sigor@sysoev.ru         client->read_state = &nxt_conn_proxy_client_read_state;
17462Sigor@sysoev.ru 
17562Sigor@sysoev.ru     } else {
17662Sigor@sysoev.ru         /*
17762Sigor@sysoev.ru          * Client read event: waiting for data with client_wait_timeout
17862Sigor@sysoev.ru          * before connecting to a peer.
17962Sigor@sysoev.ru          * Client write event: blocked.
18062Sigor@sysoev.ru          * Peer read event: not connected, disabled.
18162Sigor@sysoev.ru          * Peer write event: not connected, disabled.
18262Sigor@sysoev.ru          */
18362Sigor@sysoev.ru         client->read_state = &nxt_conn_proxy_client_first_read_state;
18462Sigor@sysoev.ru     }
18562Sigor@sysoev.ru 
18662Sigor@sysoev.ru     nxt_conn_read(task->thread->engine, client);
18762Sigor@sysoev.ru }
18862Sigor@sysoev.ru 
18962Sigor@sysoev.ru 
19062Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state
19162Sigor@sysoev.ru     nxt_aligned(64) =
19262Sigor@sysoev.ru {
19362Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_connect,
19462Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
19562Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
19662Sigor@sysoev.ru 
19762Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_read_timeout,
19862Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
19962Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
20062Sigor@sysoev.ru     .timer_autoreset = 1,
20162Sigor@sysoev.ru };
20262Sigor@sysoev.ru 
20362Sigor@sysoev.ru 
20462Sigor@sysoev.ru static void
nxt_conn_proxy_peer_connect(nxt_task_t * task,void * obj,void * data)20562Sigor@sysoev.ru nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
20662Sigor@sysoev.ru {
20762Sigor@sysoev.ru     nxt_conn_t        *client;
20862Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
20962Sigor@sysoev.ru 
21062Sigor@sysoev.ru     client = obj;
21162Sigor@sysoev.ru     p = data;
21262Sigor@sysoev.ru 
21362Sigor@sysoev.ru     /*
21462Sigor@sysoev.ru      * Client read event: waiting, no timeout.
21562Sigor@sysoev.ru      * Client write event: blocked.
21662Sigor@sysoev.ru      * Peer read event: disabled.
21762Sigor@sysoev.ru      * Peer write event: waiting for connection to be established
21862Sigor@sysoev.ru      * with connect_timeout.
21962Sigor@sysoev.ru      */
22062Sigor@sysoev.ru     client->read_state = &nxt_conn_proxy_client_read_state;
22162Sigor@sysoev.ru 
22262Sigor@sysoev.ru     p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
22362Sigor@sysoev.ru 
22462Sigor@sysoev.ru     nxt_conn_connect(task->thread->engine, p->peer);
22562Sigor@sysoev.ru }
22662Sigor@sysoev.ru 
22762Sigor@sysoev.ru 
22862Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state
22962Sigor@sysoev.ru     nxt_aligned(64) =
23062Sigor@sysoev.ru {
23162Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_connected,
23262Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_refused,
23362Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
23462Sigor@sysoev.ru 
23562Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_write_timeout,
23662Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
23762Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
23862Sigor@sysoev.ru     .timer_autoreset = 1,
23962Sigor@sysoev.ru };
24062Sigor@sysoev.ru 
24162Sigor@sysoev.ru 
24262Sigor@sysoev.ru static void
nxt_conn_proxy_connected(nxt_task_t * task,void * obj,void * data)24362Sigor@sysoev.ru nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
24462Sigor@sysoev.ru {
24562Sigor@sysoev.ru     nxt_conn_t        *client, *peer;
24662Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
24762Sigor@sysoev.ru 
24862Sigor@sysoev.ru     peer = obj;
24962Sigor@sysoev.ru     p = data;
25062Sigor@sysoev.ru 
25162Sigor@sysoev.ru     nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
25262Sigor@sysoev.ru 
25362Sigor@sysoev.ru     p->connected = 1;
25462Sigor@sysoev.ru 
25562Sigor@sysoev.ru     nxt_conn_tcp_nodelay_on(task, peer);
25662Sigor@sysoev.ru     nxt_conn_tcp_nodelay_on(task, p->client);
25762Sigor@sysoev.ru 
25862Sigor@sysoev.ru     /* Peer read event: waiting with peer_wait_timeout.  */
25962Sigor@sysoev.ru 
26062Sigor@sysoev.ru     peer->read_state = &nxt_conn_proxy_peer_wait_state;
26162Sigor@sysoev.ru     peer->write_state = &nxt_conn_proxy_peer_write_state;
26262Sigor@sysoev.ru 
26362Sigor@sysoev.ru     nxt_conn_wait(peer);
26462Sigor@sysoev.ru 
26562Sigor@sysoev.ru     if (p->client_buffer != NULL) {
26662Sigor@sysoev.ru         client = p->client;
26762Sigor@sysoev.ru 
26862Sigor@sysoev.ru         client->read_state = &nxt_conn_proxy_client_read_state;
26962Sigor@sysoev.ru         client->write_state = &nxt_conn_proxy_client_write_state;
27062Sigor@sysoev.ru         /*
27162Sigor@sysoev.ru          * Send a client read data to the connected peer.
27262Sigor@sysoev.ru          * Client write event: blocked.
27362Sigor@sysoev.ru          */
27462Sigor@sysoev.ru         nxt_conn_proxy_read_process(task, p, client, peer);
27562Sigor@sysoev.ru     }
27662Sigor@sysoev.ru }
27762Sigor@sysoev.ru 
27862Sigor@sysoev.ru 
27962Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state
28062Sigor@sysoev.ru     nxt_aligned(64) =
28162Sigor@sysoev.ru {
28262Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_read,
28362Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
28462Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
28562Sigor@sysoev.ru 
28662Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_read_timeout,
28762Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
28862Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
28962Sigor@sysoev.ru };
29062Sigor@sysoev.ru 
29162Sigor@sysoev.ru 
29262Sigor@sysoev.ru static void
nxt_conn_proxy_peer_read(nxt_task_t * task,void * obj,void * data)29362Sigor@sysoev.ru nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
29462Sigor@sysoev.ru {
29562Sigor@sysoev.ru     nxt_buf_t         *b;
29662Sigor@sysoev.ru     nxt_conn_t        *peer;
29762Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
29862Sigor@sysoev.ru 
29962Sigor@sysoev.ru     peer = obj;
30062Sigor@sysoev.ru     p = data;
30162Sigor@sysoev.ru 
30262Sigor@sysoev.ru     nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
30362Sigor@sysoev.ru 
30465Sigor@sysoev.ru     b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0);
30562Sigor@sysoev.ru     if (nxt_slow_path(b == NULL)) {
30662Sigor@sysoev.ru         /* An error completion. */
30762Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
30862Sigor@sysoev.ru         return;
30962Sigor@sysoev.ru     }
31062Sigor@sysoev.ru 
31162Sigor@sysoev.ru     p->peer_buffer = b;
31262Sigor@sysoev.ru     peer->read = b;
31362Sigor@sysoev.ru 
31462Sigor@sysoev.ru     p->client->write_state = &nxt_conn_proxy_client_write_state;
31562Sigor@sysoev.ru     peer->read_state = &nxt_conn_proxy_peer_read_state;
31662Sigor@sysoev.ru     peer->write_state = &nxt_conn_proxy_peer_write_state;
31762Sigor@sysoev.ru 
31862Sigor@sysoev.ru     /*
31962Sigor@sysoev.ru      * Client read event: waiting, no timeout.
32062Sigor@sysoev.ru      * Client write event: blocked.
32162Sigor@sysoev.ru      * Peer read event: waiting with possible peer_wait_timeout.
32262Sigor@sysoev.ru      * Peer write event: blocked.
32362Sigor@sysoev.ru      */
32462Sigor@sysoev.ru     nxt_conn_read(task->thread->engine, peer);
32562Sigor@sysoev.ru }
32662Sigor@sysoev.ru 
32762Sigor@sysoev.ru 
32862Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_read_state
32962Sigor@sysoev.ru     nxt_aligned(64) =
33062Sigor@sysoev.ru {
33162Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_client_read_ready,
33262Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
33362Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_read_error,
33462Sigor@sysoev.ru };
33562Sigor@sysoev.ru 
33662Sigor@sysoev.ru 
33762Sigor@sysoev.ru static void
nxt_conn_proxy_client_read_ready(nxt_task_t * task,void * obj,void * data)33862Sigor@sysoev.ru nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
33962Sigor@sysoev.ru {
34062Sigor@sysoev.ru     nxt_conn_t        *client;
34162Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
34262Sigor@sysoev.ru 
34362Sigor@sysoev.ru     client = obj;
34462Sigor@sysoev.ru     p = data;
34562Sigor@sysoev.ru 
34662Sigor@sysoev.ru     nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
34762Sigor@sysoev.ru 
34862Sigor@sysoev.ru     nxt_conn_proxy_read_process(task, p, client, p->peer);
34962Sigor@sysoev.ru }
35062Sigor@sysoev.ru 
35162Sigor@sysoev.ru 
35262Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state
35362Sigor@sysoev.ru     nxt_aligned(64) =
35462Sigor@sysoev.ru {
35562Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_read_ready,
35662Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
35762Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_read_error,
35862Sigor@sysoev.ru };
35962Sigor@sysoev.ru 
36062Sigor@sysoev.ru 
36162Sigor@sysoev.ru static void
nxt_conn_proxy_peer_read_ready(nxt_task_t * task,void * obj,void * data)36262Sigor@sysoev.ru nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
36362Sigor@sysoev.ru {
36462Sigor@sysoev.ru     nxt_conn_t        *peer;
36562Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
36662Sigor@sysoev.ru 
36762Sigor@sysoev.ru     peer = obj;
36862Sigor@sysoev.ru     p = data;
36962Sigor@sysoev.ru 
37062Sigor@sysoev.ru     nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
37162Sigor@sysoev.ru 
37262Sigor@sysoev.ru     nxt_conn_proxy_read_process(task, p, peer, p->client);
37362Sigor@sysoev.ru }
37462Sigor@sysoev.ru 
37562Sigor@sysoev.ru 
37662Sigor@sysoev.ru static void
nxt_conn_proxy_read_process(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * source,nxt_conn_t * sink)37762Sigor@sysoev.ru nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
37862Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink)
37962Sigor@sysoev.ru {
38062Sigor@sysoev.ru     nxt_buf_t  *rb, *wb;
38162Sigor@sysoev.ru 
38262Sigor@sysoev.ru     if (sink->socket.error != 0) {
38362Sigor@sysoev.ru         nxt_debug(task, "conn proxy sink fd:%d error:%d",
38462Sigor@sysoev.ru                   sink->socket.fd, sink->socket.error);
38562Sigor@sysoev.ru 
38662Sigor@sysoev.ru         nxt_conn_proxy_write_error(task, sink, sink->socket.data);
38762Sigor@sysoev.ru         return;
38862Sigor@sysoev.ru     }
38962Sigor@sysoev.ru 
39062Sigor@sysoev.ru     while (source->read != NULL) {
39162Sigor@sysoev.ru 
39262Sigor@sysoev.ru         rb = source->read;
39362Sigor@sysoev.ru 
39462Sigor@sysoev.ru         if (rb->mem.pos != rb->mem.free) {
39562Sigor@sysoev.ru 
39662Sigor@sysoev.ru             /* Add a read part to a write chain. */
39762Sigor@sysoev.ru 
39862Sigor@sysoev.ru             wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
39962Sigor@sysoev.ru             if (wb == NULL) {
40062Sigor@sysoev.ru                 /* An error completion. */
40162Sigor@sysoev.ru                 nxt_conn_proxy_complete(task, p);
40262Sigor@sysoev.ru                 return;
40362Sigor@sysoev.ru             }
40462Sigor@sysoev.ru 
40562Sigor@sysoev.ru             wb->mem.pos = rb->mem.pos;
40662Sigor@sysoev.ru             wb->mem.free = rb->mem.free;
40762Sigor@sysoev.ru             wb->mem.start = rb->mem.pos;
40862Sigor@sysoev.ru             wb->mem.end = rb->mem.free;
40962Sigor@sysoev.ru 
41062Sigor@sysoev.ru             rb->mem.pos = rb->mem.free;
41162Sigor@sysoev.ru             rb->mem.start = rb->mem.free;
41262Sigor@sysoev.ru 
41362Sigor@sysoev.ru             nxt_conn_proxy_write_add(sink, wb);
41462Sigor@sysoev.ru         }
41562Sigor@sysoev.ru 
41662Sigor@sysoev.ru         if (rb->mem.start != rb->mem.end) {
41762Sigor@sysoev.ru             nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
41862Sigor@sysoev.ru                                task, source, source->socket.data);
41962Sigor@sysoev.ru             break;
42062Sigor@sysoev.ru         }
42162Sigor@sysoev.ru 
42262Sigor@sysoev.ru         source->read = rb->next;
42362Sigor@sysoev.ru         nxt_buf_free(source->mem_pool, rb);
42462Sigor@sysoev.ru     }
42562Sigor@sysoev.ru 
42662Sigor@sysoev.ru     if (p->connected) {
42762Sigor@sysoev.ru         nxt_conn_write(task->thread->engine, sink);
42862Sigor@sysoev.ru     }
42962Sigor@sysoev.ru }
43062Sigor@sysoev.ru 
43162Sigor@sysoev.ru 
43262Sigor@sysoev.ru static void
nxt_conn_proxy_write_add(nxt_conn_t * c,nxt_buf_t * b)43362Sigor@sysoev.ru nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
43462Sigor@sysoev.ru {
43562Sigor@sysoev.ru     nxt_buf_t  *first, *second, *prev;
43662Sigor@sysoev.ru 
43762Sigor@sysoev.ru     first = c->write;
43862Sigor@sysoev.ru 
43962Sigor@sysoev.ru     if (first == NULL) {
44062Sigor@sysoev.ru         c->write = b;
44162Sigor@sysoev.ru         return;
44262Sigor@sysoev.ru     }
44362Sigor@sysoev.ru 
44462Sigor@sysoev.ru     /*
44562Sigor@sysoev.ru      * A event conn proxy maintains a buffer per each direction.
44662Sigor@sysoev.ru      * The buffer is divided by read and write parts.  These parts are
44762Sigor@sysoev.ru      * linked in buffer chains.  There can be no more than two buffers
44862Sigor@sysoev.ru      * in write chain at any time, because an added buffer is coalesced
44962Sigor@sysoev.ru      * with the last buffer if possible.
45062Sigor@sysoev.ru      */
45162Sigor@sysoev.ru 
45262Sigor@sysoev.ru     second = first->next;
45362Sigor@sysoev.ru 
45462Sigor@sysoev.ru     if (second == NULL) {
45562Sigor@sysoev.ru 
45662Sigor@sysoev.ru         if (first->mem.end != b->mem.start) {
45762Sigor@sysoev.ru             first->next = b;
45862Sigor@sysoev.ru             return;
45962Sigor@sysoev.ru         }
46062Sigor@sysoev.ru 
46162Sigor@sysoev.ru         /*
46262Sigor@sysoev.ru          * The first buffer is just before the added buffer, so
46362Sigor@sysoev.ru          * expand the first buffer to the end of the added buffer.
46462Sigor@sysoev.ru          */
46562Sigor@sysoev.ru         prev = first;
46662Sigor@sysoev.ru 
46762Sigor@sysoev.ru     } else {
46862Sigor@sysoev.ru         if (second->mem.end != b->mem.start) {
46962Sigor@sysoev.ru             nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
47062Sigor@sysoev.ru                                  "is not equal to added buffer start:%p",
47162Sigor@sysoev.ru                                  second->mem.end, b->mem.start);
47262Sigor@sysoev.ru             return;
47362Sigor@sysoev.ru         }
47462Sigor@sysoev.ru 
47562Sigor@sysoev.ru         /*
47662Sigor@sysoev.ru          * "second->mem.end == b->mem.start" must be always true here,
47762Sigor@sysoev.ru          * that is the second buffer is just before the added buffer,
47862Sigor@sysoev.ru          * so expand the second buffer to the end of added buffer.
47962Sigor@sysoev.ru          */
48062Sigor@sysoev.ru         prev = second;
48162Sigor@sysoev.ru     }
48262Sigor@sysoev.ru 
48362Sigor@sysoev.ru     prev->mem.free = b->mem.end;
48462Sigor@sysoev.ru     prev->mem.end = b->mem.end;
48562Sigor@sysoev.ru 
48662Sigor@sysoev.ru     nxt_buf_free(c->mem_pool, b);
48762Sigor@sysoev.ru }
48862Sigor@sysoev.ru 
48962Sigor@sysoev.ru 
49062Sigor@sysoev.ru static void
nxt_conn_proxy_read(nxt_task_t * task,void * obj,void * data)49162Sigor@sysoev.ru nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
49262Sigor@sysoev.ru {
49362Sigor@sysoev.ru     nxt_conn_t        *source, *sink;
49462Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
49562Sigor@sysoev.ru 
49662Sigor@sysoev.ru     source = obj;
49762Sigor@sysoev.ru     p = data;
49862Sigor@sysoev.ru 
49962Sigor@sysoev.ru     nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
50062Sigor@sysoev.ru 
50162Sigor@sysoev.ru     if (!source->socket.closed) {
50262Sigor@sysoev.ru         sink = (source == p->client) ? p->peer : p->client;
50362Sigor@sysoev.ru 
50462Sigor@sysoev.ru         if (sink->socket.error == 0) {
50562Sigor@sysoev.ru             nxt_conn_read(task->thread->engine, source);
50662Sigor@sysoev.ru         }
50762Sigor@sysoev.ru     }
50862Sigor@sysoev.ru }
50962Sigor@sysoev.ru 
51062Sigor@sysoev.ru 
51162Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_write_state
51262Sigor@sysoev.ru     nxt_aligned(64) =
51362Sigor@sysoev.ru {
51462Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_client_write_ready,
51562Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_write_error,
51662Sigor@sysoev.ru 
51762Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_write_timeout,
51862Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
51962Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
52062Sigor@sysoev.ru     .timer_autoreset = 1,
52162Sigor@sysoev.ru };
52262Sigor@sysoev.ru 
52362Sigor@sysoev.ru 
52462Sigor@sysoev.ru static void
nxt_conn_proxy_client_write_ready(nxt_task_t * task,void * obj,void * data)52562Sigor@sysoev.ru nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
52662Sigor@sysoev.ru {
52762Sigor@sysoev.ru     nxt_conn_t        *client;
52862Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
52962Sigor@sysoev.ru 
53062Sigor@sysoev.ru     client = obj;
53162Sigor@sysoev.ru     p = data;
53262Sigor@sysoev.ru 
53362Sigor@sysoev.ru     nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
53462Sigor@sysoev.ru 
53562Sigor@sysoev.ru     nxt_conn_proxy_write_process(task, p, client, p->peer);
53662Sigor@sysoev.ru }
53762Sigor@sysoev.ru 
53862Sigor@sysoev.ru 
53962Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state
54062Sigor@sysoev.ru     nxt_aligned(64) =
54162Sigor@sysoev.ru {
54262Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_write_ready,
54362Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_write_error,
54462Sigor@sysoev.ru 
54562Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_write_timeout,
54662Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
54762Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
54862Sigor@sysoev.ru     .timer_autoreset = 1,
54962Sigor@sysoev.ru };
55062Sigor@sysoev.ru 
55162Sigor@sysoev.ru 
55262Sigor@sysoev.ru static void
nxt_conn_proxy_peer_write_ready(nxt_task_t * task,void * obj,void * data)55362Sigor@sysoev.ru nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
55462Sigor@sysoev.ru {
55562Sigor@sysoev.ru     nxt_conn_t        *peer;
55662Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
55762Sigor@sysoev.ru 
55862Sigor@sysoev.ru     peer = obj;
55962Sigor@sysoev.ru     p = data;
56062Sigor@sysoev.ru 
56162Sigor@sysoev.ru     nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
56262Sigor@sysoev.ru 
56362Sigor@sysoev.ru     nxt_conn_proxy_write_process(task, p, peer, p->client);
56462Sigor@sysoev.ru }
56562Sigor@sysoev.ru 
56662Sigor@sysoev.ru 
56762Sigor@sysoev.ru static void
nxt_conn_proxy_write_process(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * sink,nxt_conn_t * source)56862Sigor@sysoev.ru nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
56962Sigor@sysoev.ru     nxt_conn_t *sink, nxt_conn_t *source)
57062Sigor@sysoev.ru {
57162Sigor@sysoev.ru     nxt_buf_t  *rb, *wb;
57262Sigor@sysoev.ru 
57362Sigor@sysoev.ru     while (sink->write != NULL) {
57462Sigor@sysoev.ru 
57562Sigor@sysoev.ru         wb = sink->write;
57662Sigor@sysoev.ru 
57762Sigor@sysoev.ru         if (nxt_buf_is_sync(wb)) {
57862Sigor@sysoev.ru 
57962Sigor@sysoev.ru             /* A sync buffer marks the end of stream. */
58062Sigor@sysoev.ru 
58162Sigor@sysoev.ru             sink->write = NULL;
58262Sigor@sysoev.ru             nxt_buf_free(sink->mem_pool, wb);
58362Sigor@sysoev.ru             nxt_conn_proxy_shutdown(task, p, source, sink);
58462Sigor@sysoev.ru             return;
58562Sigor@sysoev.ru         }
58662Sigor@sysoev.ru 
58762Sigor@sysoev.ru         if (wb->mem.start != wb->mem.pos) {
58862Sigor@sysoev.ru 
58962Sigor@sysoev.ru             /* Add a written part to a read chain. */
59062Sigor@sysoev.ru 
59162Sigor@sysoev.ru             rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
59262Sigor@sysoev.ru             if (rb == NULL) {
59362Sigor@sysoev.ru                 /* An error completion. */
59462Sigor@sysoev.ru                 nxt_conn_proxy_complete(task, p);
59562Sigor@sysoev.ru                 return;
59662Sigor@sysoev.ru             }
59762Sigor@sysoev.ru 
59862Sigor@sysoev.ru             rb->mem.pos = wb->mem.start;
59962Sigor@sysoev.ru             rb->mem.free = wb->mem.start;
60062Sigor@sysoev.ru             rb->mem.start = wb->mem.start;
60162Sigor@sysoev.ru             rb->mem.end = wb->mem.pos;
60262Sigor@sysoev.ru 
60362Sigor@sysoev.ru             wb->mem.start = wb->mem.pos;
60462Sigor@sysoev.ru 
60562Sigor@sysoev.ru             nxt_conn_proxy_read_add(source, rb);
60662Sigor@sysoev.ru         }
60762Sigor@sysoev.ru 
60862Sigor@sysoev.ru         if (wb->mem.pos != wb->mem.free) {
60962Sigor@sysoev.ru             nxt_conn_write(task->thread->engine, sink);
61062Sigor@sysoev.ru 
61162Sigor@sysoev.ru             break;
61262Sigor@sysoev.ru         }
61362Sigor@sysoev.ru 
61462Sigor@sysoev.ru         sink->write = wb->next;
61562Sigor@sysoev.ru         nxt_buf_free(sink->mem_pool, wb);
61662Sigor@sysoev.ru     }
61762Sigor@sysoev.ru 
61862Sigor@sysoev.ru     nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
61962Sigor@sysoev.ru                        task, source, source->socket.data);
62062Sigor@sysoev.ru }
62162Sigor@sysoev.ru 
62262Sigor@sysoev.ru 
62362Sigor@sysoev.ru static void
nxt_conn_proxy_read_add(nxt_conn_t * c,nxt_buf_t * b)62462Sigor@sysoev.ru nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
62562Sigor@sysoev.ru {
62662Sigor@sysoev.ru     nxt_buf_t  *first, *second;
62762Sigor@sysoev.ru 
62862Sigor@sysoev.ru     first = c->read;
62962Sigor@sysoev.ru 
63062Sigor@sysoev.ru     if (first == NULL) {
63162Sigor@sysoev.ru         c->read = b;
63262Sigor@sysoev.ru         return;
63362Sigor@sysoev.ru     }
63462Sigor@sysoev.ru 
63562Sigor@sysoev.ru     /*
63662Sigor@sysoev.ru      * A event conn proxy maintains a buffer per each direction.
63762Sigor@sysoev.ru      * The buffer is divided by read and write parts.  These parts are
63862Sigor@sysoev.ru      * linked in buffer chains.  There can be no more than two buffers
63962Sigor@sysoev.ru      * in read chain at any time, because an added buffer is coalesced
64062Sigor@sysoev.ru      * with the last buffer if possible.  The first and the second
64162Sigor@sysoev.ru      * buffers are also coalesced if possible.
64262Sigor@sysoev.ru      */
64362Sigor@sysoev.ru 
64462Sigor@sysoev.ru     second = first->next;
64562Sigor@sysoev.ru 
64662Sigor@sysoev.ru     if (second == NULL) {
64762Sigor@sysoev.ru 
64862Sigor@sysoev.ru         if (first->mem.start == b->mem.end) {
64962Sigor@sysoev.ru             /*
65062Sigor@sysoev.ru              * The added buffer is just before the first buffer, so expand
65162Sigor@sysoev.ru              * the first buffer to the beginning of the added buffer.
65262Sigor@sysoev.ru              */
65362Sigor@sysoev.ru             first->mem.pos = b->mem.start;
65462Sigor@sysoev.ru             first->mem.free = b->mem.start;
65562Sigor@sysoev.ru             first->mem.start = b->mem.start;
65662Sigor@sysoev.ru 
65762Sigor@sysoev.ru         } else if (first->mem.end == b->mem.start) {
65862Sigor@sysoev.ru             /*
65962Sigor@sysoev.ru              * The added buffer is just after the first buffer, so
66062Sigor@sysoev.ru              * expand the first buffer to the end of the added buffer.
66162Sigor@sysoev.ru              */
66262Sigor@sysoev.ru             first->mem.end = b->mem.end;
66362Sigor@sysoev.ru 
66462Sigor@sysoev.ru         } else {
66562Sigor@sysoev.ru             first->next = b;
66662Sigor@sysoev.ru             return;
66762Sigor@sysoev.ru         }
66862Sigor@sysoev.ru 
66962Sigor@sysoev.ru     } else {
67062Sigor@sysoev.ru         if (second->mem.end != b->mem.start) {
67162Sigor@sysoev.ru             nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
67262Sigor@sysoev.ru                                  "is not equal to added buffer start:%p",
67362Sigor@sysoev.ru                                  second->mem.end, b->mem.start);
67462Sigor@sysoev.ru             return;
67562Sigor@sysoev.ru         }
67662Sigor@sysoev.ru 
67762Sigor@sysoev.ru         /*
67862Sigor@sysoev.ru          * The added buffer is just after the second buffer, so
67962Sigor@sysoev.ru          * expand the second buffer to the end of the added buffer.
68062Sigor@sysoev.ru          */
68162Sigor@sysoev.ru         second->mem.end = b->mem.end;
68262Sigor@sysoev.ru 
68362Sigor@sysoev.ru         if (first->mem.start == second->mem.end) {
68462Sigor@sysoev.ru             /*
68562Sigor@sysoev.ru              * The second buffer is just before the first buffer, so expand
68662Sigor@sysoev.ru              * the first buffer to the beginning of the second buffer.
68762Sigor@sysoev.ru              */
68862Sigor@sysoev.ru             first->mem.pos = second->mem.start;
68962Sigor@sysoev.ru             first->mem.free = second->mem.start;
69062Sigor@sysoev.ru             first->mem.start = second->mem.start;
69162Sigor@sysoev.ru             first->next = NULL;
69262Sigor@sysoev.ru 
69362Sigor@sysoev.ru             nxt_buf_free(c->mem_pool, second);
69462Sigor@sysoev.ru         }
69562Sigor@sysoev.ru     }
69662Sigor@sysoev.ru 
69762Sigor@sysoev.ru     nxt_buf_free(c->mem_pool, b);
69862Sigor@sysoev.ru }
69962Sigor@sysoev.ru 
70062Sigor@sysoev.ru 
70162Sigor@sysoev.ru static void
nxt_conn_proxy_close(nxt_task_t * task,void * obj,void * data)70262Sigor@sysoev.ru nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
70362Sigor@sysoev.ru {
70462Sigor@sysoev.ru     nxt_buf_t         *b;
70562Sigor@sysoev.ru     nxt_conn_t        *source, *sink;
70662Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
70762Sigor@sysoev.ru 
70862Sigor@sysoev.ru     source = obj;
70962Sigor@sysoev.ru     p = data;
71062Sigor@sysoev.ru 
71162Sigor@sysoev.ru     nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
71262Sigor@sysoev.ru 
71362Sigor@sysoev.ru     sink = (source == p->client) ? p->peer : p->client;
71462Sigor@sysoev.ru 
71562Sigor@sysoev.ru     if (sink->write == NULL) {
71662Sigor@sysoev.ru         nxt_conn_proxy_shutdown(task, p, source, sink);
71762Sigor@sysoev.ru         return;
71862Sigor@sysoev.ru     }
71962Sigor@sysoev.ru 
72062Sigor@sysoev.ru     b = nxt_buf_sync_alloc(source->mem_pool, 0);
72162Sigor@sysoev.ru     if (b == NULL) {
72262Sigor@sysoev.ru         /* An error completion. */
72362Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
72462Sigor@sysoev.ru         return;
72562Sigor@sysoev.ru     }
72662Sigor@sysoev.ru 
72762Sigor@sysoev.ru     nxt_buf_chain_add(&sink->write, b);
72862Sigor@sysoev.ru }
72962Sigor@sysoev.ru 
73062Sigor@sysoev.ru 
73162Sigor@sysoev.ru static void
nxt_conn_proxy_error(nxt_task_t * task,void * obj,void * data)73262Sigor@sysoev.ru nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
73362Sigor@sysoev.ru {
73462Sigor@sysoev.ru     nxt_conn_t        *c;
73562Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
73662Sigor@sysoev.ru 
73762Sigor@sysoev.ru     c = obj;
73862Sigor@sysoev.ru     p = data;
73962Sigor@sysoev.ru 
74062Sigor@sysoev.ru     nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
74162Sigor@sysoev.ru 
74262Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, p);
74362Sigor@sysoev.ru }
74462Sigor@sysoev.ru 
74562Sigor@sysoev.ru 
74662Sigor@sysoev.ru static void
nxt_conn_proxy_read_timeout(nxt_task_t * task,void * obj,void * data)74762Sigor@sysoev.ru nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
74862Sigor@sysoev.ru {
74962Sigor@sysoev.ru     nxt_conn_t   *c;
75062Sigor@sysoev.ru     nxt_timer_t  *timer;
75162Sigor@sysoev.ru 
75262Sigor@sysoev.ru     timer = obj;
75362Sigor@sysoev.ru 
75462Sigor@sysoev.ru     c = nxt_read_timer_conn(timer);
75562Sigor@sysoev.ru     c->socket.timedout = 1;
75662Sigor@sysoev.ru     c->socket.closed = 1;
75762Sigor@sysoev.ru 
75862Sigor@sysoev.ru     nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
75962Sigor@sysoev.ru 
76062Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, c->socket.data);
76162Sigor@sysoev.ru }
76262Sigor@sysoev.ru 
76362Sigor@sysoev.ru 
76462Sigor@sysoev.ru static void
nxt_conn_proxy_write_timeout(nxt_task_t * task,void * obj,void * data)76562Sigor@sysoev.ru nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
76662Sigor@sysoev.ru {
76762Sigor@sysoev.ru     nxt_conn_t   *c;
76862Sigor@sysoev.ru     nxt_timer_t  *timer;
76962Sigor@sysoev.ru 
77062Sigor@sysoev.ru     timer = obj;
77162Sigor@sysoev.ru 
77262Sigor@sysoev.ru     c = nxt_write_timer_conn(timer);
77362Sigor@sysoev.ru     c->socket.timedout = 1;
77462Sigor@sysoev.ru     c->socket.closed = 1;
77562Sigor@sysoev.ru 
77662Sigor@sysoev.ru     nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
77762Sigor@sysoev.ru 
77862Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, c->socket.data);
77962Sigor@sysoev.ru }
78062Sigor@sysoev.ru 
78162Sigor@sysoev.ru 
78262Sigor@sysoev.ru static nxt_msec_t
nxt_conn_proxy_timeout_value(nxt_conn_t * c,uintptr_t data)78362Sigor@sysoev.ru nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
78462Sigor@sysoev.ru {
785*98Svbart@nginx.com     return nxt_value_at(nxt_msec_t, c->socket.data, data);
78662Sigor@sysoev.ru }
78762Sigor@sysoev.ru 
78862Sigor@sysoev.ru 
78962Sigor@sysoev.ru static void
nxt_conn_proxy_refused(nxt_task_t * task,void * obj,void * data)79062Sigor@sysoev.ru nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
79162Sigor@sysoev.ru {
79262Sigor@sysoev.ru     nxt_conn_t        *peer;
79362Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
79462Sigor@sysoev.ru 
79562Sigor@sysoev.ru     peer = obj;
79662Sigor@sysoev.ru     p = data;
79762Sigor@sysoev.ru 
79862Sigor@sysoev.ru     nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
79962Sigor@sysoev.ru 
80062Sigor@sysoev.ru     if (p->retries == 0) {
80162Sigor@sysoev.ru         /* An error completion. */
80262Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
80362Sigor@sysoev.ru         return;
80462Sigor@sysoev.ru     }
80562Sigor@sysoev.ru 
80662Sigor@sysoev.ru     p->retries--;
80762Sigor@sysoev.ru 
80862Sigor@sysoev.ru     nxt_socket_close(task, peer->socket.fd);
80962Sigor@sysoev.ru     peer->socket.fd = -1;
81062Sigor@sysoev.ru     peer->socket.error = 0;
81162Sigor@sysoev.ru 
81262Sigor@sysoev.ru     p->delayed = 1;
81362Sigor@sysoev.ru 
81462Sigor@sysoev.ru     peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
81562Sigor@sysoev.ru     nxt_timer_add(task->thread->engine, &peer->write_timer,
81662Sigor@sysoev.ru                   p->reconnect_timeout);
81762Sigor@sysoev.ru }
81862Sigor@sysoev.ru 
81962Sigor@sysoev.ru 
82062Sigor@sysoev.ru static void
nxt_conn_proxy_reconnect_handler(nxt_task_t * task,void * obj,void * data)82162Sigor@sysoev.ru nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
82262Sigor@sysoev.ru {
82362Sigor@sysoev.ru     nxt_conn_t        *peer;
82462Sigor@sysoev.ru     nxt_timer_t       *timer;
82562Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
82662Sigor@sysoev.ru 
82762Sigor@sysoev.ru     timer = obj;
82862Sigor@sysoev.ru 
82962Sigor@sysoev.ru     nxt_debug(task, "conn proxy reconnect timer");
83062Sigor@sysoev.ru 
83162Sigor@sysoev.ru     peer = nxt_write_timer_conn(timer);
83262Sigor@sysoev.ru     p = peer->socket.data;
83362Sigor@sysoev.ru 
83462Sigor@sysoev.ru     if (p->client->socket.closed) {
83562Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
83662Sigor@sysoev.ru         return;
83762Sigor@sysoev.ru     }
83862Sigor@sysoev.ru 
83962Sigor@sysoev.ru     p->delayed = 0;
84062Sigor@sysoev.ru 
84162Sigor@sysoev.ru     peer->write_state = &nxt_conn_proxy_peer_connect_state;
84262Sigor@sysoev.ru     /*
84362Sigor@sysoev.ru      * Peer read event: disabled.
84462Sigor@sysoev.ru      * Peer write event: waiting for connection with connect_timeout.
84562Sigor@sysoev.ru      */
84662Sigor@sysoev.ru     nxt_conn_connect(task->thread->engine, peer);
84762Sigor@sysoev.ru }
84862Sigor@sysoev.ru 
84962Sigor@sysoev.ru 
85062Sigor@sysoev.ru static void
nxt_conn_proxy_shutdown(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * source,nxt_conn_t * sink)85162Sigor@sysoev.ru nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
85262Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink)
85362Sigor@sysoev.ru {
85462Sigor@sysoev.ru     nxt_buf_t  *b;
85562Sigor@sysoev.ru 
85662Sigor@sysoev.ru     nxt_debug(source->socket.task,
85762Sigor@sysoev.ru               "conn proxy shutdown source fd:%d cl:%d err:%d",
85862Sigor@sysoev.ru               source->socket.fd, source->socket.closed, source->socket.error);
85962Sigor@sysoev.ru 
86062Sigor@sysoev.ru     nxt_debug(sink->socket.task,
86162Sigor@sysoev.ru               "conn proxy shutdown sink fd:%d cl:%d err:%d",
86262Sigor@sysoev.ru               sink->socket.fd, sink->socket.closed, sink->socket.error);
86362Sigor@sysoev.ru 
86462Sigor@sysoev.ru     if (!p->connected || p->delayed) {
86562Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
86662Sigor@sysoev.ru         return;
86762Sigor@sysoev.ru     }
86862Sigor@sysoev.ru 
86962Sigor@sysoev.ru     if (sink->socket.error == 0 && !sink->socket.closed) {
87062Sigor@sysoev.ru         sink->socket.shutdown = 1;
87162Sigor@sysoev.ru         nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
87262Sigor@sysoev.ru     }
87362Sigor@sysoev.ru 
87462Sigor@sysoev.ru     if (sink->socket.error != 0
87562Sigor@sysoev.ru         || (sink->socket.closed && source->write == NULL))
87662Sigor@sysoev.ru     {
87762Sigor@sysoev.ru         /* The opposite direction also has been already closed. */
87862Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
87962Sigor@sysoev.ru         return;
88062Sigor@sysoev.ru     }
88162Sigor@sysoev.ru 
88262Sigor@sysoev.ru     nxt_debug(source->socket.task, "free source buffer");
88362Sigor@sysoev.ru 
88462Sigor@sysoev.ru     /* Free the direction's buffer. */
88562Sigor@sysoev.ru     b = (source == p->client) ? p->client_buffer : p->peer_buffer;
88665Sigor@sysoev.ru     nxt_mp_free(source->mem_pool, b);
88762Sigor@sysoev.ru }
88862Sigor@sysoev.ru 
88962Sigor@sysoev.ru 
89062Sigor@sysoev.ru static void
nxt_conn_proxy_read_error(nxt_task_t * task,void * obj,void * data)89162Sigor@sysoev.ru nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
89262Sigor@sysoev.ru {
89362Sigor@sysoev.ru     nxt_conn_t        *c;
89462Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
89562Sigor@sysoev.ru 
89662Sigor@sysoev.ru     c = obj;
89762Sigor@sysoev.ru     p = data;
89862Sigor@sysoev.ru 
89962Sigor@sysoev.ru     nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
90062Sigor@sysoev.ru 
90162Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, p);
90262Sigor@sysoev.ru }
90362Sigor@sysoev.ru 
90462Sigor@sysoev.ru 
90562Sigor@sysoev.ru static void
nxt_conn_proxy_write_error(nxt_task_t * task,void * obj,void * data)90662Sigor@sysoev.ru nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
90762Sigor@sysoev.ru {
90862Sigor@sysoev.ru     nxt_conn_t        *source, *sink;
90962Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
91062Sigor@sysoev.ru 
91162Sigor@sysoev.ru     sink = obj;
91262Sigor@sysoev.ru     p = data;
91362Sigor@sysoev.ru 
91462Sigor@sysoev.ru     nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
91562Sigor@sysoev.ru 
91662Sigor@sysoev.ru     /* Clear data for the direction sink. */
91762Sigor@sysoev.ru     sink->write = NULL;
91862Sigor@sysoev.ru 
91962Sigor@sysoev.ru     /* Block the direction source. */
92062Sigor@sysoev.ru     source = (sink == p->client) ? p->peer : p->client;
92162Sigor@sysoev.ru     nxt_fd_event_block_read(task->thread->engine, &source->socket);
92262Sigor@sysoev.ru 
92362Sigor@sysoev.ru     if (source->write == NULL) {
92462Sigor@sysoev.ru         /*
92562Sigor@sysoev.ru          * There is no data for the opposite direction and
92662Sigor@sysoev.ru          * the next read from the sink will most probably fail.
92762Sigor@sysoev.ru          */
92862Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
92962Sigor@sysoev.ru     }
93062Sigor@sysoev.ru }
93162Sigor@sysoev.ru 
93262Sigor@sysoev.ru 
93362Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_close_state
93462Sigor@sysoev.ru     nxt_aligned(64) =
93562Sigor@sysoev.ru {
93662Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_completion,
93762Sigor@sysoev.ru };
93862Sigor@sysoev.ru 
93962Sigor@sysoev.ru 
94062Sigor@sysoev.ru static void
nxt_conn_proxy_complete(nxt_task_t * task,nxt_conn_proxy_t * p)94162Sigor@sysoev.ru nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
94262Sigor@sysoev.ru {
94362Sigor@sysoev.ru     nxt_event_engine_t  *engine;
94462Sigor@sysoev.ru 
94562Sigor@sysoev.ru     engine = task->thread->engine;
94662Sigor@sysoev.ru 
94762Sigor@sysoev.ru     nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
94862Sigor@sysoev.ru               p->client->socket.fd, p->peer->socket.fd);
94962Sigor@sysoev.ru 
95062Sigor@sysoev.ru     if (p->delayed) {
95162Sigor@sysoev.ru         p->delayed = 0;
95262Sigor@sysoev.ru         nxt_queue_remove(&p->peer->link);
95362Sigor@sysoev.ru     }
95462Sigor@sysoev.ru 
95562Sigor@sysoev.ru     if (p->client->socket.fd != -1) {
95662Sigor@sysoev.ru         p->retain = 1;
95762Sigor@sysoev.ru         p->client->write_state = &nxt_conn_proxy_close_state;
95862Sigor@sysoev.ru         nxt_conn_close(engine, p->client);
95962Sigor@sysoev.ru     }
96062Sigor@sysoev.ru 
96162Sigor@sysoev.ru     if (p->peer->socket.fd != -1) {
96262Sigor@sysoev.ru         p->retain++;
96362Sigor@sysoev.ru         p->peer->write_state = &nxt_conn_proxy_close_state;
96462Sigor@sysoev.ru         nxt_conn_close(engine, p->peer);
96562Sigor@sysoev.ru     }
96662Sigor@sysoev.ru }
96762Sigor@sysoev.ru 
96862Sigor@sysoev.ru 
96962Sigor@sysoev.ru static void
nxt_conn_proxy_completion(nxt_task_t * task,void * obj,void * data)97062Sigor@sysoev.ru nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
97162Sigor@sysoev.ru {
97262Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
97362Sigor@sysoev.ru 
97462Sigor@sysoev.ru     p = data;
97562Sigor@sysoev.ru 
97662Sigor@sysoev.ru     nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
97762Sigor@sysoev.ru               p->retain, p->client->socket.fd, p->peer->socket.fd);
97862Sigor@sysoev.ru 
97962Sigor@sysoev.ru     p->retain--;
98062Sigor@sysoev.ru 
98162Sigor@sysoev.ru     if (p->retain == 0) {
98265Sigor@sysoev.ru         nxt_mp_free(p->client->mem_pool, p->client_buffer);
98365Sigor@sysoev.ru         nxt_mp_free(p->client->mem_pool, p->peer_buffer);
98462Sigor@sysoev.ru 
98562Sigor@sysoev.ru         p->completion_handler(task, p, NULL);
98662Sigor@sysoev.ru     }
98762Sigor@sysoev.ru }
988