xref: /unit/src/nxt_conn_proxy.c (revision 65:10688b89aa16)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
11     void *data);
12 static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
13     void *data);
14 static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
15 static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
16 static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
17     void *data);
18 static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
19     void *data);
20 static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
21     nxt_conn_t *source, nxt_conn_t *sink);
22 static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
23 static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
24 static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
25     void *data);
26 static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
27     void *data);
28 static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
29     nxt_conn_t *sink, nxt_conn_t *source);
30 static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
31 static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
32 static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
33 static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
36     void *data);
37 static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
38 static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
39 static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
40     void *data);
41 static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
42     nxt_conn_t *source, nxt_conn_t *sink);
43 static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
44 static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
45 static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
46 static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
47 
48 
49 static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state;
50 static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state;
51 static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state;
52 static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state;
53 static const nxt_conn_state_t  nxt_conn_proxy_client_read_state;
54 static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state;
55 static const nxt_conn_state_t  nxt_conn_proxy_client_write_state;
56 static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state;
57 
58 
59 nxt_conn_proxy_t *
60 nxt_conn_proxy_create(nxt_conn_t *client)
61 {
62     nxt_conn_t        *peer;
63     nxt_thread_t      *thr;
64     nxt_conn_proxy_t  *p;
65 
66     p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t));
67     if (nxt_slow_path(p == NULL)) {
68         return NULL;
69     }
70 
71     peer = nxt_conn_create(client->mem_pool, client->socket.task);
72     if (nxt_slow_path(peer == NULL)) {
73         return NULL;
74     }
75 
76     thr = nxt_thread();
77 
78     client->read_work_queue = &thr->engine->read_work_queue;
79     client->write_work_queue = &thr->engine->write_work_queue;
80     client->socket.read_work_queue = &thr->engine->read_work_queue;
81     client->socket.write_work_queue = &thr->engine->write_work_queue;
82     peer->socket.read_work_queue = &thr->engine->read_work_queue;
83     peer->socket.write_work_queue = &thr->engine->write_work_queue;
84 
85     peer->socket.data = client->socket.data;
86 
87     peer->read_work_queue = client->read_work_queue;
88     peer->write_work_queue = client->write_work_queue;
89     peer->read_timer.work_queue = client->read_work_queue;
90     peer->write_timer.work_queue = client->write_work_queue;
91 
92     p->client = client;
93     p->peer = peer;
94 
95     return p;
96 }
97 
98 
99 void
100 nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
101 {
102     nxt_conn_t  *peer;
103 
104     /*
105      * Peer read event: not connected, disabled.
106      * Peer write event: not connected, disabled.
107      */
108 
109     if (p->client_wait_timeout == 0) {
110         /*
111          * Peer write event: waiting for connection
112          * to be established with connect_timeout.
113          */
114         peer = p->peer;
115         peer->write_state = &nxt_conn_proxy_peer_connect_state;
116 
117         nxt_conn_connect(task->thread->engine, peer);
118     }
119 
120     /*
121      * Client read event: waiting for client data with
122      * client_wait_timeout before buffer allocation.
123      */
124     p->client->read_state = &nxt_conn_proxy_client_wait_state;
125 
126     nxt_conn_wait(p->client);
127 }
128 
129 
130 static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state
131     nxt_aligned(64) =
132 {
133     .ready_handler = nxt_conn_proxy_client_buffer_alloc,
134     .close_handler = nxt_conn_proxy_close,
135     .error_handler = nxt_conn_proxy_error,
136 
137     .timer_handler = nxt_conn_proxy_read_timeout,
138     .timer_value = nxt_conn_proxy_timeout_value,
139     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
140 };
141 
142 
143 static void
144 nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
145 {
146     nxt_buf_t         *b;
147     nxt_conn_t        *client;
148     nxt_conn_proxy_t  *p;
149 
150     client = obj;
151     p = data;
152 
153     nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
154 
155     b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0);
156     if (nxt_slow_path(b == NULL)) {
157         /* An error completion. */
158         nxt_conn_proxy_complete(task, p);
159         return;
160     }
161 
162     p->client_buffer = b;
163     client->read = b;
164 
165     if (p->peer->socket.fd != -1) {
166         /*
167          * Client read event: waiting, no timeout.
168          * Client write event: blocked.
169          * Peer read event: disabled.
170          * Peer write event: waiting for connection to be established
171          * or blocked after the connection has established.
172          */
173         client->read_state = &nxt_conn_proxy_client_read_state;
174 
175     } else {
176         /*
177          * Client read event: waiting for data with client_wait_timeout
178          * before connecting to a peer.
179          * Client write event: blocked.
180          * Peer read event: not connected, disabled.
181          * Peer write event: not connected, disabled.
182          */
183         client->read_state = &nxt_conn_proxy_client_first_read_state;
184     }
185 
186     nxt_conn_read(task->thread->engine, client);
187 }
188 
189 
190 static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state
191     nxt_aligned(64) =
192 {
193     .ready_handler = nxt_conn_proxy_peer_connect,
194     .close_handler = nxt_conn_proxy_close,
195     .error_handler = nxt_conn_proxy_error,
196 
197     .timer_handler = nxt_conn_proxy_read_timeout,
198     .timer_value = nxt_conn_proxy_timeout_value,
199     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
200     .timer_autoreset = 1,
201 };
202 
203 
204 static void
205 nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
206 {
207     nxt_conn_t        *client;
208     nxt_conn_proxy_t  *p;
209 
210     client = obj;
211     p = data;
212 
213     /*
214      * Client read event: waiting, no timeout.
215      * Client write event: blocked.
216      * Peer read event: disabled.
217      * Peer write event: waiting for connection to be established
218      * with connect_timeout.
219      */
220     client->read_state = &nxt_conn_proxy_client_read_state;
221 
222     p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
223 
224     nxt_conn_connect(task->thread->engine, p->peer);
225 }
226 
227 
228 static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state
229     nxt_aligned(64) =
230 {
231     .ready_handler = nxt_conn_proxy_connected,
232     .close_handler = nxt_conn_proxy_refused,
233     .error_handler = nxt_conn_proxy_error,
234 
235     .timer_handler = nxt_conn_proxy_write_timeout,
236     .timer_value = nxt_conn_proxy_timeout_value,
237     .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
238     .timer_autoreset = 1,
239 };
240 
241 
242 static void
243 nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
244 {
245     nxt_conn_t        *client, *peer;
246     nxt_conn_proxy_t  *p;
247 
248     peer = obj;
249     p = data;
250 
251     nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
252 
253     p->connected = 1;
254 
255     nxt_conn_tcp_nodelay_on(task, peer);
256     nxt_conn_tcp_nodelay_on(task, p->client);
257 
258     /* Peer read event: waiting with peer_wait_timeout.  */
259 
260     peer->read_state = &nxt_conn_proxy_peer_wait_state;
261     peer->write_state = &nxt_conn_proxy_peer_write_state;
262 
263     nxt_conn_wait(peer);
264 
265     if (p->client_buffer != NULL) {
266         client = p->client;
267 
268         client->read_state = &nxt_conn_proxy_client_read_state;
269         client->write_state = &nxt_conn_proxy_client_write_state;
270         /*
271          * Send a client read data to the connected peer.
272          * Client write event: blocked.
273          */
274         nxt_conn_proxy_read_process(task, p, client, peer);
275     }
276 }
277 
278 
279 static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state
280     nxt_aligned(64) =
281 {
282     .ready_handler = nxt_conn_proxy_peer_read,
283     .close_handler = nxt_conn_proxy_close,
284     .error_handler = nxt_conn_proxy_error,
285 
286     .timer_handler = nxt_conn_proxy_read_timeout,
287     .timer_value = nxt_conn_proxy_timeout_value,
288     .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
289 };
290 
291 
292 static void
293 nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
294 {
295     nxt_buf_t         *b;
296     nxt_conn_t        *peer;
297     nxt_conn_proxy_t  *p;
298 
299     peer = obj;
300     p = data;
301 
302     nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
303 
304     b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0);
305     if (nxt_slow_path(b == NULL)) {
306         /* An error completion. */
307         nxt_conn_proxy_complete(task, p);
308         return;
309     }
310 
311     p->peer_buffer = b;
312     peer->read = b;
313 
314     p->client->write_state = &nxt_conn_proxy_client_write_state;
315     peer->read_state = &nxt_conn_proxy_peer_read_state;
316     peer->write_state = &nxt_conn_proxy_peer_write_state;
317 
318     /*
319      * Client read event: waiting, no timeout.
320      * Client write event: blocked.
321      * Peer read event: waiting with possible peer_wait_timeout.
322      * Peer write event: blocked.
323      */
324     nxt_conn_read(task->thread->engine, peer);
325 }
326 
327 
328 static const nxt_conn_state_t  nxt_conn_proxy_client_read_state
329     nxt_aligned(64) =
330 {
331     .ready_handler = nxt_conn_proxy_client_read_ready,
332     .close_handler = nxt_conn_proxy_close,
333     .error_handler = nxt_conn_proxy_read_error,
334 };
335 
336 
337 static void
338 nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
339 {
340     nxt_conn_t        *client;
341     nxt_conn_proxy_t  *p;
342 
343     client = obj;
344     p = data;
345 
346     nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
347 
348     nxt_conn_proxy_read_process(task, p, client, p->peer);
349 }
350 
351 
352 static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state
353     nxt_aligned(64) =
354 {
355     .ready_handler = nxt_conn_proxy_peer_read_ready,
356     .close_handler = nxt_conn_proxy_close,
357     .error_handler = nxt_conn_proxy_read_error,
358 };
359 
360 
361 static void
362 nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
363 {
364     nxt_conn_t        *peer;
365     nxt_conn_proxy_t  *p;
366 
367     peer = obj;
368     p = data;
369 
370     nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
371 
372     nxt_conn_proxy_read_process(task, p, peer, p->client);
373 }
374 
375 
376 static void
377 nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
378     nxt_conn_t *source, nxt_conn_t *sink)
379 {
380     nxt_buf_t  *rb, *wb;
381 
382     if (sink->socket.error != 0) {
383         nxt_debug(task, "conn proxy sink fd:%d error:%d",
384                   sink->socket.fd, sink->socket.error);
385 
386         nxt_conn_proxy_write_error(task, sink, sink->socket.data);
387         return;
388     }
389 
390     while (source->read != NULL) {
391 
392         rb = source->read;
393 
394         if (rb->mem.pos != rb->mem.free) {
395 
396             /* Add a read part to a write chain. */
397 
398             wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
399             if (wb == NULL) {
400                 /* An error completion. */
401                 nxt_conn_proxy_complete(task, p);
402                 return;
403             }
404 
405             wb->mem.pos = rb->mem.pos;
406             wb->mem.free = rb->mem.free;
407             wb->mem.start = rb->mem.pos;
408             wb->mem.end = rb->mem.free;
409 
410             rb->mem.pos = rb->mem.free;
411             rb->mem.start = rb->mem.free;
412 
413             nxt_conn_proxy_write_add(sink, wb);
414         }
415 
416         if (rb->mem.start != rb->mem.end) {
417             nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
418                                task, source, source->socket.data);
419             break;
420         }
421 
422         source->read = rb->next;
423         nxt_buf_free(source->mem_pool, rb);
424     }
425 
426     if (p->connected) {
427         nxt_conn_write(task->thread->engine, sink);
428     }
429 }
430 
431 
432 static void
433 nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
434 {
435     nxt_buf_t  *first, *second, *prev;
436 
437     first = c->write;
438 
439     if (first == NULL) {
440         c->write = b;
441         return;
442     }
443 
444     /*
445      * A event conn proxy maintains a buffer per each direction.
446      * The buffer is divided by read and write parts.  These parts are
447      * linked in buffer chains.  There can be no more than two buffers
448      * in write chain at any time, because an added buffer is coalesced
449      * with the last buffer if possible.
450      */
451 
452     second = first->next;
453 
454     if (second == NULL) {
455 
456         if (first->mem.end != b->mem.start) {
457             first->next = b;
458             return;
459         }
460 
461         /*
462          * The first buffer is just before the added buffer, so
463          * expand the first buffer to the end of the added buffer.
464          */
465         prev = first;
466 
467     } else {
468         if (second->mem.end != b->mem.start) {
469             nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
470                                  "is not equal to added buffer start:%p",
471                                  second->mem.end, b->mem.start);
472             return;
473         }
474 
475         /*
476          * "second->mem.end == b->mem.start" must be always true here,
477          * that is the second buffer is just before the added buffer,
478          * so expand the second buffer to the end of added buffer.
479          */
480         prev = second;
481     }
482 
483     prev->mem.free = b->mem.end;
484     prev->mem.end = b->mem.end;
485 
486     nxt_buf_free(c->mem_pool, b);
487 }
488 
489 
490 static void
491 nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
492 {
493     nxt_conn_t        *source, *sink;
494     nxt_conn_proxy_t  *p;
495 
496     source = obj;
497     p = data;
498 
499     nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
500 
501     if (!source->socket.closed) {
502         sink = (source == p->client) ? p->peer : p->client;
503 
504         if (sink->socket.error == 0) {
505             nxt_conn_read(task->thread->engine, source);
506         }
507     }
508 }
509 
510 
511 static const nxt_conn_state_t  nxt_conn_proxy_client_write_state
512     nxt_aligned(64) =
513 {
514     .ready_handler = nxt_conn_proxy_client_write_ready,
515     .error_handler = nxt_conn_proxy_write_error,
516 
517     .timer_handler = nxt_conn_proxy_write_timeout,
518     .timer_value = nxt_conn_proxy_timeout_value,
519     .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
520     .timer_autoreset = 1,
521 };
522 
523 
524 static void
525 nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
526 {
527     nxt_conn_t        *client;
528     nxt_conn_proxy_t  *p;
529 
530     client = obj;
531     p = data;
532 
533     nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
534 
535     nxt_conn_proxy_write_process(task, p, client, p->peer);
536 }
537 
538 
539 static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state
540     nxt_aligned(64) =
541 {
542     .ready_handler = nxt_conn_proxy_peer_write_ready,
543     .error_handler = nxt_conn_proxy_write_error,
544 
545     .timer_handler = nxt_conn_proxy_write_timeout,
546     .timer_value = nxt_conn_proxy_timeout_value,
547     .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
548     .timer_autoreset = 1,
549 };
550 
551 
552 static void
553 nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
554 {
555     nxt_conn_t        *peer;
556     nxt_conn_proxy_t  *p;
557 
558     peer = obj;
559     p = data;
560 
561     nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
562 
563     nxt_conn_proxy_write_process(task, p, peer, p->client);
564 }
565 
566 
567 static void
568 nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
569     nxt_conn_t *sink, nxt_conn_t *source)
570 {
571     nxt_buf_t  *rb, *wb;
572 
573     while (sink->write != NULL) {
574 
575         wb = sink->write;
576 
577         if (nxt_buf_is_sync(wb)) {
578 
579             /* A sync buffer marks the end of stream. */
580 
581             sink->write = NULL;
582             nxt_buf_free(sink->mem_pool, wb);
583             nxt_conn_proxy_shutdown(task, p, source, sink);
584             return;
585         }
586 
587         if (wb->mem.start != wb->mem.pos) {
588 
589             /* Add a written part to a read chain. */
590 
591             rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
592             if (rb == NULL) {
593                 /* An error completion. */
594                 nxt_conn_proxy_complete(task, p);
595                 return;
596             }
597 
598             rb->mem.pos = wb->mem.start;
599             rb->mem.free = wb->mem.start;
600             rb->mem.start = wb->mem.start;
601             rb->mem.end = wb->mem.pos;
602 
603             wb->mem.start = wb->mem.pos;
604 
605             nxt_conn_proxy_read_add(source, rb);
606         }
607 
608         if (wb->mem.pos != wb->mem.free) {
609             nxt_conn_write(task->thread->engine, sink);
610 
611             break;
612         }
613 
614         sink->write = wb->next;
615         nxt_buf_free(sink->mem_pool, wb);
616     }
617 
618     nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
619                        task, source, source->socket.data);
620 }
621 
622 
623 static void
624 nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
625 {
626     nxt_buf_t  *first, *second;
627 
628     first = c->read;
629 
630     if (first == NULL) {
631         c->read = b;
632         return;
633     }
634 
635     /*
636      * A event conn proxy maintains a buffer per each direction.
637      * The buffer is divided by read and write parts.  These parts are
638      * linked in buffer chains.  There can be no more than two buffers
639      * in read chain at any time, because an added buffer is coalesced
640      * with the last buffer if possible.  The first and the second
641      * buffers are also coalesced if possible.
642      */
643 
644     second = first->next;
645 
646     if (second == NULL) {
647 
648         if (first->mem.start == b->mem.end) {
649             /*
650              * The added buffer is just before the first buffer, so expand
651              * the first buffer to the beginning of the added buffer.
652              */
653             first->mem.pos = b->mem.start;
654             first->mem.free = b->mem.start;
655             first->mem.start = b->mem.start;
656 
657         } else if (first->mem.end == b->mem.start) {
658             /*
659              * The added buffer is just after the first buffer, so
660              * expand the first buffer to the end of the added buffer.
661              */
662             first->mem.end = b->mem.end;
663 
664         } else {
665             first->next = b;
666             return;
667         }
668 
669     } else {
670         if (second->mem.end != b->mem.start) {
671             nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
672                                  "is not equal to added buffer start:%p",
673                                  second->mem.end, b->mem.start);
674             return;
675         }
676 
677         /*
678          * The added buffer is just after the second buffer, so
679          * expand the second buffer to the end of the added buffer.
680          */
681         second->mem.end = b->mem.end;
682 
683         if (first->mem.start == second->mem.end) {
684             /*
685              * The second buffer is just before the first buffer, so expand
686              * the first buffer to the beginning of the second buffer.
687              */
688             first->mem.pos = second->mem.start;
689             first->mem.free = second->mem.start;
690             first->mem.start = second->mem.start;
691             first->next = NULL;
692 
693             nxt_buf_free(c->mem_pool, second);
694         }
695     }
696 
697     nxt_buf_free(c->mem_pool, b);
698 }
699 
700 
701 static void
702 nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
703 {
704     nxt_buf_t         *b;
705     nxt_conn_t        *source, *sink;
706     nxt_conn_proxy_t  *p;
707 
708     source = obj;
709     p = data;
710 
711     nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
712 
713     sink = (source == p->client) ? p->peer : p->client;
714 
715     if (sink->write == NULL) {
716         nxt_conn_proxy_shutdown(task, p, source, sink);
717         return;
718     }
719 
720     b = nxt_buf_sync_alloc(source->mem_pool, 0);
721     if (b == NULL) {
722         /* An error completion. */
723         nxt_conn_proxy_complete(task, p);
724         return;
725     }
726 
727     nxt_buf_chain_add(&sink->write, b);
728 }
729 
730 
731 static void
732 nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
733 {
734     nxt_conn_t        *c;
735     nxt_conn_proxy_t  *p;
736 
737     c = obj;
738     p = data;
739 
740     nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
741 
742     nxt_conn_proxy_close(task, c, p);
743 }
744 
745 
746 static void
747 nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
748 {
749     nxt_conn_t   *c;
750     nxt_timer_t  *timer;
751 
752     timer = obj;
753 
754     c = nxt_read_timer_conn(timer);
755     c->socket.timedout = 1;
756     c->socket.closed = 1;
757 
758     nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
759 
760     nxt_conn_proxy_close(task, c, c->socket.data);
761 }
762 
763 
764 static void
765 nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
766 {
767     nxt_conn_t   *c;
768     nxt_timer_t  *timer;
769 
770     timer = obj;
771 
772     c = nxt_write_timer_conn(timer);
773     c->socket.timedout = 1;
774     c->socket.closed = 1;
775 
776     nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
777 
778     nxt_conn_proxy_close(task, c, c->socket.data);
779 }
780 
781 
782 static nxt_msec_t
783 nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
784 {
785     nxt_msec_t        *timer;
786     nxt_conn_proxy_t  *p;
787 
788     p = c->socket.data;
789 
790     timer = (nxt_msec_t *) ((char *) p + data);
791 
792     return *timer;
793 }
794 
795 
796 static void
797 nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
798 {
799     nxt_conn_t        *peer;
800     nxt_conn_proxy_t  *p;
801 
802     peer = obj;
803     p = data;
804 
805     nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
806 
807     if (p->retries == 0) {
808         /* An error completion. */
809         nxt_conn_proxy_complete(task, p);
810         return;
811     }
812 
813     p->retries--;
814 
815     nxt_socket_close(task, peer->socket.fd);
816     peer->socket.fd = -1;
817     peer->socket.error = 0;
818 
819     p->delayed = 1;
820 
821     peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
822     nxt_timer_add(task->thread->engine, &peer->write_timer,
823                   p->reconnect_timeout);
824 }
825 
826 
827 static void
828 nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
829 {
830     nxt_conn_t        *peer;
831     nxt_timer_t       *timer;
832     nxt_conn_proxy_t  *p;
833 
834     timer = obj;
835 
836     nxt_debug(task, "conn proxy reconnect timer");
837 
838     peer = nxt_write_timer_conn(timer);
839     p = peer->socket.data;
840 
841     if (p->client->socket.closed) {
842         nxt_conn_proxy_complete(task, p);
843         return;
844     }
845 
846     p->delayed = 0;
847 
848     peer->write_state = &nxt_conn_proxy_peer_connect_state;
849     /*
850      * Peer read event: disabled.
851      * Peer write event: waiting for connection with connect_timeout.
852      */
853     nxt_conn_connect(task->thread->engine, peer);
854 }
855 
856 
857 static void
858 nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
859     nxt_conn_t *source, nxt_conn_t *sink)
860 {
861     nxt_buf_t  *b;
862 
863     nxt_debug(source->socket.task,
864               "conn proxy shutdown source fd:%d cl:%d err:%d",
865               source->socket.fd, source->socket.closed, source->socket.error);
866 
867     nxt_debug(sink->socket.task,
868               "conn proxy shutdown sink fd:%d cl:%d err:%d",
869               sink->socket.fd, sink->socket.closed, sink->socket.error);
870 
871     if (!p->connected || p->delayed) {
872         nxt_conn_proxy_complete(task, p);
873         return;
874     }
875 
876     if (sink->socket.error == 0 && !sink->socket.closed) {
877         sink->socket.shutdown = 1;
878         nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
879     }
880 
881     if (sink->socket.error != 0
882         || (sink->socket.closed && source->write == NULL))
883     {
884         /* The opposite direction also has been already closed. */
885         nxt_conn_proxy_complete(task, p);
886         return;
887     }
888 
889     nxt_debug(source->socket.task, "free source buffer");
890 
891     /* Free the direction's buffer. */
892     b = (source == p->client) ? p->client_buffer : p->peer_buffer;
893     nxt_mp_free(source->mem_pool, b);
894 }
895 
896 
897 static void
898 nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
899 {
900     nxt_conn_t        *c;
901     nxt_conn_proxy_t  *p;
902 
903     c = obj;
904     p = data;
905 
906     nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
907 
908     nxt_conn_proxy_close(task, c, p);
909 }
910 
911 
912 static void
913 nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
914 {
915     nxt_conn_t        *source, *sink;
916     nxt_conn_proxy_t  *p;
917 
918     sink = obj;
919     p = data;
920 
921     nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
922 
923     /* Clear data for the direction sink. */
924     sink->write = NULL;
925 
926     /* Block the direction source. */
927     source = (sink == p->client) ? p->peer : p->client;
928     nxt_fd_event_block_read(task->thread->engine, &source->socket);
929 
930     if (source->write == NULL) {
931         /*
932          * There is no data for the opposite direction and
933          * the next read from the sink will most probably fail.
934          */
935         nxt_conn_proxy_complete(task, p);
936     }
937 }
938 
939 
940 static const nxt_conn_state_t  nxt_conn_proxy_close_state
941     nxt_aligned(64) =
942 {
943     .ready_handler = nxt_conn_proxy_completion,
944 };
945 
946 
947 static void
948 nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
949 {
950     nxt_event_engine_t  *engine;
951 
952     engine = task->thread->engine;
953 
954     nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
955               p->client->socket.fd, p->peer->socket.fd);
956 
957     if (p->delayed) {
958         p->delayed = 0;
959         nxt_queue_remove(&p->peer->link);
960     }
961 
962     if (p->client->socket.fd != -1) {
963         p->retain = 1;
964         p->client->write_state = &nxt_conn_proxy_close_state;
965         nxt_conn_close(engine, p->client);
966     }
967 
968     if (p->peer->socket.fd != -1) {
969         p->retain++;
970         p->peer->write_state = &nxt_conn_proxy_close_state;
971         nxt_conn_close(engine, p->peer);
972     }
973 }
974 
975 
976 static void
977 nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
978 {
979     nxt_conn_proxy_t  *p;
980 
981     p = data;
982 
983     nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
984               p->retain, p->client->socket.fd, p->peer->socket.fd);
985 
986     p->retain--;
987 
988     if (p->retain == 0) {
989         nxt_mp_free(p->client->mem_pool, p->client_buffer);
990         nxt_mp_free(p->client->mem_pool, p->peer_buffer);
991 
992         p->completion_handler(task, p, NULL);
993     }
994 }
995