xref: /unit/src/nxt_stream_module.c (revision 13)
1*13Sigor@sysoev.ru 
2*13Sigor@sysoev.ru /*
3*13Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*13Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*13Sigor@sysoev.ru  */
6*13Sigor@sysoev.ru 
7*13Sigor@sysoev.ru #include <nxt_main.h>
8*13Sigor@sysoev.ru #include <nxt_cycle.h>
9*13Sigor@sysoev.ru 
10*13Sigor@sysoev.ru 
11*13Sigor@sysoev.ru static void nxt_stream_connection_peer(nxt_task_t *task,
12*13Sigor@sysoev.ru     nxt_upstream_peer_t *up);
13*13Sigor@sysoev.ru static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
14*13Sigor@sysoev.ru     void *data);
15*13Sigor@sysoev.ru 
16*13Sigor@sysoev.ru 
17*13Sigor@sysoev.ru void
18*13Sigor@sysoev.ru nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
19*13Sigor@sysoev.ru {
20*13Sigor@sysoev.ru     nxt_cycle_t          *cycle;
21*13Sigor@sysoev.ru     nxt_event_conn_t     *c;
22*13Sigor@sysoev.ru     nxt_upstream_peer_t  *up;
23*13Sigor@sysoev.ru 
24*13Sigor@sysoev.ru     c = obj;
25*13Sigor@sysoev.ru 
26*13Sigor@sysoev.ru     nxt_debug(task, "stream connection init");
27*13Sigor@sysoev.ru 
28*13Sigor@sysoev.ru     up = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_upstream_peer_t));
29*13Sigor@sysoev.ru     if (nxt_slow_path(up == NULL)) {
30*13Sigor@sysoev.ru         goto fail;
31*13Sigor@sysoev.ru     }
32*13Sigor@sysoev.ru 
33*13Sigor@sysoev.ru     up->data = c;
34*13Sigor@sysoev.ru 
35*13Sigor@sysoev.ru     cycle = nxt_thread_cycle();
36*13Sigor@sysoev.ru 
37*13Sigor@sysoev.ru     if (cycle->upstream.length != 0) {
38*13Sigor@sysoev.ru         up->addr = cycle->upstream;
39*13Sigor@sysoev.ru 
40*13Sigor@sysoev.ru     } else {
41*13Sigor@sysoev.ru         nxt_str_set(&up->addr, "127.0.0.1:8080");
42*13Sigor@sysoev.ru     }
43*13Sigor@sysoev.ru 
44*13Sigor@sysoev.ru     up->ready_handler = nxt_stream_connection_peer;
45*13Sigor@sysoev.ru     up->mem_pool = c->mem_pool;
46*13Sigor@sysoev.ru 
47*13Sigor@sysoev.ru     nxt_upstream_round_robin_peer(task, up);
48*13Sigor@sysoev.ru     return;
49*13Sigor@sysoev.ru 
50*13Sigor@sysoev.ru fail:
51*13Sigor@sysoev.ru 
52*13Sigor@sysoev.ru     /* TODO: close connection */
53*13Sigor@sysoev.ru     return;
54*13Sigor@sysoev.ru }
55*13Sigor@sysoev.ru 
56*13Sigor@sysoev.ru 
57*13Sigor@sysoev.ru static void
58*13Sigor@sysoev.ru nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
59*13Sigor@sysoev.ru {
60*13Sigor@sysoev.ru     nxt_event_conn_t        *c;
61*13Sigor@sysoev.ru     nxt_event_conn_proxy_t  *p;
62*13Sigor@sysoev.ru 
63*13Sigor@sysoev.ru     c = up->data;
64*13Sigor@sysoev.ru 
65*13Sigor@sysoev.ru     up->sockaddr->type = SOCK_STREAM;
66*13Sigor@sysoev.ru 
67*13Sigor@sysoev.ru     nxt_log_debug(c->socket.log, "stream connection peer %*s",
68*13Sigor@sysoev.ru                   up->sockaddr->length, nxt_sockaddr_start(up->sockaddr));
69*13Sigor@sysoev.ru 
70*13Sigor@sysoev.ru     p = nxt_event_conn_proxy_create(c);
71*13Sigor@sysoev.ru     if (nxt_slow_path(p == NULL)) {
72*13Sigor@sysoev.ru         goto fail;
73*13Sigor@sysoev.ru     }
74*13Sigor@sysoev.ru 
75*13Sigor@sysoev.ru     p->client->socket.data = p;
76*13Sigor@sysoev.ru     p->peer->socket.data = p;
77*13Sigor@sysoev.ru 
78*13Sigor@sysoev.ru     p->client_buffer_size = 1024;
79*13Sigor@sysoev.ru     p->peer_buffer_size = 4096;
80*13Sigor@sysoev.ru     //p->client_wait_timeout = 9000;
81*13Sigor@sysoev.ru     p->connect_timeout = 7000;
82*13Sigor@sysoev.ru     p->reconnect_timeout = 500;
83*13Sigor@sysoev.ru     //p->peer_wait_timeout = 5000;
84*13Sigor@sysoev.ru     p->client_write_timeout = 3000;
85*13Sigor@sysoev.ru     p->peer_write_timeout = 3000;
86*13Sigor@sysoev.ru     p->completion_handler = nxt_stream_connection_close;
87*13Sigor@sysoev.ru     //p->retries = 10;
88*13Sigor@sysoev.ru     p->peer->remote = up->sockaddr;
89*13Sigor@sysoev.ru 
90*13Sigor@sysoev.ru     if (0) {
91*13Sigor@sysoev.ru         nxt_event_engine_t      *engine;
92*13Sigor@sysoev.ru         nxt_event_write_rate_t  *rate;
93*13Sigor@sysoev.ru 
94*13Sigor@sysoev.ru         rate = nxt_mem_alloc(c->mem_pool, sizeof(nxt_event_write_rate_t));
95*13Sigor@sysoev.ru 
96*13Sigor@sysoev.ru         if (nxt_slow_path(rate == NULL)) {
97*13Sigor@sysoev.ru             goto fail;
98*13Sigor@sysoev.ru         }
99*13Sigor@sysoev.ru 
100*13Sigor@sysoev.ru         c->rate = rate;
101*13Sigor@sysoev.ru 
102*13Sigor@sysoev.ru         rate->limit = 1024;
103*13Sigor@sysoev.ru         rate->limit_after = 0;
104*13Sigor@sysoev.ru         rate->average = rate->limit;
105*13Sigor@sysoev.ru 
106*13Sigor@sysoev.ru         engine = nxt_thread_event_engine();
107*13Sigor@sysoev.ru         rate->last = engine->timers.now;
108*13Sigor@sysoev.ru     }
109*13Sigor@sysoev.ru 
110*13Sigor@sysoev.ru     nxt_event_conn_proxy(task, p);
111*13Sigor@sysoev.ru     return;
112*13Sigor@sysoev.ru 
113*13Sigor@sysoev.ru fail:
114*13Sigor@sysoev.ru 
115*13Sigor@sysoev.ru     /* TODO: close connection */
116*13Sigor@sysoev.ru     return;
117*13Sigor@sysoev.ru }
118*13Sigor@sysoev.ru 
119*13Sigor@sysoev.ru 
120*13Sigor@sysoev.ru static void
121*13Sigor@sysoev.ru nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data)
122*13Sigor@sysoev.ru {
123*13Sigor@sysoev.ru     nxt_event_conn_proxy_t  *p;
124*13Sigor@sysoev.ru 
125*13Sigor@sysoev.ru     p = obj;
126*13Sigor@sysoev.ru 
127*13Sigor@sysoev.ru     nxt_log_debug(p->client->socket.log, "stream connection close");
128*13Sigor@sysoev.ru 
129*13Sigor@sysoev.ru     nxt_mem_pool_destroy(p->client->mem_pool);
130*13Sigor@sysoev.ru }
131