1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9
10
11 static void nxt_stream_connection_peer(nxt_task_t *task,
12 nxt_upstream_peer_t *up);
13 static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
14 void *data);
15
16
17 void
nxt_stream_connection_init(nxt_task_t * task,void * obj,void * data)18 nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
19 {
20 nxt_conn_t *c;
21 nxt_runtime_t *rt;
22 nxt_upstream_peer_t *up;
23
24 c = obj;
25
26 nxt_debug(task, "stream connection init");
27
28 up = nxt_mp_zget(c->mem_pool, sizeof(nxt_upstream_peer_t));
29 if (nxt_slow_path(up == NULL)) {
30 goto fail;
31 }
32
33 up->data = c;
34
35 rt = task->thread->runtime;
36
37 if (rt->upstream.length != 0) {
38 up->addr = rt->upstream;
39
40 } else {
41 nxt_str_set(&up->addr, "127.0.0.1:8080");
42 }
43
44 up->ready_handler = nxt_stream_connection_peer;
45 up->mem_pool = c->mem_pool;
46
47 nxt_upstream_round_robin_peer(task, up);
48 return;
49
50 fail:
51
52 /* TODO: close connection */
53 return;
54 }
55
56
57 static void
nxt_stream_connection_peer(nxt_task_t * task,nxt_upstream_peer_t * up)58 nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
59 {
60 nxt_conn_t *c;
61 nxt_conn_proxy_t *p;
62
63 c = up->data;
64
65 up->sockaddr->type = SOCK_STREAM;
66
67 nxt_log_debug(c->socket.log, "stream connection peer %*s",
68 (size_t) up->sockaddr->length,
69 nxt_sockaddr_start(up->sockaddr));
70
71 p = nxt_conn_proxy_create(c);
72 if (nxt_slow_path(p == NULL)) {
73 goto fail;
74 }
75
76 p->client->socket.data = p;
77 p->peer->socket.data = p;
78
79 p->client_buffer_size = 1024;
80 p->peer_buffer_size = 4096;
81 //p->client_wait_timeout = 9000;
82 p->connect_timeout = 7000;
83 p->reconnect_timeout = 500;
84 //p->peer_wait_timeout = 5000;
85 p->client_write_timeout = 3000;
86 p->peer_write_timeout = 3000;
87 p->completion_handler = nxt_stream_connection_close;
88 //p->retries = 10;
89 p->peer->remote = up->sockaddr;
90
91 if (0) {
92 nxt_event_engine_t *engine;
93 nxt_event_write_rate_t *rate;
94
95 rate = nxt_mp_get(c->mem_pool, sizeof(nxt_event_write_rate_t));
96
97 if (nxt_slow_path(rate == NULL)) {
98 goto fail;
99 }
100
101 c->rate = rate;
102
103 rate->limit = 1024;
104 rate->limit_after = 0;
105 rate->average = rate->limit;
106
107 engine = nxt_thread_event_engine();
108 rate->last = engine->timers.now;
109 }
110
111 nxt_conn_proxy(task, p);
112 return;
113
114 fail:
115
116 /* TODO: close connection */
117 return;
118 }
119
120
121 static void
nxt_stream_connection_close(nxt_task_t * task,void * obj,void * data)122 nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data)
123 {
124 nxt_event_conn_proxy_t *p;
125
126 p = obj;
127
128 nxt_log_debug(p->client->socket.log, "stream connection close");
129
130 nxt_mp_destroy(p->client->mem_pool);
131 }
132