xref: /unit/src/nxt_conn_proxy.c (revision 62:5e1efcc7b740)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
11     void *data);
12 static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
13     void *data);
14 static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
15 static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
16 static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
17     void *data);
18 static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
19     void *data);
20 static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
21     nxt_conn_t *source, nxt_conn_t *sink);
22 static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
23 static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
24 static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
25     void *data);
26 static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
27     void *data);
28 static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
29     nxt_conn_t *sink, nxt_conn_t *source);
30 static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
31 static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
32 static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
33 static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
36     void *data);
37 static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
38 static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
39 static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
40     void *data);
41 static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
42     nxt_conn_t *source, nxt_conn_t *sink);
43 static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
44 static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
45 static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
46 static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
47 
48 
49 static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state;
50 static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state;
51 static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state;
52 static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state;
53 static const nxt_conn_state_t  nxt_conn_proxy_client_read_state;
54 static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state;
55 static const nxt_conn_state_t  nxt_conn_proxy_client_write_state;
56 static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state;
57 
58 
59 nxt_conn_proxy_t *
60 nxt_conn_proxy_create(nxt_conn_t *client)
61 {
62     nxt_conn_t        *peer;
63     nxt_thread_t      *thr;
64     nxt_conn_proxy_t  *p;
65 
66     p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t));
67     if (nxt_slow_path(p == NULL)) {
68         return NULL;
69     }
70 
71     peer = nxt_conn_create(client->mem_pool, client->socket.task);
72     if (nxt_slow_path(peer == NULL)) {
73         return NULL;
74     }
75 
76     thr = nxt_thread();
77 
78     client->read_work_queue = &thr->engine->read_work_queue;
79     client->write_work_queue = &thr->engine->write_work_queue;
80     client->socket.read_work_queue = &thr->engine->read_work_queue;
81     client->socket.write_work_queue = &thr->engine->write_work_queue;
82     peer->socket.read_work_queue = &thr->engine->read_work_queue;
83     peer->socket.write_work_queue = &thr->engine->write_work_queue;
84 
85     peer->socket.data = client->socket.data;
86 
87     peer->read_work_queue = client->read_work_queue;
88     peer->write_work_queue = client->write_work_queue;
89     peer->read_timer.work_queue = client->read_work_queue;
90     peer->write_timer.work_queue = client->write_work_queue;
91 
92     p->client = client;
93     p->peer = peer;
94 
95     return p;
96 }
97 
98 
99 void
100 nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
101 {
102     nxt_conn_t  *peer;
103 
104     /*
105      * Peer read event: not connected, disabled.
106      * Peer write event: not connected, disabled.
107      */
108 
109     if (p->client_wait_timeout == 0) {
110         /*
111          * Peer write event: waiting for connection
112          * to be established with connect_timeout.
113          */
114         peer = p->peer;
115         peer->write_state = &nxt_conn_proxy_peer_connect_state;
116 
117         nxt_conn_connect(task->thread->engine, peer);
118     }
119 
120     /*
121      * Client read event: waiting for client data with
122      * client_wait_timeout before buffer allocation.
123      */
124     p->client->read_state = &nxt_conn_proxy_client_wait_state;
125 
126     nxt_conn_wait(p->client);
127 }
128 
129 
130 static const nxt_conn_state_t  nxt_conn_proxy_client_wait_state
131     nxt_aligned(64) =
132 {
133     .ready_handler = nxt_conn_proxy_client_buffer_alloc,
134     .close_handler = nxt_conn_proxy_close,
135     .error_handler = nxt_conn_proxy_error,
136 
137     .timer_handler = nxt_conn_proxy_read_timeout,
138     .timer_value = nxt_conn_proxy_timeout_value,
139     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
140 };
141 
142 
143 static void
144 nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
145 {
146     nxt_buf_t         *b;
147     nxt_conn_t        *client;
148     nxt_conn_proxy_t  *p;
149 
150     client = obj;
151     p = data;
152 
153     nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
154 
155     b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size,
156                           NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
157 
158     if (nxt_slow_path(b == NULL)) {
159         /* An error completion. */
160         nxt_conn_proxy_complete(task, p);
161         return;
162     }
163 
164     p->client_buffer = b;
165     client->read = b;
166 
167     if (p->peer->socket.fd != -1) {
168         /*
169          * Client read event: waiting, no timeout.
170          * Client write event: blocked.
171          * Peer read event: disabled.
172          * Peer write event: waiting for connection to be established
173          * or blocked after the connection has established.
174          */
175         client->read_state = &nxt_conn_proxy_client_read_state;
176 
177     } else {
178         /*
179          * Client read event: waiting for data with client_wait_timeout
180          * before connecting to a peer.
181          * Client write event: blocked.
182          * Peer read event: not connected, disabled.
183          * Peer write event: not connected, disabled.
184          */
185         client->read_state = &nxt_conn_proxy_client_first_read_state;
186     }
187 
188     nxt_conn_read(task->thread->engine, client);
189 }
190 
191 
192 static const nxt_conn_state_t  nxt_conn_proxy_client_first_read_state
193     nxt_aligned(64) =
194 {
195     .ready_handler = nxt_conn_proxy_peer_connect,
196     .close_handler = nxt_conn_proxy_close,
197     .error_handler = nxt_conn_proxy_error,
198 
199     .timer_handler = nxt_conn_proxy_read_timeout,
200     .timer_value = nxt_conn_proxy_timeout_value,
201     .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
202     .timer_autoreset = 1,
203 };
204 
205 
206 static void
207 nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
208 {
209     nxt_conn_t        *client;
210     nxt_conn_proxy_t  *p;
211 
212     client = obj;
213     p = data;
214 
215     /*
216      * Client read event: waiting, no timeout.
217      * Client write event: blocked.
218      * Peer read event: disabled.
219      * Peer write event: waiting for connection to be established
220      * with connect_timeout.
221      */
222     client->read_state = &nxt_conn_proxy_client_read_state;
223 
224     p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
225 
226     nxt_conn_connect(task->thread->engine, p->peer);
227 }
228 
229 
230 static const nxt_conn_state_t  nxt_conn_proxy_peer_connect_state
231     nxt_aligned(64) =
232 {
233     .ready_handler = nxt_conn_proxy_connected,
234     .close_handler = nxt_conn_proxy_refused,
235     .error_handler = nxt_conn_proxy_error,
236 
237     .timer_handler = nxt_conn_proxy_write_timeout,
238     .timer_value = nxt_conn_proxy_timeout_value,
239     .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
240     .timer_autoreset = 1,
241 };
242 
243 
244 static void
245 nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
246 {
247     nxt_conn_t        *client, *peer;
248     nxt_conn_proxy_t  *p;
249 
250     peer = obj;
251     p = data;
252 
253     nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
254 
255     p->connected = 1;
256 
257     nxt_conn_tcp_nodelay_on(task, peer);
258     nxt_conn_tcp_nodelay_on(task, p->client);
259 
260     /* Peer read event: waiting with peer_wait_timeout.  */
261 
262     peer->read_state = &nxt_conn_proxy_peer_wait_state;
263     peer->write_state = &nxt_conn_proxy_peer_write_state;
264 
265     nxt_conn_wait(peer);
266 
267     if (p->client_buffer != NULL) {
268         client = p->client;
269 
270         client->read_state = &nxt_conn_proxy_client_read_state;
271         client->write_state = &nxt_conn_proxy_client_write_state;
272         /*
273          * Send a client read data to the connected peer.
274          * Client write event: blocked.
275          */
276         nxt_conn_proxy_read_process(task, p, client, peer);
277     }
278 }
279 
280 
281 static const nxt_conn_state_t  nxt_conn_proxy_peer_wait_state
282     nxt_aligned(64) =
283 {
284     .ready_handler = nxt_conn_proxy_peer_read,
285     .close_handler = nxt_conn_proxy_close,
286     .error_handler = nxt_conn_proxy_error,
287 
288     .timer_handler = nxt_conn_proxy_read_timeout,
289     .timer_value = nxt_conn_proxy_timeout_value,
290     .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
291 };
292 
293 
294 static void
295 nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
296 {
297     nxt_buf_t         *b;
298     nxt_conn_t        *peer;
299     nxt_conn_proxy_t  *p;
300 
301     peer = obj;
302     p = data;
303 
304     nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
305 
306     b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size,
307                           NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
308 
309     if (nxt_slow_path(b == NULL)) {
310         /* An error completion. */
311         nxt_conn_proxy_complete(task, p);
312         return;
313     }
314 
315     p->peer_buffer = b;
316     peer->read = b;
317 
318     p->client->write_state = &nxt_conn_proxy_client_write_state;
319     peer->read_state = &nxt_conn_proxy_peer_read_state;
320     peer->write_state = &nxt_conn_proxy_peer_write_state;
321 
322     /*
323      * Client read event: waiting, no timeout.
324      * Client write event: blocked.
325      * Peer read event: waiting with possible peer_wait_timeout.
326      * Peer write event: blocked.
327      */
328     nxt_conn_read(task->thread->engine, peer);
329 }
330 
331 
332 static const nxt_conn_state_t  nxt_conn_proxy_client_read_state
333     nxt_aligned(64) =
334 {
335     .ready_handler = nxt_conn_proxy_client_read_ready,
336     .close_handler = nxt_conn_proxy_close,
337     .error_handler = nxt_conn_proxy_read_error,
338 };
339 
340 
341 static void
342 nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
343 {
344     nxt_conn_t        *client;
345     nxt_conn_proxy_t  *p;
346 
347     client = obj;
348     p = data;
349 
350     nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
351 
352     nxt_conn_proxy_read_process(task, p, client, p->peer);
353 }
354 
355 
356 static const nxt_conn_state_t  nxt_conn_proxy_peer_read_state
357     nxt_aligned(64) =
358 {
359     .ready_handler = nxt_conn_proxy_peer_read_ready,
360     .close_handler = nxt_conn_proxy_close,
361     .error_handler = nxt_conn_proxy_read_error,
362 };
363 
364 
365 static void
366 nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
367 {
368     nxt_conn_t        *peer;
369     nxt_conn_proxy_t  *p;
370 
371     peer = obj;
372     p = data;
373 
374     nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
375 
376     nxt_conn_proxy_read_process(task, p, peer, p->client);
377 }
378 
379 
380 static void
381 nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
382     nxt_conn_t *source, nxt_conn_t *sink)
383 {
384     nxt_buf_t  *rb, *wb;
385 
386     if (sink->socket.error != 0) {
387         nxt_debug(task, "conn proxy sink fd:%d error:%d",
388                   sink->socket.fd, sink->socket.error);
389 
390         nxt_conn_proxy_write_error(task, sink, sink->socket.data);
391         return;
392     }
393 
394     while (source->read != NULL) {
395 
396         rb = source->read;
397 
398         if (rb->mem.pos != rb->mem.free) {
399 
400             /* Add a read part to a write chain. */
401 
402             wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
403             if (wb == NULL) {
404                 /* An error completion. */
405                 nxt_conn_proxy_complete(task, p);
406                 return;
407             }
408 
409             wb->mem.pos = rb->mem.pos;
410             wb->mem.free = rb->mem.free;
411             wb->mem.start = rb->mem.pos;
412             wb->mem.end = rb->mem.free;
413 
414             rb->mem.pos = rb->mem.free;
415             rb->mem.start = rb->mem.free;
416 
417             nxt_conn_proxy_write_add(sink, wb);
418         }
419 
420         if (rb->mem.start != rb->mem.end) {
421             nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
422                                task, source, source->socket.data);
423             break;
424         }
425 
426         source->read = rb->next;
427         nxt_buf_free(source->mem_pool, rb);
428     }
429 
430     if (p->connected) {
431         nxt_conn_write(task->thread->engine, sink);
432     }
433 }
434 
435 
436 static void
437 nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
438 {
439     nxt_buf_t  *first, *second, *prev;
440 
441     first = c->write;
442 
443     if (first == NULL) {
444         c->write = b;
445         return;
446     }
447 
448     /*
449      * A event conn proxy maintains a buffer per each direction.
450      * The buffer is divided by read and write parts.  These parts are
451      * linked in buffer chains.  There can be no more than two buffers
452      * in write chain at any time, because an added buffer is coalesced
453      * with the last buffer if possible.
454      */
455 
456     second = first->next;
457 
458     if (second == NULL) {
459 
460         if (first->mem.end != b->mem.start) {
461             first->next = b;
462             return;
463         }
464 
465         /*
466          * The first buffer is just before the added buffer, so
467          * expand the first buffer to the end of the added buffer.
468          */
469         prev = first;
470 
471     } else {
472         if (second->mem.end != b->mem.start) {
473             nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
474                                  "is not equal to added buffer start:%p",
475                                  second->mem.end, b->mem.start);
476             return;
477         }
478 
479         /*
480          * "second->mem.end == b->mem.start" must be always true here,
481          * that is the second buffer is just before the added buffer,
482          * so expand the second buffer to the end of added buffer.
483          */
484         prev = second;
485     }
486 
487     prev->mem.free = b->mem.end;
488     prev->mem.end = b->mem.end;
489 
490     nxt_buf_free(c->mem_pool, b);
491 }
492 
493 
494 static void
495 nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
496 {
497     nxt_conn_t        *source, *sink;
498     nxt_conn_proxy_t  *p;
499 
500     source = obj;
501     p = data;
502 
503     nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
504 
505     if (!source->socket.closed) {
506         sink = (source == p->client) ? p->peer : p->client;
507 
508         if (sink->socket.error == 0) {
509             nxt_conn_read(task->thread->engine, source);
510         }
511     }
512 }
513 
514 
515 static const nxt_conn_state_t  nxt_conn_proxy_client_write_state
516     nxt_aligned(64) =
517 {
518     .ready_handler = nxt_conn_proxy_client_write_ready,
519     .error_handler = nxt_conn_proxy_write_error,
520 
521     .timer_handler = nxt_conn_proxy_write_timeout,
522     .timer_value = nxt_conn_proxy_timeout_value,
523     .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
524     .timer_autoreset = 1,
525 };
526 
527 
528 static void
529 nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
530 {
531     nxt_conn_t        *client;
532     nxt_conn_proxy_t  *p;
533 
534     client = obj;
535     p = data;
536 
537     nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
538 
539     nxt_conn_proxy_write_process(task, p, client, p->peer);
540 }
541 
542 
543 static const nxt_conn_state_t  nxt_conn_proxy_peer_write_state
544     nxt_aligned(64) =
545 {
546     .ready_handler = nxt_conn_proxy_peer_write_ready,
547     .error_handler = nxt_conn_proxy_write_error,
548 
549     .timer_handler = nxt_conn_proxy_write_timeout,
550     .timer_value = nxt_conn_proxy_timeout_value,
551     .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
552     .timer_autoreset = 1,
553 };
554 
555 
556 static void
557 nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
558 {
559     nxt_conn_t        *peer;
560     nxt_conn_proxy_t  *p;
561 
562     peer = obj;
563     p = data;
564 
565     nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
566 
567     nxt_conn_proxy_write_process(task, p, peer, p->client);
568 }
569 
570 
571 static void
572 nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
573     nxt_conn_t *sink, nxt_conn_t *source)
574 {
575     nxt_buf_t  *rb, *wb;
576 
577     while (sink->write != NULL) {
578 
579         wb = sink->write;
580 
581         if (nxt_buf_is_sync(wb)) {
582 
583             /* A sync buffer marks the end of stream. */
584 
585             sink->write = NULL;
586             nxt_buf_free(sink->mem_pool, wb);
587             nxt_conn_proxy_shutdown(task, p, source, sink);
588             return;
589         }
590 
591         if (wb->mem.start != wb->mem.pos) {
592 
593             /* Add a written part to a read chain. */
594 
595             rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
596             if (rb == NULL) {
597                 /* An error completion. */
598                 nxt_conn_proxy_complete(task, p);
599                 return;
600             }
601 
602             rb->mem.pos = wb->mem.start;
603             rb->mem.free = wb->mem.start;
604             rb->mem.start = wb->mem.start;
605             rb->mem.end = wb->mem.pos;
606 
607             wb->mem.start = wb->mem.pos;
608 
609             nxt_conn_proxy_read_add(source, rb);
610         }
611 
612         if (wb->mem.pos != wb->mem.free) {
613             nxt_conn_write(task->thread->engine, sink);
614 
615             break;
616         }
617 
618         sink->write = wb->next;
619         nxt_buf_free(sink->mem_pool, wb);
620     }
621 
622     nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
623                        task, source, source->socket.data);
624 }
625 
626 
627 static void
628 nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
629 {
630     nxt_buf_t  *first, *second;
631 
632     first = c->read;
633 
634     if (first == NULL) {
635         c->read = b;
636         return;
637     }
638 
639     /*
640      * A event conn proxy maintains a buffer per each direction.
641      * The buffer is divided by read and write parts.  These parts are
642      * linked in buffer chains.  There can be no more than two buffers
643      * in read chain at any time, because an added buffer is coalesced
644      * with the last buffer if possible.  The first and the second
645      * buffers are also coalesced if possible.
646      */
647 
648     second = first->next;
649 
650     if (second == NULL) {
651 
652         if (first->mem.start == b->mem.end) {
653             /*
654              * The added buffer is just before the first buffer, so expand
655              * the first buffer to the beginning of the added buffer.
656              */
657             first->mem.pos = b->mem.start;
658             first->mem.free = b->mem.start;
659             first->mem.start = b->mem.start;
660 
661         } else if (first->mem.end == b->mem.start) {
662             /*
663              * The added buffer is just after the first buffer, so
664              * expand the first buffer to the end of the added buffer.
665              */
666             first->mem.end = b->mem.end;
667 
668         } else {
669             first->next = b;
670             return;
671         }
672 
673     } else {
674         if (second->mem.end != b->mem.start) {
675             nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
676                                  "is not equal to added buffer start:%p",
677                                  second->mem.end, b->mem.start);
678             return;
679         }
680 
681         /*
682          * The added buffer is just after the second buffer, so
683          * expand the second buffer to the end of the added buffer.
684          */
685         second->mem.end = b->mem.end;
686 
687         if (first->mem.start == second->mem.end) {
688             /*
689              * The second buffer is just before the first buffer, so expand
690              * the first buffer to the beginning of the second buffer.
691              */
692             first->mem.pos = second->mem.start;
693             first->mem.free = second->mem.start;
694             first->mem.start = second->mem.start;
695             first->next = NULL;
696 
697             nxt_buf_free(c->mem_pool, second);
698         }
699     }
700 
701     nxt_buf_free(c->mem_pool, b);
702 }
703 
704 
705 static void
706 nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
707 {
708     nxt_buf_t         *b;
709     nxt_conn_t        *source, *sink;
710     nxt_conn_proxy_t  *p;
711 
712     source = obj;
713     p = data;
714 
715     nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
716 
717     sink = (source == p->client) ? p->peer : p->client;
718 
719     if (sink->write == NULL) {
720         nxt_conn_proxy_shutdown(task, p, source, sink);
721         return;
722     }
723 
724     b = nxt_buf_sync_alloc(source->mem_pool, 0);
725     if (b == NULL) {
726         /* An error completion. */
727         nxt_conn_proxy_complete(task, p);
728         return;
729     }
730 
731     nxt_buf_chain_add(&sink->write, b);
732 }
733 
734 
735 static void
736 nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
737 {
738     nxt_conn_t        *c;
739     nxt_conn_proxy_t  *p;
740 
741     c = obj;
742     p = data;
743 
744     nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
745 
746     nxt_conn_proxy_close(task, c, p);
747 }
748 
749 
750 static void
751 nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
752 {
753     nxt_conn_t   *c;
754     nxt_timer_t  *timer;
755 
756     timer = obj;
757 
758     c = nxt_read_timer_conn(timer);
759     c->socket.timedout = 1;
760     c->socket.closed = 1;
761 
762     nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
763 
764     nxt_conn_proxy_close(task, c, c->socket.data);
765 }
766 
767 
768 static void
769 nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
770 {
771     nxt_conn_t   *c;
772     nxt_timer_t  *timer;
773 
774     timer = obj;
775 
776     c = nxt_write_timer_conn(timer);
777     c->socket.timedout = 1;
778     c->socket.closed = 1;
779 
780     nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
781 
782     nxt_conn_proxy_close(task, c, c->socket.data);
783 }
784 
785 
786 static nxt_msec_t
787 nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
788 {
789     nxt_msec_t        *timer;
790     nxt_conn_proxy_t  *p;
791 
792     p = c->socket.data;
793 
794     timer = (nxt_msec_t *) ((char *) p + data);
795 
796     return *timer;
797 }
798 
799 
800 static void
801 nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
802 {
803     nxt_conn_t        *peer;
804     nxt_conn_proxy_t  *p;
805 
806     peer = obj;
807     p = data;
808 
809     nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
810 
811     if (p->retries == 0) {
812         /* An error completion. */
813         nxt_conn_proxy_complete(task, p);
814         return;
815     }
816 
817     p->retries--;
818 
819     nxt_socket_close(task, peer->socket.fd);
820     peer->socket.fd = -1;
821     peer->socket.error = 0;
822 
823     p->delayed = 1;
824 
825     peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
826     nxt_timer_add(task->thread->engine, &peer->write_timer,
827                   p->reconnect_timeout);
828 }
829 
830 
831 static void
832 nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
833 {
834     nxt_conn_t        *peer;
835     nxt_timer_t       *timer;
836     nxt_conn_proxy_t  *p;
837 
838     timer = obj;
839 
840     nxt_debug(task, "conn proxy reconnect timer");
841 
842     peer = nxt_write_timer_conn(timer);
843     p = peer->socket.data;
844 
845     if (p->client->socket.closed) {
846         nxt_conn_proxy_complete(task, p);
847         return;
848     }
849 
850     p->delayed = 0;
851 
852     peer->write_state = &nxt_conn_proxy_peer_connect_state;
853     /*
854      * Peer read event: disabled.
855      * Peer write event: waiting for connection with connect_timeout.
856      */
857     nxt_conn_connect(task->thread->engine, peer);
858 }
859 
860 
861 static void
862 nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
863     nxt_conn_t *source, nxt_conn_t *sink)
864 {
865     nxt_buf_t  *b;
866 
867     nxt_debug(source->socket.task,
868               "conn proxy shutdown source fd:%d cl:%d err:%d",
869               source->socket.fd, source->socket.closed, source->socket.error);
870 
871     nxt_debug(sink->socket.task,
872               "conn proxy shutdown sink fd:%d cl:%d err:%d",
873               sink->socket.fd, sink->socket.closed, sink->socket.error);
874 
875     if (!p->connected || p->delayed) {
876         nxt_conn_proxy_complete(task, p);
877         return;
878     }
879 
880     if (sink->socket.error == 0 && !sink->socket.closed) {
881         sink->socket.shutdown = 1;
882         nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
883     }
884 
885     if (sink->socket.error != 0
886         || (sink->socket.closed && source->write == NULL))
887     {
888         /* The opposite direction also has been already closed. */
889         nxt_conn_proxy_complete(task, p);
890         return;
891     }
892 
893     nxt_debug(source->socket.task, "free source buffer");
894 
895     /* Free the direction's buffer. */
896     b = (source == p->client) ? p->client_buffer : p->peer_buffer;
897     nxt_mem_free(source->mem_pool, b);
898 }
899 
900 
901 static void
902 nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
903 {
904     nxt_conn_t        *c;
905     nxt_conn_proxy_t  *p;
906 
907     c = obj;
908     p = data;
909 
910     nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
911 
912     nxt_conn_proxy_close(task, c, p);
913 }
914 
915 
916 static void
917 nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
918 {
919     nxt_conn_t        *source, *sink;
920     nxt_conn_proxy_t  *p;
921 
922     sink = obj;
923     p = data;
924 
925     nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
926 
927     /* Clear data for the direction sink. */
928     sink->write = NULL;
929 
930     /* Block the direction source. */
931     source = (sink == p->client) ? p->peer : p->client;
932     nxt_fd_event_block_read(task->thread->engine, &source->socket);
933 
934     if (source->write == NULL) {
935         /*
936          * There is no data for the opposite direction and
937          * the next read from the sink will most probably fail.
938          */
939         nxt_conn_proxy_complete(task, p);
940     }
941 }
942 
943 
944 static const nxt_conn_state_t  nxt_conn_proxy_close_state
945     nxt_aligned(64) =
946 {
947     .ready_handler = nxt_conn_proxy_completion,
948 };
949 
950 
951 static void
952 nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
953 {
954     nxt_event_engine_t  *engine;
955 
956     engine = task->thread->engine;
957 
958     nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
959               p->client->socket.fd, p->peer->socket.fd);
960 
961     if (p->delayed) {
962         p->delayed = 0;
963         nxt_queue_remove(&p->peer->link);
964     }
965 
966     if (p->client->socket.fd != -1) {
967         p->retain = 1;
968         p->client->write_state = &nxt_conn_proxy_close_state;
969         nxt_conn_close(engine, p->client);
970     }
971 
972     if (p->peer->socket.fd != -1) {
973         p->retain++;
974         p->peer->write_state = &nxt_conn_proxy_close_state;
975         nxt_conn_close(engine, p->peer);
976     }
977 }
978 
979 
980 static void
981 nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
982 {
983     nxt_conn_proxy_t  *p;
984 
985     p = data;
986 
987     nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
988               p->retain, p->client->socket.fd, p->peer->socket.fd);
989 
990     p->retain--;
991 
992     if (p->retain == 0) {
993         nxt_mem_free(p->client->mem_pool, p->client_buffer);
994         nxt_mem_free(p->client->mem_pool, p->peer_buffer);
995 
996         p->completion_handler(task, p, NULL);
997     }
998 }
999