1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
11 void *data);
12 static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
13 void *data);
14 static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
15 static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
16 static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
17 void *data);
18 static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
19 void *data);
20 static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
21 nxt_conn_t *source, nxt_conn_t *sink);
22 static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
23 static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
24 static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
25 void *data);
26 static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
27 void *data);
28 static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
29 nxt_conn_t *sink, nxt_conn_t *source);
30 static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
31 static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
32 static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
33 static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
34 void *data);
35 static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
36 void *data);
37 static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
38 static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
39 static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
40 void *data);
41 static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
42 nxt_conn_t *source, nxt_conn_t *sink);
43 static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
44 static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
45 static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
46 static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
47
48
49 static const nxt_conn_state_t nxt_conn_proxy_client_wait_state;
50 static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state;
51 static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state;
52 static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state;
53 static const nxt_conn_state_t nxt_conn_proxy_client_read_state;
54 static const nxt_conn_state_t nxt_conn_proxy_peer_read_state;
55 static const nxt_conn_state_t nxt_conn_proxy_client_write_state;
56 static const nxt_conn_state_t nxt_conn_proxy_peer_write_state;
57
58
59 nxt_conn_proxy_t *
nxt_conn_proxy_create(nxt_conn_t * client)60 nxt_conn_proxy_create(nxt_conn_t *client)
61 {
62 nxt_conn_t *peer;
63 nxt_thread_t *thr;
64 nxt_conn_proxy_t *p;
65
66 p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t));
67 if (nxt_slow_path(p == NULL)) {
68 return NULL;
69 }
70
71 peer = nxt_conn_create(client->mem_pool, client->socket.task);
72 if (nxt_slow_path(peer == NULL)) {
73 return NULL;
74 }
75
76 thr = nxt_thread();
77
78 client->read_work_queue = &thr->engine->read_work_queue;
79 client->write_work_queue = &thr->engine->write_work_queue;
80 client->socket.read_work_queue = &thr->engine->read_work_queue;
81 client->socket.write_work_queue = &thr->engine->write_work_queue;
82 peer->socket.read_work_queue = &thr->engine->read_work_queue;
83 peer->socket.write_work_queue = &thr->engine->write_work_queue;
84
85 peer->socket.data = client->socket.data;
86
87 peer->read_work_queue = client->read_work_queue;
88 peer->write_work_queue = client->write_work_queue;
89 peer->read_timer.work_queue = client->read_work_queue;
90 peer->write_timer.work_queue = client->write_work_queue;
91
92 p->client = client;
93 p->peer = peer;
94
95 return p;
96 }
97
98
99 void
nxt_conn_proxy(nxt_task_t * task,nxt_conn_proxy_t * p)100 nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
101 {
102 nxt_conn_t *peer;
103
104 /*
105 * Peer read event: not connected, disabled.
106 * Peer write event: not connected, disabled.
107 */
108
109 if (p->client_wait_timeout == 0) {
110 /*
111 * Peer write event: waiting for connection
112 * to be established with connect_timeout.
113 */
114 peer = p->peer;
115 peer->write_state = &nxt_conn_proxy_peer_connect_state;
116
117 nxt_conn_connect(task->thread->engine, peer);
118 }
119
120 /*
121 * Client read event: waiting for client data with
122 * client_wait_timeout before buffer allocation.
123 */
124 p->client->read_state = &nxt_conn_proxy_client_wait_state;
125
126 nxt_conn_wait(p->client);
127 }
128
129
130 static const nxt_conn_state_t nxt_conn_proxy_client_wait_state
131 nxt_aligned(64) =
132 {
133 .ready_handler = nxt_conn_proxy_client_buffer_alloc,
134 .close_handler = nxt_conn_proxy_close,
135 .error_handler = nxt_conn_proxy_error,
136
137 .timer_handler = nxt_conn_proxy_read_timeout,
138 .timer_value = nxt_conn_proxy_timeout_value,
139 .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
140 };
141
142
143 static void
nxt_conn_proxy_client_buffer_alloc(nxt_task_t * task,void * obj,void * data)144 nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
145 {
146 nxt_buf_t *b;
147 nxt_conn_t *client;
148 nxt_conn_proxy_t *p;
149
150 client = obj;
151 p = data;
152
153 nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
154
155 b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0);
156 if (nxt_slow_path(b == NULL)) {
157 /* An error completion. */
158 nxt_conn_proxy_complete(task, p);
159 return;
160 }
161
162 p->client_buffer = b;
163 client->read = b;
164
165 if (p->peer->socket.fd != -1) {
166 /*
167 * Client read event: waiting, no timeout.
168 * Client write event: blocked.
169 * Peer read event: disabled.
170 * Peer write event: waiting for connection to be established
171 * or blocked after the connection has established.
172 */
173 client->read_state = &nxt_conn_proxy_client_read_state;
174
175 } else {
176 /*
177 * Client read event: waiting for data with client_wait_timeout
178 * before connecting to a peer.
179 * Client write event: blocked.
180 * Peer read event: not connected, disabled.
181 * Peer write event: not connected, disabled.
182 */
183 client->read_state = &nxt_conn_proxy_client_first_read_state;
184 }
185
186 nxt_conn_read(task->thread->engine, client);
187 }
188
189
190 static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state
191 nxt_aligned(64) =
192 {
193 .ready_handler = nxt_conn_proxy_peer_connect,
194 .close_handler = nxt_conn_proxy_close,
195 .error_handler = nxt_conn_proxy_error,
196
197 .timer_handler = nxt_conn_proxy_read_timeout,
198 .timer_value = nxt_conn_proxy_timeout_value,
199 .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
200 .timer_autoreset = 1,
201 };
202
203
204 static void
nxt_conn_proxy_peer_connect(nxt_task_t * task,void * obj,void * data)205 nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
206 {
207 nxt_conn_t *client;
208 nxt_conn_proxy_t *p;
209
210 client = obj;
211 p = data;
212
213 /*
214 * Client read event: waiting, no timeout.
215 * Client write event: blocked.
216 * Peer read event: disabled.
217 * Peer write event: waiting for connection to be established
218 * with connect_timeout.
219 */
220 client->read_state = &nxt_conn_proxy_client_read_state;
221
222 p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
223
224 nxt_conn_connect(task->thread->engine, p->peer);
225 }
226
227
228 static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state
229 nxt_aligned(64) =
230 {
231 .ready_handler = nxt_conn_proxy_connected,
232 .close_handler = nxt_conn_proxy_refused,
233 .error_handler = nxt_conn_proxy_error,
234
235 .timer_handler = nxt_conn_proxy_write_timeout,
236 .timer_value = nxt_conn_proxy_timeout_value,
237 .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
238 .timer_autoreset = 1,
239 };
240
241
242 static void
nxt_conn_proxy_connected(nxt_task_t * task,void * obj,void * data)243 nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
244 {
245 nxt_conn_t *client, *peer;
246 nxt_conn_proxy_t *p;
247
248 peer = obj;
249 p = data;
250
251 nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
252
253 p->connected = 1;
254
255 nxt_conn_tcp_nodelay_on(task, peer);
256 nxt_conn_tcp_nodelay_on(task, p->client);
257
258 /* Peer read event: waiting with peer_wait_timeout. */
259
260 peer->read_state = &nxt_conn_proxy_peer_wait_state;
261 peer->write_state = &nxt_conn_proxy_peer_write_state;
262
263 nxt_conn_wait(peer);
264
265 if (p->client_buffer != NULL) {
266 client = p->client;
267
268 client->read_state = &nxt_conn_proxy_client_read_state;
269 client->write_state = &nxt_conn_proxy_client_write_state;
270 /*
271 * Send a client read data to the connected peer.
272 * Client write event: blocked.
273 */
274 nxt_conn_proxy_read_process(task, p, client, peer);
275 }
276 }
277
278
279 static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state
280 nxt_aligned(64) =
281 {
282 .ready_handler = nxt_conn_proxy_peer_read,
283 .close_handler = nxt_conn_proxy_close,
284 .error_handler = nxt_conn_proxy_error,
285
286 .timer_handler = nxt_conn_proxy_read_timeout,
287 .timer_value = nxt_conn_proxy_timeout_value,
288 .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
289 };
290
291
292 static void
nxt_conn_proxy_peer_read(nxt_task_t * task,void * obj,void * data)293 nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
294 {
295 nxt_buf_t *b;
296 nxt_conn_t *peer;
297 nxt_conn_proxy_t *p;
298
299 peer = obj;
300 p = data;
301
302 nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
303
304 b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0);
305 if (nxt_slow_path(b == NULL)) {
306 /* An error completion. */
307 nxt_conn_proxy_complete(task, p);
308 return;
309 }
310
311 p->peer_buffer = b;
312 peer->read = b;
313
314 p->client->write_state = &nxt_conn_proxy_client_write_state;
315 peer->read_state = &nxt_conn_proxy_peer_read_state;
316 peer->write_state = &nxt_conn_proxy_peer_write_state;
317
318 /*
319 * Client read event: waiting, no timeout.
320 * Client write event: blocked.
321 * Peer read event: waiting with possible peer_wait_timeout.
322 * Peer write event: blocked.
323 */
324 nxt_conn_read(task->thread->engine, peer);
325 }
326
327
328 static const nxt_conn_state_t nxt_conn_proxy_client_read_state
329 nxt_aligned(64) =
330 {
331 .ready_handler = nxt_conn_proxy_client_read_ready,
332 .close_handler = nxt_conn_proxy_close,
333 .error_handler = nxt_conn_proxy_read_error,
334 };
335
336
337 static void
nxt_conn_proxy_client_read_ready(nxt_task_t * task,void * obj,void * data)338 nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
339 {
340 nxt_conn_t *client;
341 nxt_conn_proxy_t *p;
342
343 client = obj;
344 p = data;
345
346 nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
347
348 nxt_conn_proxy_read_process(task, p, client, p->peer);
349 }
350
351
352 static const nxt_conn_state_t nxt_conn_proxy_peer_read_state
353 nxt_aligned(64) =
354 {
355 .ready_handler = nxt_conn_proxy_peer_read_ready,
356 .close_handler = nxt_conn_proxy_close,
357 .error_handler = nxt_conn_proxy_read_error,
358 };
359
360
361 static void
nxt_conn_proxy_peer_read_ready(nxt_task_t * task,void * obj,void * data)362 nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
363 {
364 nxt_conn_t *peer;
365 nxt_conn_proxy_t *p;
366
367 peer = obj;
368 p = data;
369
370 nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
371
372 nxt_conn_proxy_read_process(task, p, peer, p->client);
373 }
374
375
376 static void
nxt_conn_proxy_read_process(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * source,nxt_conn_t * sink)377 nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
378 nxt_conn_t *source, nxt_conn_t *sink)
379 {
380 nxt_buf_t *rb, *wb;
381
382 if (sink->socket.error != 0) {
383 nxt_debug(task, "conn proxy sink fd:%d error:%d",
384 sink->socket.fd, sink->socket.error);
385
386 nxt_conn_proxy_write_error(task, sink, sink->socket.data);
387 return;
388 }
389
390 while (source->read != NULL) {
391
392 rb = source->read;
393
394 if (rb->mem.pos != rb->mem.free) {
395
396 /* Add a read part to a write chain. */
397
398 wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
399 if (wb == NULL) {
400 /* An error completion. */
401 nxt_conn_proxy_complete(task, p);
402 return;
403 }
404
405 wb->mem.pos = rb->mem.pos;
406 wb->mem.free = rb->mem.free;
407 wb->mem.start = rb->mem.pos;
408 wb->mem.end = rb->mem.free;
409
410 rb->mem.pos = rb->mem.free;
411 rb->mem.start = rb->mem.free;
412
413 nxt_conn_proxy_write_add(sink, wb);
414 }
415
416 if (rb->mem.start != rb->mem.end) {
417 nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
418 task, source, source->socket.data);
419 break;
420 }
421
422 source->read = rb->next;
423 nxt_buf_free(source->mem_pool, rb);
424 }
425
426 if (p->connected) {
427 nxt_conn_write(task->thread->engine, sink);
428 }
429 }
430
431
432 static void
nxt_conn_proxy_write_add(nxt_conn_t * c,nxt_buf_t * b)433 nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
434 {
435 nxt_buf_t *first, *second, *prev;
436
437 first = c->write;
438
439 if (first == NULL) {
440 c->write = b;
441 return;
442 }
443
444 /*
445 * A event conn proxy maintains a buffer per each direction.
446 * The buffer is divided by read and write parts. These parts are
447 * linked in buffer chains. There can be no more than two buffers
448 * in write chain at any time, because an added buffer is coalesced
449 * with the last buffer if possible.
450 */
451
452 second = first->next;
453
454 if (second == NULL) {
455
456 if (first->mem.end != b->mem.start) {
457 first->next = b;
458 return;
459 }
460
461 /*
462 * The first buffer is just before the added buffer, so
463 * expand the first buffer to the end of the added buffer.
464 */
465 prev = first;
466
467 } else {
468 if (second->mem.end != b->mem.start) {
469 nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
470 "is not equal to added buffer start:%p",
471 second->mem.end, b->mem.start);
472 return;
473 }
474
475 /*
476 * "second->mem.end == b->mem.start" must be always true here,
477 * that is the second buffer is just before the added buffer,
478 * so expand the second buffer to the end of added buffer.
479 */
480 prev = second;
481 }
482
483 prev->mem.free = b->mem.end;
484 prev->mem.end = b->mem.end;
485
486 nxt_buf_free(c->mem_pool, b);
487 }
488
489
490 static void
nxt_conn_proxy_read(nxt_task_t * task,void * obj,void * data)491 nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
492 {
493 nxt_conn_t *source, *sink;
494 nxt_conn_proxy_t *p;
495
496 source = obj;
497 p = data;
498
499 nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
500
501 if (!source->socket.closed) {
502 sink = (source == p->client) ? p->peer : p->client;
503
504 if (sink->socket.error == 0) {
505 nxt_conn_read(task->thread->engine, source);
506 }
507 }
508 }
509
510
511 static const nxt_conn_state_t nxt_conn_proxy_client_write_state
512 nxt_aligned(64) =
513 {
514 .ready_handler = nxt_conn_proxy_client_write_ready,
515 .error_handler = nxt_conn_proxy_write_error,
516
517 .timer_handler = nxt_conn_proxy_write_timeout,
518 .timer_value = nxt_conn_proxy_timeout_value,
519 .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
520 .timer_autoreset = 1,
521 };
522
523
524 static void
nxt_conn_proxy_client_write_ready(nxt_task_t * task,void * obj,void * data)525 nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
526 {
527 nxt_conn_t *client;
528 nxt_conn_proxy_t *p;
529
530 client = obj;
531 p = data;
532
533 nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
534
535 nxt_conn_proxy_write_process(task, p, client, p->peer);
536 }
537
538
539 static const nxt_conn_state_t nxt_conn_proxy_peer_write_state
540 nxt_aligned(64) =
541 {
542 .ready_handler = nxt_conn_proxy_peer_write_ready,
543 .error_handler = nxt_conn_proxy_write_error,
544
545 .timer_handler = nxt_conn_proxy_write_timeout,
546 .timer_value = nxt_conn_proxy_timeout_value,
547 .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
548 .timer_autoreset = 1,
549 };
550
551
552 static void
nxt_conn_proxy_peer_write_ready(nxt_task_t * task,void * obj,void * data)553 nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
554 {
555 nxt_conn_t *peer;
556 nxt_conn_proxy_t *p;
557
558 peer = obj;
559 p = data;
560
561 nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
562
563 nxt_conn_proxy_write_process(task, p, peer, p->client);
564 }
565
566
567 static void
nxt_conn_proxy_write_process(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * sink,nxt_conn_t * source)568 nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
569 nxt_conn_t *sink, nxt_conn_t *source)
570 {
571 nxt_buf_t *rb, *wb;
572
573 while (sink->write != NULL) {
574
575 wb = sink->write;
576
577 if (nxt_buf_is_sync(wb)) {
578
579 /* A sync buffer marks the end of stream. */
580
581 sink->write = NULL;
582 nxt_buf_free(sink->mem_pool, wb);
583 nxt_conn_proxy_shutdown(task, p, source, sink);
584 return;
585 }
586
587 if (wb->mem.start != wb->mem.pos) {
588
589 /* Add a written part to a read chain. */
590
591 rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
592 if (rb == NULL) {
593 /* An error completion. */
594 nxt_conn_proxy_complete(task, p);
595 return;
596 }
597
598 rb->mem.pos = wb->mem.start;
599 rb->mem.free = wb->mem.start;
600 rb->mem.start = wb->mem.start;
601 rb->mem.end = wb->mem.pos;
602
603 wb->mem.start = wb->mem.pos;
604
605 nxt_conn_proxy_read_add(source, rb);
606 }
607
608 if (wb->mem.pos != wb->mem.free) {
609 nxt_conn_write(task->thread->engine, sink);
610
611 break;
612 }
613
614 sink->write = wb->next;
615 nxt_buf_free(sink->mem_pool, wb);
616 }
617
618 nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
619 task, source, source->socket.data);
620 }
621
622
623 static void
nxt_conn_proxy_read_add(nxt_conn_t * c,nxt_buf_t * b)624 nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
625 {
626 nxt_buf_t *first, *second;
627
628 first = c->read;
629
630 if (first == NULL) {
631 c->read = b;
632 return;
633 }
634
635 /*
636 * A event conn proxy maintains a buffer per each direction.
637 * The buffer is divided by read and write parts. These parts are
638 * linked in buffer chains. There can be no more than two buffers
639 * in read chain at any time, because an added buffer is coalesced
640 * with the last buffer if possible. The first and the second
641 * buffers are also coalesced if possible.
642 */
643
644 second = first->next;
645
646 if (second == NULL) {
647
648 if (first->mem.start == b->mem.end) {
649 /*
650 * The added buffer is just before the first buffer, so expand
651 * the first buffer to the beginning of the added buffer.
652 */
653 first->mem.pos = b->mem.start;
654 first->mem.free = b->mem.start;
655 first->mem.start = b->mem.start;
656
657 } else if (first->mem.end == b->mem.start) {
658 /*
659 * The added buffer is just after the first buffer, so
660 * expand the first buffer to the end of the added buffer.
661 */
662 first->mem.end = b->mem.end;
663
664 } else {
665 first->next = b;
666 return;
667 }
668
669 } else {
670 if (second->mem.end != b->mem.start) {
671 nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
672 "is not equal to added buffer start:%p",
673 second->mem.end, b->mem.start);
674 return;
675 }
676
677 /*
678 * The added buffer is just after the second buffer, so
679 * expand the second buffer to the end of the added buffer.
680 */
681 second->mem.end = b->mem.end;
682
683 if (first->mem.start == second->mem.end) {
684 /*
685 * The second buffer is just before the first buffer, so expand
686 * the first buffer to the beginning of the second buffer.
687 */
688 first->mem.pos = second->mem.start;
689 first->mem.free = second->mem.start;
690 first->mem.start = second->mem.start;
691 first->next = NULL;
692
693 nxt_buf_free(c->mem_pool, second);
694 }
695 }
696
697 nxt_buf_free(c->mem_pool, b);
698 }
699
700
701 static void
nxt_conn_proxy_close(nxt_task_t * task,void * obj,void * data)702 nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
703 {
704 nxt_buf_t *b;
705 nxt_conn_t *source, *sink;
706 nxt_conn_proxy_t *p;
707
708 source = obj;
709 p = data;
710
711 nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
712
713 sink = (source == p->client) ? p->peer : p->client;
714
715 if (sink->write == NULL) {
716 nxt_conn_proxy_shutdown(task, p, source, sink);
717 return;
718 }
719
720 b = nxt_buf_sync_alloc(source->mem_pool, 0);
721 if (b == NULL) {
722 /* An error completion. */
723 nxt_conn_proxy_complete(task, p);
724 return;
725 }
726
727 nxt_buf_chain_add(&sink->write, b);
728 }
729
730
731 static void
nxt_conn_proxy_error(nxt_task_t * task,void * obj,void * data)732 nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
733 {
734 nxt_conn_t *c;
735 nxt_conn_proxy_t *p;
736
737 c = obj;
738 p = data;
739
740 nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
741
742 nxt_conn_proxy_close(task, c, p);
743 }
744
745
746 static void
nxt_conn_proxy_read_timeout(nxt_task_t * task,void * obj,void * data)747 nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
748 {
749 nxt_conn_t *c;
750 nxt_timer_t *timer;
751
752 timer = obj;
753
754 c = nxt_read_timer_conn(timer);
755 c->socket.timedout = 1;
756 c->socket.closed = 1;
757
758 nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
759
760 nxt_conn_proxy_close(task, c, c->socket.data);
761 }
762
763
764 static void
nxt_conn_proxy_write_timeout(nxt_task_t * task,void * obj,void * data)765 nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
766 {
767 nxt_conn_t *c;
768 nxt_timer_t *timer;
769
770 timer = obj;
771
772 c = nxt_write_timer_conn(timer);
773 c->socket.timedout = 1;
774 c->socket.closed = 1;
775
776 nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
777
778 nxt_conn_proxy_close(task, c, c->socket.data);
779 }
780
781
782 static nxt_msec_t
nxt_conn_proxy_timeout_value(nxt_conn_t * c,uintptr_t data)783 nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
784 {
785 return nxt_value_at(nxt_msec_t, c->socket.data, data);
786 }
787
788
789 static void
nxt_conn_proxy_refused(nxt_task_t * task,void * obj,void * data)790 nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
791 {
792 nxt_conn_t *peer;
793 nxt_conn_proxy_t *p;
794
795 peer = obj;
796 p = data;
797
798 nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
799
800 if (p->retries == 0) {
801 /* An error completion. */
802 nxt_conn_proxy_complete(task, p);
803 return;
804 }
805
806 p->retries--;
807
808 nxt_socket_close(task, peer->socket.fd);
809 peer->socket.fd = -1;
810 peer->socket.error = 0;
811
812 p->delayed = 1;
813
814 peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
815 nxt_timer_add(task->thread->engine, &peer->write_timer,
816 p->reconnect_timeout);
817 }
818
819
820 static void
nxt_conn_proxy_reconnect_handler(nxt_task_t * task,void * obj,void * data)821 nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
822 {
823 nxt_conn_t *peer;
824 nxt_timer_t *timer;
825 nxt_conn_proxy_t *p;
826
827 timer = obj;
828
829 nxt_debug(task, "conn proxy reconnect timer");
830
831 peer = nxt_write_timer_conn(timer);
832 p = peer->socket.data;
833
834 if (p->client->socket.closed) {
835 nxt_conn_proxy_complete(task, p);
836 return;
837 }
838
839 p->delayed = 0;
840
841 peer->write_state = &nxt_conn_proxy_peer_connect_state;
842 /*
843 * Peer read event: disabled.
844 * Peer write event: waiting for connection with connect_timeout.
845 */
846 nxt_conn_connect(task->thread->engine, peer);
847 }
848
849
850 static void
nxt_conn_proxy_shutdown(nxt_task_t * task,nxt_conn_proxy_t * p,nxt_conn_t * source,nxt_conn_t * sink)851 nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
852 nxt_conn_t *source, nxt_conn_t *sink)
853 {
854 nxt_buf_t *b;
855
856 nxt_debug(source->socket.task,
857 "conn proxy shutdown source fd:%d cl:%d err:%d",
858 source->socket.fd, source->socket.closed, source->socket.error);
859
860 nxt_debug(sink->socket.task,
861 "conn proxy shutdown sink fd:%d cl:%d err:%d",
862 sink->socket.fd, sink->socket.closed, sink->socket.error);
863
864 if (!p->connected || p->delayed) {
865 nxt_conn_proxy_complete(task, p);
866 return;
867 }
868
869 if (sink->socket.error == 0 && !sink->socket.closed) {
870 sink->socket.shutdown = 1;
871 nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
872 }
873
874 if (sink->socket.error != 0
875 || (sink->socket.closed && source->write == NULL))
876 {
877 /* The opposite direction also has been already closed. */
878 nxt_conn_proxy_complete(task, p);
879 return;
880 }
881
882 nxt_debug(source->socket.task, "free source buffer");
883
884 /* Free the direction's buffer. */
885 b = (source == p->client) ? p->client_buffer : p->peer_buffer;
886 nxt_mp_free(source->mem_pool, b);
887 }
888
889
890 static void
nxt_conn_proxy_read_error(nxt_task_t * task,void * obj,void * data)891 nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
892 {
893 nxt_conn_t *c;
894 nxt_conn_proxy_t *p;
895
896 c = obj;
897 p = data;
898
899 nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
900
901 nxt_conn_proxy_close(task, c, p);
902 }
903
904
905 static void
nxt_conn_proxy_write_error(nxt_task_t * task,void * obj,void * data)906 nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
907 {
908 nxt_conn_t *source, *sink;
909 nxt_conn_proxy_t *p;
910
911 sink = obj;
912 p = data;
913
914 nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
915
916 /* Clear data for the direction sink. */
917 sink->write = NULL;
918
919 /* Block the direction source. */
920 source = (sink == p->client) ? p->peer : p->client;
921 nxt_fd_event_block_read(task->thread->engine, &source->socket);
922
923 if (source->write == NULL) {
924 /*
925 * There is no data for the opposite direction and
926 * the next read from the sink will most probably fail.
927 */
928 nxt_conn_proxy_complete(task, p);
929 }
930 }
931
932
933 static const nxt_conn_state_t nxt_conn_proxy_close_state
934 nxt_aligned(64) =
935 {
936 .ready_handler = nxt_conn_proxy_completion,
937 };
938
939
940 static void
nxt_conn_proxy_complete(nxt_task_t * task,nxt_conn_proxy_t * p)941 nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
942 {
943 nxt_event_engine_t *engine;
944
945 engine = task->thread->engine;
946
947 nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
948 p->client->socket.fd, p->peer->socket.fd);
949
950 if (p->delayed) {
951 p->delayed = 0;
952 nxt_queue_remove(&p->peer->link);
953 }
954
955 if (p->client->socket.fd != -1) {
956 p->retain = 1;
957 p->client->write_state = &nxt_conn_proxy_close_state;
958 nxt_conn_close(engine, p->client);
959 }
960
961 if (p->peer->socket.fd != -1) {
962 p->retain++;
963 p->peer->write_state = &nxt_conn_proxy_close_state;
964 nxt_conn_close(engine, p->peer);
965 }
966 }
967
968
969 static void
nxt_conn_proxy_completion(nxt_task_t * task,void * obj,void * data)970 nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
971 {
972 nxt_conn_proxy_t *p;
973
974 p = data;
975
976 nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
977 p->retain, p->client->socket.fd, p->peer->socket.fd);
978
979 p->retain--;
980
981 if (p->retain == 0) {
982 nxt_mp_free(p->client->mem_pool, p->client_buffer);
983 nxt_mp_free(p->client->mem_pool, p->peer_buffer);
984
985 p->completion_handler(task, p, NULL);
986 }
987 }
988