xref: /unit/src/nxt_conn_proxy.c (revision 62)
1*62Sigor@sysoev.ru 
2*62Sigor@sysoev.ru /*
3*62Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*62Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*62Sigor@sysoev.ru  */
6*62Sigor@sysoev.ru 
7*62Sigor@sysoev.ru #include <nxt_main.h>
8*62Sigor@sysoev.ru 
9*62Sigor@sysoev.ru 
10*62Sigor@sysoev.ru static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
11*62Sigor@sysoev.ru     void *data);
12*62Sigor@sysoev.ru static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
13*62Sigor@sysoev.ru     void *data);
14*62Sigor@sysoev.ru static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
15*62Sigor@sysoev.ru static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
16*62Sigor@sysoev.ru static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
17*62Sigor@sysoev.ru     void *data);
18*62Sigor@sysoev.ru static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
19*62Sigor@sysoev.ru     void *data);
20*62Sigor@sysoev.ru static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
21*62Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink);
22*62Sigor@sysoev.ru static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
23*62Sigor@sysoev.ru static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
24*62Sigor@sysoev.ru static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
25*62Sigor@sysoev.ru     void *data);
26*62Sigor@sysoev.ru static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
27*62Sigor@sysoev.ru     void *data);
28*62Sigor@sysoev.ru static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
29*62Sigor@sysoev.ru     nxt_conn_t *sink, nxt_conn_t *source);
30*62Sigor@sysoev.ru static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
31*62Sigor@sysoev.ru static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
32*62Sigor@sysoev.ru static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
33*62Sigor@sysoev.ru static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
34*62Sigor@sysoev.ru     void *data);
35*62Sigor@sysoev.ru static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
36*62Sigor@sysoev.ru     void *data);
37*62Sigor@sysoev.ru static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
38*62Sigor@sysoev.ru static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
39*62Sigor@sysoev.ru static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
40*62Sigor@sysoev.ru     void *data);
41*62Sigor@sysoev.ru static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
42*62Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink);
43*62Sigor@sysoev.ru static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
44*62Sigor@sysoev.ru static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
45*62Sigor@sysoev.ru static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
46*62Sigor@sysoev.ru static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
47*62Sigor@sysoev.ru 
48*62Sigor@sysoev.ru 
49*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state;
50*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state;
51*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state;
52*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state;
53*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_read_state;
54*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state;
55*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_write_state;
56*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state;
57*62Sigor@sysoev.ru 
58*62Sigor@sysoev.ru 
59*62Sigor@sysoev.ru nxt_conn_proxy_t *
60*62Sigor@sysoev.ru nxt_conn_proxy_create(nxt_conn_t *client)
61*62Sigor@sysoev.ru {
62*62Sigor@sysoev.ru     nxt_conn_t        *peer;
63*62Sigor@sysoev.ru     nxt_thread_t      *thr;
64*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
65*62Sigor@sysoev.ru 
66*62Sigor@sysoev.ru     p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t));
67*62Sigor@sysoev.ru     if (nxt_slow_path(p == NULL)) {
68*62Sigor@sysoev.ru         return NULL;
69*62Sigor@sysoev.ru     }
70*62Sigor@sysoev.ru 
71*62Sigor@sysoev.ru     peer = nxt_conn_create(client->mem_pool, client->socket.task);
72*62Sigor@sysoev.ru     if (nxt_slow_path(peer == NULL)) {
73*62Sigor@sysoev.ru         return NULL;
74*62Sigor@sysoev.ru     }
75*62Sigor@sysoev.ru 
76*62Sigor@sysoev.ru     thr = nxt_thread();
77*62Sigor@sysoev.ru 
78*62Sigor@sysoev.ru     client->read_work_queue = &thr->engine->read_work_queue;
79*62Sigor@sysoev.ru     client->write_work_queue = &thr->engine->write_work_queue;
80*62Sigor@sysoev.ru     client->socket.read_work_queue = &thr->engine->read_work_queue;
81*62Sigor@sysoev.ru     client->socket.write_work_queue = &thr->engine->write_work_queue;
82*62Sigor@sysoev.ru     peer->socket.read_work_queue = &thr->engine->read_work_queue;
83*62Sigor@sysoev.ru     peer->socket.write_work_queue = &thr->engine->write_work_queue;
84*62Sigor@sysoev.ru 
85*62Sigor@sysoev.ru     peer->socket.data = client->socket.data;
86*62Sigor@sysoev.ru 
87*62Sigor@sysoev.ru     peer->read_work_queue = client->read_work_queue;
88*62Sigor@sysoev.ru     peer->write_work_queue = client->write_work_queue;
89*62Sigor@sysoev.ru     peer->read_timer.work_queue = client->read_work_queue;
90*62Sigor@sysoev.ru     peer->write_timer.work_queue = client->write_work_queue;
91*62Sigor@sysoev.ru 
92*62Sigor@sysoev.ru     p->client = client;
93*62Sigor@sysoev.ru     p->peer = peer;
94*62Sigor@sysoev.ru 
95*62Sigor@sysoev.ru     return p;
96*62Sigor@sysoev.ru }
97*62Sigor@sysoev.ru 
98*62Sigor@sysoev.ru 
99*62Sigor@sysoev.ru void
100*62Sigor@sysoev.ru nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
101*62Sigor@sysoev.ru {
102*62Sigor@sysoev.ru     nxt_conn_t  *peer;
103*62Sigor@sysoev.ru 
104*62Sigor@sysoev.ru     /*
105*62Sigor@sysoev.ru      * Peer read event: not connected, disabled.
106*62Sigor@sysoev.ru      * Peer write event: not connected, disabled.
107*62Sigor@sysoev.ru      */
108*62Sigor@sysoev.ru 
109*62Sigor@sysoev.ru     if (p->client_wait_timeout == 0) {
110*62Sigor@sysoev.ru         /*
111*62Sigor@sysoev.ru          * Peer write event: waiting for connection
112*62Sigor@sysoev.ru          * to be established with connect_timeout.
113*62Sigor@sysoev.ru          */
114*62Sigor@sysoev.ru         peer = p->peer;
115*62Sigor@sysoev.ru         peer->write_state = &nxt_conn_proxy_peer_connect_state;
116*62Sigor@sysoev.ru 
117*62Sigor@sysoev.ru         nxt_conn_connect(task->thread->engine, peer);
118*62Sigor@sysoev.ru     }
119*62Sigor@sysoev.ru 
120*62Sigor@sysoev.ru     /*
121*62Sigor@sysoev.ru      * Client read event: waiting for client data with
122*62Sigor@sysoev.ru      * client_wait_timeout before buffer allocation.
123*62Sigor@sysoev.ru      */
124*62Sigor@sysoev.ru     p->client->read_state = &nxt_conn_proxy_client_wait_state;
125*62Sigor@sysoev.ru 
126*62Sigor@sysoev.ru     nxt_conn_wait(p->client);
127*62Sigor@sysoev.ru }
128*62Sigor@sysoev.ru 
129*62Sigor@sysoev.ru 
130*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state
131*62Sigor@sysoev.ru     nxt_aligned(64) =
132*62Sigor@sysoev.ru {
133*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_client_buffer_alloc,
134*62Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
135*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
136*62Sigor@sysoev.ru 
137*62Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_read_timeout,
138*62Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
139*62Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
140*62Sigor@sysoev.ru };
141*62Sigor@sysoev.ru 
142*62Sigor@sysoev.ru 
143*62Sigor@sysoev.ru static void
144*62Sigor@sysoev.ru nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
145*62Sigor@sysoev.ru {
146*62Sigor@sysoev.ru     nxt_buf_t         *b;
147*62Sigor@sysoev.ru     nxt_conn_t        *client;
148*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
149*62Sigor@sysoev.ru 
150*62Sigor@sysoev.ru     client = obj;
151*62Sigor@sysoev.ru     p = data;
152*62Sigor@sysoev.ru 
153*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
154*62Sigor@sysoev.ru 
155*62Sigor@sysoev.ru     b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size,
156*62Sigor@sysoev.ru                           NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
157*62Sigor@sysoev.ru 
158*62Sigor@sysoev.ru     if (nxt_slow_path(b == NULL)) {
159*62Sigor@sysoev.ru         /* An error completion. */
160*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
161*62Sigor@sysoev.ru         return;
162*62Sigor@sysoev.ru     }
163*62Sigor@sysoev.ru 
164*62Sigor@sysoev.ru     p->client_buffer = b;
165*62Sigor@sysoev.ru     client->read = b;
166*62Sigor@sysoev.ru 
167*62Sigor@sysoev.ru     if (p->peer->socket.fd != -1) {
168*62Sigor@sysoev.ru         /*
169*62Sigor@sysoev.ru          * Client read event: waiting, no timeout.
170*62Sigor@sysoev.ru          * Client write event: blocked.
171*62Sigor@sysoev.ru          * Peer read event: disabled.
172*62Sigor@sysoev.ru          * Peer write event: waiting for connection to be established
173*62Sigor@sysoev.ru          * or blocked after the connection has established.
174*62Sigor@sysoev.ru          */
175*62Sigor@sysoev.ru         client->read_state = &nxt_conn_proxy_client_read_state;
176*62Sigor@sysoev.ru 
177*62Sigor@sysoev.ru     } else {
178*62Sigor@sysoev.ru         /*
179*62Sigor@sysoev.ru          * Client read event: waiting for data with client_wait_timeout
180*62Sigor@sysoev.ru          * before connecting to a peer.
181*62Sigor@sysoev.ru          * Client write event: blocked.
182*62Sigor@sysoev.ru          * Peer read event: not connected, disabled.
183*62Sigor@sysoev.ru          * Peer write event: not connected, disabled.
184*62Sigor@sysoev.ru          */
185*62Sigor@sysoev.ru         client->read_state = &nxt_conn_proxy_client_first_read_state;
186*62Sigor@sysoev.ru     }
187*62Sigor@sysoev.ru 
188*62Sigor@sysoev.ru     nxt_conn_read(task->thread->engine, client);
189*62Sigor@sysoev.ru }
190*62Sigor@sysoev.ru 
191*62Sigor@sysoev.ru 
192*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state
193*62Sigor@sysoev.ru     nxt_aligned(64) =
194*62Sigor@sysoev.ru {
195*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_connect,
196*62Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
197*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
198*62Sigor@sysoev.ru 
199*62Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_read_timeout,
200*62Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
201*62Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
202*62Sigor@sysoev.ru     .timer_autoreset = 1,
203*62Sigor@sysoev.ru };
204*62Sigor@sysoev.ru 
205*62Sigor@sysoev.ru 
206*62Sigor@sysoev.ru static void
207*62Sigor@sysoev.ru nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
208*62Sigor@sysoev.ru {
209*62Sigor@sysoev.ru     nxt_conn_t        *client;
210*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
211*62Sigor@sysoev.ru 
212*62Sigor@sysoev.ru     client = obj;
213*62Sigor@sysoev.ru     p = data;
214*62Sigor@sysoev.ru 
215*62Sigor@sysoev.ru     /*
216*62Sigor@sysoev.ru      * Client read event: waiting, no timeout.
217*62Sigor@sysoev.ru      * Client write event: blocked.
218*62Sigor@sysoev.ru      * Peer read event: disabled.
219*62Sigor@sysoev.ru      * Peer write event: waiting for connection to be established
220*62Sigor@sysoev.ru      * with connect_timeout.
221*62Sigor@sysoev.ru      */
222*62Sigor@sysoev.ru     client->read_state = &nxt_conn_proxy_client_read_state;
223*62Sigor@sysoev.ru 
224*62Sigor@sysoev.ru     p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
225*62Sigor@sysoev.ru 
226*62Sigor@sysoev.ru     nxt_conn_connect(task->thread->engine, p->peer);
227*62Sigor@sysoev.ru }
228*62Sigor@sysoev.ru 
229*62Sigor@sysoev.ru 
230*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state
231*62Sigor@sysoev.ru     nxt_aligned(64) =
232*62Sigor@sysoev.ru {
233*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_connected,
234*62Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_refused,
235*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
236*62Sigor@sysoev.ru 
237*62Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_write_timeout,
238*62Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
239*62Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
240*62Sigor@sysoev.ru     .timer_autoreset = 1,
241*62Sigor@sysoev.ru };
242*62Sigor@sysoev.ru 
243*62Sigor@sysoev.ru 
244*62Sigor@sysoev.ru static void
245*62Sigor@sysoev.ru nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
246*62Sigor@sysoev.ru {
247*62Sigor@sysoev.ru     nxt_conn_t        *client, *peer;
248*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
249*62Sigor@sysoev.ru 
250*62Sigor@sysoev.ru     peer = obj;
251*62Sigor@sysoev.ru     p = data;
252*62Sigor@sysoev.ru 
253*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
254*62Sigor@sysoev.ru 
255*62Sigor@sysoev.ru     p->connected = 1;
256*62Sigor@sysoev.ru 
257*62Sigor@sysoev.ru     nxt_conn_tcp_nodelay_on(task, peer);
258*62Sigor@sysoev.ru     nxt_conn_tcp_nodelay_on(task, p->client);
259*62Sigor@sysoev.ru 
260*62Sigor@sysoev.ru     /* Peer read event: waiting with peer_wait_timeout.  */
261*62Sigor@sysoev.ru 
262*62Sigor@sysoev.ru     peer->read_state = &nxt_conn_proxy_peer_wait_state;
263*62Sigor@sysoev.ru     peer->write_state = &nxt_conn_proxy_peer_write_state;
264*62Sigor@sysoev.ru 
265*62Sigor@sysoev.ru     nxt_conn_wait(peer);
266*62Sigor@sysoev.ru 
267*62Sigor@sysoev.ru     if (p->client_buffer != NULL) {
268*62Sigor@sysoev.ru         client = p->client;
269*62Sigor@sysoev.ru 
270*62Sigor@sysoev.ru         client->read_state = &nxt_conn_proxy_client_read_state;
271*62Sigor@sysoev.ru         client->write_state = &nxt_conn_proxy_client_write_state;
272*62Sigor@sysoev.ru         /*
273*62Sigor@sysoev.ru          * Send a client read data to the connected peer.
274*62Sigor@sysoev.ru          * Client write event: blocked.
275*62Sigor@sysoev.ru          */
276*62Sigor@sysoev.ru         nxt_conn_proxy_read_process(task, p, client, peer);
277*62Sigor@sysoev.ru     }
278*62Sigor@sysoev.ru }
279*62Sigor@sysoev.ru 
280*62Sigor@sysoev.ru 
281*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state
282*62Sigor@sysoev.ru     nxt_aligned(64) =
283*62Sigor@sysoev.ru {
284*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_read,
285*62Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
286*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_error,
287*62Sigor@sysoev.ru 
288*62Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_read_timeout,
289*62Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
290*62Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
291*62Sigor@sysoev.ru };
292*62Sigor@sysoev.ru 
293*62Sigor@sysoev.ru 
294*62Sigor@sysoev.ru static void
295*62Sigor@sysoev.ru nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
296*62Sigor@sysoev.ru {
297*62Sigor@sysoev.ru     nxt_buf_t         *b;
298*62Sigor@sysoev.ru     nxt_conn_t        *peer;
299*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
300*62Sigor@sysoev.ru 
301*62Sigor@sysoev.ru     peer = obj;
302*62Sigor@sysoev.ru     p = data;
303*62Sigor@sysoev.ru 
304*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
305*62Sigor@sysoev.ru 
306*62Sigor@sysoev.ru     b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size,
307*62Sigor@sysoev.ru                           NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
308*62Sigor@sysoev.ru 
309*62Sigor@sysoev.ru     if (nxt_slow_path(b == NULL)) {
310*62Sigor@sysoev.ru         /* An error completion. */
311*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
312*62Sigor@sysoev.ru         return;
313*62Sigor@sysoev.ru     }
314*62Sigor@sysoev.ru 
315*62Sigor@sysoev.ru     p->peer_buffer = b;
316*62Sigor@sysoev.ru     peer->read = b;
317*62Sigor@sysoev.ru 
318*62Sigor@sysoev.ru     p->client->write_state = &nxt_conn_proxy_client_write_state;
319*62Sigor@sysoev.ru     peer->read_state = &nxt_conn_proxy_peer_read_state;
320*62Sigor@sysoev.ru     peer->write_state = &nxt_conn_proxy_peer_write_state;
321*62Sigor@sysoev.ru 
322*62Sigor@sysoev.ru     /*
323*62Sigor@sysoev.ru      * Client read event: waiting, no timeout.
324*62Sigor@sysoev.ru      * Client write event: blocked.
325*62Sigor@sysoev.ru      * Peer read event: waiting with possible peer_wait_timeout.
326*62Sigor@sysoev.ru      * Peer write event: blocked.
327*62Sigor@sysoev.ru      */
328*62Sigor@sysoev.ru     nxt_conn_read(task->thread->engine, peer);
329*62Sigor@sysoev.ru }
330*62Sigor@sysoev.ru 
331*62Sigor@sysoev.ru 
332*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_read_state
333*62Sigor@sysoev.ru     nxt_aligned(64) =
334*62Sigor@sysoev.ru {
335*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_client_read_ready,
336*62Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
337*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_read_error,
338*62Sigor@sysoev.ru };
339*62Sigor@sysoev.ru 
340*62Sigor@sysoev.ru 
341*62Sigor@sysoev.ru static void
342*62Sigor@sysoev.ru nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
343*62Sigor@sysoev.ru {
344*62Sigor@sysoev.ru     nxt_conn_t        *client;
345*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
346*62Sigor@sysoev.ru 
347*62Sigor@sysoev.ru     client = obj;
348*62Sigor@sysoev.ru     p = data;
349*62Sigor@sysoev.ru 
350*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
351*62Sigor@sysoev.ru 
352*62Sigor@sysoev.ru     nxt_conn_proxy_read_process(task, p, client, p->peer);
353*62Sigor@sysoev.ru }
354*62Sigor@sysoev.ru 
355*62Sigor@sysoev.ru 
356*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state
357*62Sigor@sysoev.ru     nxt_aligned(64) =
358*62Sigor@sysoev.ru {
359*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_read_ready,
360*62Sigor@sysoev.ru     .close_handler = nxt_conn_proxy_close,
361*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_read_error,
362*62Sigor@sysoev.ru };
363*62Sigor@sysoev.ru 
364*62Sigor@sysoev.ru 
365*62Sigor@sysoev.ru static void
366*62Sigor@sysoev.ru nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
367*62Sigor@sysoev.ru {
368*62Sigor@sysoev.ru     nxt_conn_t        *peer;
369*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
370*62Sigor@sysoev.ru 
371*62Sigor@sysoev.ru     peer = obj;
372*62Sigor@sysoev.ru     p = data;
373*62Sigor@sysoev.ru 
374*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
375*62Sigor@sysoev.ru 
376*62Sigor@sysoev.ru     nxt_conn_proxy_read_process(task, p, peer, p->client);
377*62Sigor@sysoev.ru }
378*62Sigor@sysoev.ru 
379*62Sigor@sysoev.ru 
380*62Sigor@sysoev.ru static void
381*62Sigor@sysoev.ru nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
382*62Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink)
383*62Sigor@sysoev.ru {
384*62Sigor@sysoev.ru     nxt_buf_t  *rb, *wb;
385*62Sigor@sysoev.ru 
386*62Sigor@sysoev.ru     if (sink->socket.error != 0) {
387*62Sigor@sysoev.ru         nxt_debug(task, "conn proxy sink fd:%d error:%d",
388*62Sigor@sysoev.ru                   sink->socket.fd, sink->socket.error);
389*62Sigor@sysoev.ru 
390*62Sigor@sysoev.ru         nxt_conn_proxy_write_error(task, sink, sink->socket.data);
391*62Sigor@sysoev.ru         return;
392*62Sigor@sysoev.ru     }
393*62Sigor@sysoev.ru 
394*62Sigor@sysoev.ru     while (source->read != NULL) {
395*62Sigor@sysoev.ru 
396*62Sigor@sysoev.ru         rb = source->read;
397*62Sigor@sysoev.ru 
398*62Sigor@sysoev.ru         if (rb->mem.pos != rb->mem.free) {
399*62Sigor@sysoev.ru 
400*62Sigor@sysoev.ru             /* Add a read part to a write chain. */
401*62Sigor@sysoev.ru 
402*62Sigor@sysoev.ru             wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
403*62Sigor@sysoev.ru             if (wb == NULL) {
404*62Sigor@sysoev.ru                 /* An error completion. */
405*62Sigor@sysoev.ru                 nxt_conn_proxy_complete(task, p);
406*62Sigor@sysoev.ru                 return;
407*62Sigor@sysoev.ru             }
408*62Sigor@sysoev.ru 
409*62Sigor@sysoev.ru             wb->mem.pos = rb->mem.pos;
410*62Sigor@sysoev.ru             wb->mem.free = rb->mem.free;
411*62Sigor@sysoev.ru             wb->mem.start = rb->mem.pos;
412*62Sigor@sysoev.ru             wb->mem.end = rb->mem.free;
413*62Sigor@sysoev.ru 
414*62Sigor@sysoev.ru             rb->mem.pos = rb->mem.free;
415*62Sigor@sysoev.ru             rb->mem.start = rb->mem.free;
416*62Sigor@sysoev.ru 
417*62Sigor@sysoev.ru             nxt_conn_proxy_write_add(sink, wb);
418*62Sigor@sysoev.ru         }
419*62Sigor@sysoev.ru 
420*62Sigor@sysoev.ru         if (rb->mem.start != rb->mem.end) {
421*62Sigor@sysoev.ru             nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
422*62Sigor@sysoev.ru                                task, source, source->socket.data);
423*62Sigor@sysoev.ru             break;
424*62Sigor@sysoev.ru         }
425*62Sigor@sysoev.ru 
426*62Sigor@sysoev.ru         source->read = rb->next;
427*62Sigor@sysoev.ru         nxt_buf_free(source->mem_pool, rb);
428*62Sigor@sysoev.ru     }
429*62Sigor@sysoev.ru 
430*62Sigor@sysoev.ru     if (p->connected) {
431*62Sigor@sysoev.ru         nxt_conn_write(task->thread->engine, sink);
432*62Sigor@sysoev.ru     }
433*62Sigor@sysoev.ru }
434*62Sigor@sysoev.ru 
435*62Sigor@sysoev.ru 
436*62Sigor@sysoev.ru static void
437*62Sigor@sysoev.ru nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
438*62Sigor@sysoev.ru {
439*62Sigor@sysoev.ru     nxt_buf_t  *first, *second, *prev;
440*62Sigor@sysoev.ru 
441*62Sigor@sysoev.ru     first = c->write;
442*62Sigor@sysoev.ru 
443*62Sigor@sysoev.ru     if (first == NULL) {
444*62Sigor@sysoev.ru         c->write = b;
445*62Sigor@sysoev.ru         return;
446*62Sigor@sysoev.ru     }
447*62Sigor@sysoev.ru 
448*62Sigor@sysoev.ru     /*
449*62Sigor@sysoev.ru      * A event conn proxy maintains a buffer per each direction.
450*62Sigor@sysoev.ru      * The buffer is divided by read and write parts.  These parts are
451*62Sigor@sysoev.ru      * linked in buffer chains.  There can be no more than two buffers
452*62Sigor@sysoev.ru      * in write chain at any time, because an added buffer is coalesced
453*62Sigor@sysoev.ru      * with the last buffer if possible.
454*62Sigor@sysoev.ru      */
455*62Sigor@sysoev.ru 
456*62Sigor@sysoev.ru     second = first->next;
457*62Sigor@sysoev.ru 
458*62Sigor@sysoev.ru     if (second == NULL) {
459*62Sigor@sysoev.ru 
460*62Sigor@sysoev.ru         if (first->mem.end != b->mem.start) {
461*62Sigor@sysoev.ru             first->next = b;
462*62Sigor@sysoev.ru             return;
463*62Sigor@sysoev.ru         }
464*62Sigor@sysoev.ru 
465*62Sigor@sysoev.ru         /*
466*62Sigor@sysoev.ru          * The first buffer is just before the added buffer, so
467*62Sigor@sysoev.ru          * expand the first buffer to the end of the added buffer.
468*62Sigor@sysoev.ru          */
469*62Sigor@sysoev.ru         prev = first;
470*62Sigor@sysoev.ru 
471*62Sigor@sysoev.ru     } else {
472*62Sigor@sysoev.ru         if (second->mem.end != b->mem.start) {
473*62Sigor@sysoev.ru             nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
474*62Sigor@sysoev.ru                                  "is not equal to added buffer start:%p",
475*62Sigor@sysoev.ru                                  second->mem.end, b->mem.start);
476*62Sigor@sysoev.ru             return;
477*62Sigor@sysoev.ru         }
478*62Sigor@sysoev.ru 
479*62Sigor@sysoev.ru         /*
480*62Sigor@sysoev.ru          * "second->mem.end == b->mem.start" must be always true here,
481*62Sigor@sysoev.ru          * that is the second buffer is just before the added buffer,
482*62Sigor@sysoev.ru          * so expand the second buffer to the end of added buffer.
483*62Sigor@sysoev.ru          */
484*62Sigor@sysoev.ru         prev = second;
485*62Sigor@sysoev.ru     }
486*62Sigor@sysoev.ru 
487*62Sigor@sysoev.ru     prev->mem.free = b->mem.end;
488*62Sigor@sysoev.ru     prev->mem.end = b->mem.end;
489*62Sigor@sysoev.ru 
490*62Sigor@sysoev.ru     nxt_buf_free(c->mem_pool, b);
491*62Sigor@sysoev.ru }
492*62Sigor@sysoev.ru 
493*62Sigor@sysoev.ru 
494*62Sigor@sysoev.ru static void
495*62Sigor@sysoev.ru nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
496*62Sigor@sysoev.ru {
497*62Sigor@sysoev.ru     nxt_conn_t        *source, *sink;
498*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
499*62Sigor@sysoev.ru 
500*62Sigor@sysoev.ru     source = obj;
501*62Sigor@sysoev.ru     p = data;
502*62Sigor@sysoev.ru 
503*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
504*62Sigor@sysoev.ru 
505*62Sigor@sysoev.ru     if (!source->socket.closed) {
506*62Sigor@sysoev.ru         sink = (source == p->client) ? p->peer : p->client;
507*62Sigor@sysoev.ru 
508*62Sigor@sysoev.ru         if (sink->socket.error == 0) {
509*62Sigor@sysoev.ru             nxt_conn_read(task->thread->engine, source);
510*62Sigor@sysoev.ru         }
511*62Sigor@sysoev.ru     }
512*62Sigor@sysoev.ru }
513*62Sigor@sysoev.ru 
514*62Sigor@sysoev.ru 
515*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_client_write_state
516*62Sigor@sysoev.ru     nxt_aligned(64) =
517*62Sigor@sysoev.ru {
518*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_client_write_ready,
519*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_write_error,
520*62Sigor@sysoev.ru 
521*62Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_write_timeout,
522*62Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
523*62Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
524*62Sigor@sysoev.ru     .timer_autoreset = 1,
525*62Sigor@sysoev.ru };
526*62Sigor@sysoev.ru 
527*62Sigor@sysoev.ru 
528*62Sigor@sysoev.ru static void
529*62Sigor@sysoev.ru nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
530*62Sigor@sysoev.ru {
531*62Sigor@sysoev.ru     nxt_conn_t        *client;
532*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
533*62Sigor@sysoev.ru 
534*62Sigor@sysoev.ru     client = obj;
535*62Sigor@sysoev.ru     p = data;
536*62Sigor@sysoev.ru 
537*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
538*62Sigor@sysoev.ru 
539*62Sigor@sysoev.ru     nxt_conn_proxy_write_process(task, p, client, p->peer);
540*62Sigor@sysoev.ru }
541*62Sigor@sysoev.ru 
542*62Sigor@sysoev.ru 
543*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state
544*62Sigor@sysoev.ru     nxt_aligned(64) =
545*62Sigor@sysoev.ru {
546*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_peer_write_ready,
547*62Sigor@sysoev.ru     .error_handler = nxt_conn_proxy_write_error,
548*62Sigor@sysoev.ru 
549*62Sigor@sysoev.ru     .timer_handler = nxt_conn_proxy_write_timeout,
550*62Sigor@sysoev.ru     .timer_value = nxt_conn_proxy_timeout_value,
551*62Sigor@sysoev.ru     .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
552*62Sigor@sysoev.ru     .timer_autoreset = 1,
553*62Sigor@sysoev.ru };
554*62Sigor@sysoev.ru 
555*62Sigor@sysoev.ru 
556*62Sigor@sysoev.ru static void
557*62Sigor@sysoev.ru nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
558*62Sigor@sysoev.ru {
559*62Sigor@sysoev.ru     nxt_conn_t        *peer;
560*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
561*62Sigor@sysoev.ru 
562*62Sigor@sysoev.ru     peer = obj;
563*62Sigor@sysoev.ru     p = data;
564*62Sigor@sysoev.ru 
565*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
566*62Sigor@sysoev.ru 
567*62Sigor@sysoev.ru     nxt_conn_proxy_write_process(task, p, peer, p->client);
568*62Sigor@sysoev.ru }
569*62Sigor@sysoev.ru 
570*62Sigor@sysoev.ru 
571*62Sigor@sysoev.ru static void
572*62Sigor@sysoev.ru nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
573*62Sigor@sysoev.ru     nxt_conn_t *sink, nxt_conn_t *source)
574*62Sigor@sysoev.ru {
575*62Sigor@sysoev.ru     nxt_buf_t  *rb, *wb;
576*62Sigor@sysoev.ru 
577*62Sigor@sysoev.ru     while (sink->write != NULL) {
578*62Sigor@sysoev.ru 
579*62Sigor@sysoev.ru         wb = sink->write;
580*62Sigor@sysoev.ru 
581*62Sigor@sysoev.ru         if (nxt_buf_is_sync(wb)) {
582*62Sigor@sysoev.ru 
583*62Sigor@sysoev.ru             /* A sync buffer marks the end of stream. */
584*62Sigor@sysoev.ru 
585*62Sigor@sysoev.ru             sink->write = NULL;
586*62Sigor@sysoev.ru             nxt_buf_free(sink->mem_pool, wb);
587*62Sigor@sysoev.ru             nxt_conn_proxy_shutdown(task, p, source, sink);
588*62Sigor@sysoev.ru             return;
589*62Sigor@sysoev.ru         }
590*62Sigor@sysoev.ru 
591*62Sigor@sysoev.ru         if (wb->mem.start != wb->mem.pos) {
592*62Sigor@sysoev.ru 
593*62Sigor@sysoev.ru             /* Add a written part to a read chain. */
594*62Sigor@sysoev.ru 
595*62Sigor@sysoev.ru             rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
596*62Sigor@sysoev.ru             if (rb == NULL) {
597*62Sigor@sysoev.ru                 /* An error completion. */
598*62Sigor@sysoev.ru                 nxt_conn_proxy_complete(task, p);
599*62Sigor@sysoev.ru                 return;
600*62Sigor@sysoev.ru             }
601*62Sigor@sysoev.ru 
602*62Sigor@sysoev.ru             rb->mem.pos = wb->mem.start;
603*62Sigor@sysoev.ru             rb->mem.free = wb->mem.start;
604*62Sigor@sysoev.ru             rb->mem.start = wb->mem.start;
605*62Sigor@sysoev.ru             rb->mem.end = wb->mem.pos;
606*62Sigor@sysoev.ru 
607*62Sigor@sysoev.ru             wb->mem.start = wb->mem.pos;
608*62Sigor@sysoev.ru 
609*62Sigor@sysoev.ru             nxt_conn_proxy_read_add(source, rb);
610*62Sigor@sysoev.ru         }
611*62Sigor@sysoev.ru 
612*62Sigor@sysoev.ru         if (wb->mem.pos != wb->mem.free) {
613*62Sigor@sysoev.ru             nxt_conn_write(task->thread->engine, sink);
614*62Sigor@sysoev.ru 
615*62Sigor@sysoev.ru             break;
616*62Sigor@sysoev.ru         }
617*62Sigor@sysoev.ru 
618*62Sigor@sysoev.ru         sink->write = wb->next;
619*62Sigor@sysoev.ru         nxt_buf_free(sink->mem_pool, wb);
620*62Sigor@sysoev.ru     }
621*62Sigor@sysoev.ru 
622*62Sigor@sysoev.ru     nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
623*62Sigor@sysoev.ru                        task, source, source->socket.data);
624*62Sigor@sysoev.ru }
625*62Sigor@sysoev.ru 
626*62Sigor@sysoev.ru 
627*62Sigor@sysoev.ru static void
628*62Sigor@sysoev.ru nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
629*62Sigor@sysoev.ru {
630*62Sigor@sysoev.ru     nxt_buf_t  *first, *second;
631*62Sigor@sysoev.ru 
632*62Sigor@sysoev.ru     first = c->read;
633*62Sigor@sysoev.ru 
634*62Sigor@sysoev.ru     if (first == NULL) {
635*62Sigor@sysoev.ru         c->read = b;
636*62Sigor@sysoev.ru         return;
637*62Sigor@sysoev.ru     }
638*62Sigor@sysoev.ru 
639*62Sigor@sysoev.ru     /*
640*62Sigor@sysoev.ru      * A event conn proxy maintains a buffer per each direction.
641*62Sigor@sysoev.ru      * The buffer is divided by read and write parts.  These parts are
642*62Sigor@sysoev.ru      * linked in buffer chains.  There can be no more than two buffers
643*62Sigor@sysoev.ru      * in read chain at any time, because an added buffer is coalesced
644*62Sigor@sysoev.ru      * with the last buffer if possible.  The first and the second
645*62Sigor@sysoev.ru      * buffers are also coalesced if possible.
646*62Sigor@sysoev.ru      */
647*62Sigor@sysoev.ru 
648*62Sigor@sysoev.ru     second = first->next;
649*62Sigor@sysoev.ru 
650*62Sigor@sysoev.ru     if (second == NULL) {
651*62Sigor@sysoev.ru 
652*62Sigor@sysoev.ru         if (first->mem.start == b->mem.end) {
653*62Sigor@sysoev.ru             /*
654*62Sigor@sysoev.ru              * The added buffer is just before the first buffer, so expand
655*62Sigor@sysoev.ru              * the first buffer to the beginning of the added buffer.
656*62Sigor@sysoev.ru              */
657*62Sigor@sysoev.ru             first->mem.pos = b->mem.start;
658*62Sigor@sysoev.ru             first->mem.free = b->mem.start;
659*62Sigor@sysoev.ru             first->mem.start = b->mem.start;
660*62Sigor@sysoev.ru 
661*62Sigor@sysoev.ru         } else if (first->mem.end == b->mem.start) {
662*62Sigor@sysoev.ru             /*
663*62Sigor@sysoev.ru              * The added buffer is just after the first buffer, so
664*62Sigor@sysoev.ru              * expand the first buffer to the end of the added buffer.
665*62Sigor@sysoev.ru              */
666*62Sigor@sysoev.ru             first->mem.end = b->mem.end;
667*62Sigor@sysoev.ru 
668*62Sigor@sysoev.ru         } else {
669*62Sigor@sysoev.ru             first->next = b;
670*62Sigor@sysoev.ru             return;
671*62Sigor@sysoev.ru         }
672*62Sigor@sysoev.ru 
673*62Sigor@sysoev.ru     } else {
674*62Sigor@sysoev.ru         if (second->mem.end != b->mem.start) {
675*62Sigor@sysoev.ru             nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
676*62Sigor@sysoev.ru                                  "is not equal to added buffer start:%p",
677*62Sigor@sysoev.ru                                  second->mem.end, b->mem.start);
678*62Sigor@sysoev.ru             return;
679*62Sigor@sysoev.ru         }
680*62Sigor@sysoev.ru 
681*62Sigor@sysoev.ru         /*
682*62Sigor@sysoev.ru          * The added buffer is just after the second buffer, so
683*62Sigor@sysoev.ru          * expand the second buffer to the end of the added buffer.
684*62Sigor@sysoev.ru          */
685*62Sigor@sysoev.ru         second->mem.end = b->mem.end;
686*62Sigor@sysoev.ru 
687*62Sigor@sysoev.ru         if (first->mem.start == second->mem.end) {
688*62Sigor@sysoev.ru             /*
689*62Sigor@sysoev.ru              * The second buffer is just before the first buffer, so expand
690*62Sigor@sysoev.ru              * the first buffer to the beginning of the second buffer.
691*62Sigor@sysoev.ru              */
692*62Sigor@sysoev.ru             first->mem.pos = second->mem.start;
693*62Sigor@sysoev.ru             first->mem.free = second->mem.start;
694*62Sigor@sysoev.ru             first->mem.start = second->mem.start;
695*62Sigor@sysoev.ru             first->next = NULL;
696*62Sigor@sysoev.ru 
697*62Sigor@sysoev.ru             nxt_buf_free(c->mem_pool, second);
698*62Sigor@sysoev.ru         }
699*62Sigor@sysoev.ru     }
700*62Sigor@sysoev.ru 
701*62Sigor@sysoev.ru     nxt_buf_free(c->mem_pool, b);
702*62Sigor@sysoev.ru }
703*62Sigor@sysoev.ru 
704*62Sigor@sysoev.ru 
705*62Sigor@sysoev.ru static void
706*62Sigor@sysoev.ru nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
707*62Sigor@sysoev.ru {
708*62Sigor@sysoev.ru     nxt_buf_t         *b;
709*62Sigor@sysoev.ru     nxt_conn_t        *source, *sink;
710*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
711*62Sigor@sysoev.ru 
712*62Sigor@sysoev.ru     source = obj;
713*62Sigor@sysoev.ru     p = data;
714*62Sigor@sysoev.ru 
715*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
716*62Sigor@sysoev.ru 
717*62Sigor@sysoev.ru     sink = (source == p->client) ? p->peer : p->client;
718*62Sigor@sysoev.ru 
719*62Sigor@sysoev.ru     if (sink->write == NULL) {
720*62Sigor@sysoev.ru         nxt_conn_proxy_shutdown(task, p, source, sink);
721*62Sigor@sysoev.ru         return;
722*62Sigor@sysoev.ru     }
723*62Sigor@sysoev.ru 
724*62Sigor@sysoev.ru     b = nxt_buf_sync_alloc(source->mem_pool, 0);
725*62Sigor@sysoev.ru     if (b == NULL) {
726*62Sigor@sysoev.ru         /* An error completion. */
727*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
728*62Sigor@sysoev.ru         return;
729*62Sigor@sysoev.ru     }
730*62Sigor@sysoev.ru 
731*62Sigor@sysoev.ru     nxt_buf_chain_add(&sink->write, b);
732*62Sigor@sysoev.ru }
733*62Sigor@sysoev.ru 
734*62Sigor@sysoev.ru 
735*62Sigor@sysoev.ru static void
736*62Sigor@sysoev.ru nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
737*62Sigor@sysoev.ru {
738*62Sigor@sysoev.ru     nxt_conn_t        *c;
739*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
740*62Sigor@sysoev.ru 
741*62Sigor@sysoev.ru     c = obj;
742*62Sigor@sysoev.ru     p = data;
743*62Sigor@sysoev.ru 
744*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
745*62Sigor@sysoev.ru 
746*62Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, p);
747*62Sigor@sysoev.ru }
748*62Sigor@sysoev.ru 
749*62Sigor@sysoev.ru 
750*62Sigor@sysoev.ru static void
751*62Sigor@sysoev.ru nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
752*62Sigor@sysoev.ru {
753*62Sigor@sysoev.ru     nxt_conn_t   *c;
754*62Sigor@sysoev.ru     nxt_timer_t  *timer;
755*62Sigor@sysoev.ru 
756*62Sigor@sysoev.ru     timer = obj;
757*62Sigor@sysoev.ru 
758*62Sigor@sysoev.ru     c = nxt_read_timer_conn(timer);
759*62Sigor@sysoev.ru     c->socket.timedout = 1;
760*62Sigor@sysoev.ru     c->socket.closed = 1;
761*62Sigor@sysoev.ru 
762*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
763*62Sigor@sysoev.ru 
764*62Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, c->socket.data);
765*62Sigor@sysoev.ru }
766*62Sigor@sysoev.ru 
767*62Sigor@sysoev.ru 
768*62Sigor@sysoev.ru static void
769*62Sigor@sysoev.ru nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
770*62Sigor@sysoev.ru {
771*62Sigor@sysoev.ru     nxt_conn_t   *c;
772*62Sigor@sysoev.ru     nxt_timer_t  *timer;
773*62Sigor@sysoev.ru 
774*62Sigor@sysoev.ru     timer = obj;
775*62Sigor@sysoev.ru 
776*62Sigor@sysoev.ru     c = nxt_write_timer_conn(timer);
777*62Sigor@sysoev.ru     c->socket.timedout = 1;
778*62Sigor@sysoev.ru     c->socket.closed = 1;
779*62Sigor@sysoev.ru 
780*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
781*62Sigor@sysoev.ru 
782*62Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, c->socket.data);
783*62Sigor@sysoev.ru }
784*62Sigor@sysoev.ru 
785*62Sigor@sysoev.ru 
786*62Sigor@sysoev.ru static nxt_msec_t
787*62Sigor@sysoev.ru nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
788*62Sigor@sysoev.ru {
789*62Sigor@sysoev.ru     nxt_msec_t        *timer;
790*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
791*62Sigor@sysoev.ru 
792*62Sigor@sysoev.ru     p = c->socket.data;
793*62Sigor@sysoev.ru 
794*62Sigor@sysoev.ru     timer = (nxt_msec_t *) ((char *) p + data);
795*62Sigor@sysoev.ru 
796*62Sigor@sysoev.ru     return *timer;
797*62Sigor@sysoev.ru }
798*62Sigor@sysoev.ru 
799*62Sigor@sysoev.ru 
800*62Sigor@sysoev.ru static void
801*62Sigor@sysoev.ru nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
802*62Sigor@sysoev.ru {
803*62Sigor@sysoev.ru     nxt_conn_t        *peer;
804*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
805*62Sigor@sysoev.ru 
806*62Sigor@sysoev.ru     peer = obj;
807*62Sigor@sysoev.ru     p = data;
808*62Sigor@sysoev.ru 
809*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
810*62Sigor@sysoev.ru 
811*62Sigor@sysoev.ru     if (p->retries == 0) {
812*62Sigor@sysoev.ru         /* An error completion. */
813*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
814*62Sigor@sysoev.ru         return;
815*62Sigor@sysoev.ru     }
816*62Sigor@sysoev.ru 
817*62Sigor@sysoev.ru     p->retries--;
818*62Sigor@sysoev.ru 
819*62Sigor@sysoev.ru     nxt_socket_close(task, peer->socket.fd);
820*62Sigor@sysoev.ru     peer->socket.fd = -1;
821*62Sigor@sysoev.ru     peer->socket.error = 0;
822*62Sigor@sysoev.ru 
823*62Sigor@sysoev.ru     p->delayed = 1;
824*62Sigor@sysoev.ru 
825*62Sigor@sysoev.ru     peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
826*62Sigor@sysoev.ru     nxt_timer_add(task->thread->engine, &peer->write_timer,
827*62Sigor@sysoev.ru                   p->reconnect_timeout);
828*62Sigor@sysoev.ru }
829*62Sigor@sysoev.ru 
830*62Sigor@sysoev.ru 
831*62Sigor@sysoev.ru static void
832*62Sigor@sysoev.ru nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
833*62Sigor@sysoev.ru {
834*62Sigor@sysoev.ru     nxt_conn_t        *peer;
835*62Sigor@sysoev.ru     nxt_timer_t       *timer;
836*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
837*62Sigor@sysoev.ru 
838*62Sigor@sysoev.ru     timer = obj;
839*62Sigor@sysoev.ru 
840*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy reconnect timer");
841*62Sigor@sysoev.ru 
842*62Sigor@sysoev.ru     peer = nxt_write_timer_conn(timer);
843*62Sigor@sysoev.ru     p = peer->socket.data;
844*62Sigor@sysoev.ru 
845*62Sigor@sysoev.ru     if (p->client->socket.closed) {
846*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
847*62Sigor@sysoev.ru         return;
848*62Sigor@sysoev.ru     }
849*62Sigor@sysoev.ru 
850*62Sigor@sysoev.ru     p->delayed = 0;
851*62Sigor@sysoev.ru 
852*62Sigor@sysoev.ru     peer->write_state = &nxt_conn_proxy_peer_connect_state;
853*62Sigor@sysoev.ru     /*
854*62Sigor@sysoev.ru      * Peer read event: disabled.
855*62Sigor@sysoev.ru      * Peer write event: waiting for connection with connect_timeout.
856*62Sigor@sysoev.ru      */
857*62Sigor@sysoev.ru     nxt_conn_connect(task->thread->engine, peer);
858*62Sigor@sysoev.ru }
859*62Sigor@sysoev.ru 
860*62Sigor@sysoev.ru 
861*62Sigor@sysoev.ru static void
862*62Sigor@sysoev.ru nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
863*62Sigor@sysoev.ru     nxt_conn_t *source, nxt_conn_t *sink)
864*62Sigor@sysoev.ru {
865*62Sigor@sysoev.ru     nxt_buf_t  *b;
866*62Sigor@sysoev.ru 
867*62Sigor@sysoev.ru     nxt_debug(source->socket.task,
868*62Sigor@sysoev.ru               "conn proxy shutdown source fd:%d cl:%d err:%d",
869*62Sigor@sysoev.ru               source->socket.fd, source->socket.closed, source->socket.error);
870*62Sigor@sysoev.ru 
871*62Sigor@sysoev.ru     nxt_debug(sink->socket.task,
872*62Sigor@sysoev.ru               "conn proxy shutdown sink fd:%d cl:%d err:%d",
873*62Sigor@sysoev.ru               sink->socket.fd, sink->socket.closed, sink->socket.error);
874*62Sigor@sysoev.ru 
875*62Sigor@sysoev.ru     if (!p->connected || p->delayed) {
876*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
877*62Sigor@sysoev.ru         return;
878*62Sigor@sysoev.ru     }
879*62Sigor@sysoev.ru 
880*62Sigor@sysoev.ru     if (sink->socket.error == 0 && !sink->socket.closed) {
881*62Sigor@sysoev.ru         sink->socket.shutdown = 1;
882*62Sigor@sysoev.ru         nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
883*62Sigor@sysoev.ru     }
884*62Sigor@sysoev.ru 
885*62Sigor@sysoev.ru     if (sink->socket.error != 0
886*62Sigor@sysoev.ru         || (sink->socket.closed && source->write == NULL))
887*62Sigor@sysoev.ru     {
888*62Sigor@sysoev.ru         /* The opposite direction also has been already closed. */
889*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
890*62Sigor@sysoev.ru         return;
891*62Sigor@sysoev.ru     }
892*62Sigor@sysoev.ru 
893*62Sigor@sysoev.ru     nxt_debug(source->socket.task, "free source buffer");
894*62Sigor@sysoev.ru 
895*62Sigor@sysoev.ru     /* Free the direction's buffer. */
896*62Sigor@sysoev.ru     b = (source == p->client) ? p->client_buffer : p->peer_buffer;
897*62Sigor@sysoev.ru     nxt_mem_free(source->mem_pool, b);
898*62Sigor@sysoev.ru }
899*62Sigor@sysoev.ru 
900*62Sigor@sysoev.ru 
901*62Sigor@sysoev.ru static void
902*62Sigor@sysoev.ru nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
903*62Sigor@sysoev.ru {
904*62Sigor@sysoev.ru     nxt_conn_t        *c;
905*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
906*62Sigor@sysoev.ru 
907*62Sigor@sysoev.ru     c = obj;
908*62Sigor@sysoev.ru     p = data;
909*62Sigor@sysoev.ru 
910*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
911*62Sigor@sysoev.ru 
912*62Sigor@sysoev.ru     nxt_conn_proxy_close(task, c, p);
913*62Sigor@sysoev.ru }
914*62Sigor@sysoev.ru 
915*62Sigor@sysoev.ru 
916*62Sigor@sysoev.ru static void
917*62Sigor@sysoev.ru nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
918*62Sigor@sysoev.ru {
919*62Sigor@sysoev.ru     nxt_conn_t        *source, *sink;
920*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
921*62Sigor@sysoev.ru 
922*62Sigor@sysoev.ru     sink = obj;
923*62Sigor@sysoev.ru     p = data;
924*62Sigor@sysoev.ru 
925*62Sigor@sysoev.ru     nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
926*62Sigor@sysoev.ru 
927*62Sigor@sysoev.ru     /* Clear data for the direction sink. */
928*62Sigor@sysoev.ru     sink->write = NULL;
929*62Sigor@sysoev.ru 
930*62Sigor@sysoev.ru     /* Block the direction source. */
931*62Sigor@sysoev.ru     source = (sink == p->client) ? p->peer : p->client;
932*62Sigor@sysoev.ru     nxt_fd_event_block_read(task->thread->engine, &source->socket);
933*62Sigor@sysoev.ru 
934*62Sigor@sysoev.ru     if (source->write == NULL) {
935*62Sigor@sysoev.ru         /*
936*62Sigor@sysoev.ru          * There is no data for the opposite direction and
937*62Sigor@sysoev.ru          * the next read from the sink will most probably fail.
938*62Sigor@sysoev.ru          */
939*62Sigor@sysoev.ru         nxt_conn_proxy_complete(task, p);
940*62Sigor@sysoev.ru     }
941*62Sigor@sysoev.ru }
942*62Sigor@sysoev.ru 
943*62Sigor@sysoev.ru 
944*62Sigor@sysoev.ru static const nxt_conn_state_t  nxt_conn_proxy_close_state
945*62Sigor@sysoev.ru     nxt_aligned(64) =
946*62Sigor@sysoev.ru {
947*62Sigor@sysoev.ru     .ready_handler = nxt_conn_proxy_completion,
948*62Sigor@sysoev.ru };
949*62Sigor@sysoev.ru 
950*62Sigor@sysoev.ru 
951*62Sigor@sysoev.ru static void
952*62Sigor@sysoev.ru nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
953*62Sigor@sysoev.ru {
954*62Sigor@sysoev.ru     nxt_event_engine_t  *engine;
955*62Sigor@sysoev.ru 
956*62Sigor@sysoev.ru     engine = task->thread->engine;
957*62Sigor@sysoev.ru 
958*62Sigor@sysoev.ru     nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
959*62Sigor@sysoev.ru               p->client->socket.fd, p->peer->socket.fd);
960*62Sigor@sysoev.ru 
961*62Sigor@sysoev.ru     if (p->delayed) {
962*62Sigor@sysoev.ru         p->delayed = 0;
963*62Sigor@sysoev.ru         nxt_queue_remove(&p->peer->link);
964*62Sigor@sysoev.ru     }
965*62Sigor@sysoev.ru 
966*62Sigor@sysoev.ru     if (p->client->socket.fd != -1) {
967*62Sigor@sysoev.ru         p->retain = 1;
968*62Sigor@sysoev.ru         p->client->write_state = &nxt_conn_proxy_close_state;
969*62Sigor@sysoev.ru         nxt_conn_close(engine, p->client);
970*62Sigor@sysoev.ru     }
971*62Sigor@sysoev.ru 
972*62Sigor@sysoev.ru     if (p->peer->socket.fd != -1) {
973*62Sigor@sysoev.ru         p->retain++;
974*62Sigor@sysoev.ru         p->peer->write_state = &nxt_conn_proxy_close_state;
975*62Sigor@sysoev.ru         nxt_conn_close(engine, p->peer);
976*62Sigor@sysoev.ru     }
977*62Sigor@sysoev.ru }
978*62Sigor@sysoev.ru 
979*62Sigor@sysoev.ru 
980*62Sigor@sysoev.ru static void
981*62Sigor@sysoev.ru nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
982*62Sigor@sysoev.ru {
983*62Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
984*62Sigor@sysoev.ru 
985*62Sigor@sysoev.ru     p = data;
986*62Sigor@sysoev.ru 
987*62Sigor@sysoev.ru     nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
988*62Sigor@sysoev.ru               p->retain, p->client->socket.fd, p->peer->socket.fd);
989*62Sigor@sysoev.ru 
990*62Sigor@sysoev.ru     p->retain--;
991*62Sigor@sysoev.ru 
992*62Sigor@sysoev.ru     if (p->retain == 0) {
993*62Sigor@sysoev.ru         nxt_mem_free(p->client->mem_pool, p->client_buffer);
994*62Sigor@sysoev.ru         nxt_mem_free(p->client->mem_pool, p->peer_buffer);
995*62Sigor@sysoev.ru 
996*62Sigor@sysoev.ru         p->completion_handler(task, p, NULL);
997*62Sigor@sysoev.ru     }
998*62Sigor@sysoev.ru }
999