xref: /unit/src/nxt_stream_module.c (revision 493:745222d540a2)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 
10 
11 static void nxt_stream_connection_peer(nxt_task_t *task,
12     nxt_upstream_peer_t *up);
13 static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
14     void *data);
15 
16 
17 void
nxt_stream_connection_init(nxt_task_t * task,void * obj,void * data)18 nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
19 {
20     nxt_conn_t           *c;
21     nxt_runtime_t        *rt;
22     nxt_upstream_peer_t  *up;
23 
24     c = obj;
25 
26     nxt_debug(task, "stream connection init");
27 
28     up = nxt_mp_zget(c->mem_pool, sizeof(nxt_upstream_peer_t));
29     if (nxt_slow_path(up == NULL)) {
30         goto fail;
31     }
32 
33     up->data = c;
34 
35     rt = task->thread->runtime;
36 
37     if (rt->upstream.length != 0) {
38         up->addr = rt->upstream;
39 
40     } else {
41         nxt_str_set(&up->addr, "127.0.0.1:8080");
42     }
43 
44     up->ready_handler = nxt_stream_connection_peer;
45     up->mem_pool = c->mem_pool;
46 
47     nxt_upstream_round_robin_peer(task, up);
48     return;
49 
50 fail:
51 
52     /* TODO: close connection */
53     return;
54 }
55 
56 
57 static void
nxt_stream_connection_peer(nxt_task_t * task,nxt_upstream_peer_t * up)58 nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
59 {
60     nxt_conn_t        *c;
61     nxt_conn_proxy_t  *p;
62 
63     c = up->data;
64 
65     up->sockaddr->type = SOCK_STREAM;
66 
67     nxt_log_debug(c->socket.log, "stream connection peer %*s",
68                   (size_t) up->sockaddr->length,
69                   nxt_sockaddr_start(up->sockaddr));
70 
71     p = nxt_conn_proxy_create(c);
72     if (nxt_slow_path(p == NULL)) {
73         goto fail;
74     }
75 
76     p->client->socket.data = p;
77     p->peer->socket.data = p;
78 
79     p->client_buffer_size = 1024;
80     p->peer_buffer_size = 4096;
81     //p->client_wait_timeout = 9000;
82     p->connect_timeout = 7000;
83     p->reconnect_timeout = 500;
84     //p->peer_wait_timeout = 5000;
85     p->client_write_timeout = 3000;
86     p->peer_write_timeout = 3000;
87     p->completion_handler = nxt_stream_connection_close;
88     //p->retries = 10;
89     p->peer->remote = up->sockaddr;
90 
91     if (0) {
92         nxt_event_engine_t      *engine;
93         nxt_event_write_rate_t  *rate;
94 
95         rate = nxt_mp_get(c->mem_pool, sizeof(nxt_event_write_rate_t));
96 
97         if (nxt_slow_path(rate == NULL)) {
98             goto fail;
99         }
100 
101         c->rate = rate;
102 
103         rate->limit = 1024;
104         rate->limit_after = 0;
105         rate->average = rate->limit;
106 
107         engine = nxt_thread_event_engine();
108         rate->last = engine->timers.now;
109     }
110 
111     nxt_conn_proxy(task, p);
112     return;
113 
114 fail:
115 
116     /* TODO: close connection */
117     return;
118 }
119 
120 
121 static void
nxt_stream_connection_close(nxt_task_t * task,void * obj,void * data)122 nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data)
123 {
124     nxt_event_conn_proxy_t  *p;
125 
126     p = obj;
127 
128     nxt_log_debug(p->client->socket.log, "stream connection close");
129 
130     nxt_mp_destroy(p->client->mem_pool);
131 }
132