xref: /unit/src/nxt_stream_module.c (revision 493)
113Sigor@sysoev.ru 
213Sigor@sysoev.ru /*
313Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
413Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
513Sigor@sysoev.ru  */
613Sigor@sysoev.ru 
713Sigor@sysoev.ru #include <nxt_main.h>
820Sigor@sysoev.ru #include <nxt_runtime.h>
913Sigor@sysoev.ru 
1013Sigor@sysoev.ru 
1113Sigor@sysoev.ru static void nxt_stream_connection_peer(nxt_task_t *task,
1213Sigor@sysoev.ru     nxt_upstream_peer_t *up);
1313Sigor@sysoev.ru static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
1413Sigor@sysoev.ru     void *data);
1513Sigor@sysoev.ru 
1613Sigor@sysoev.ru 
1713Sigor@sysoev.ru void
1813Sigor@sysoev.ru nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
1913Sigor@sysoev.ru {
2062Sigor@sysoev.ru     nxt_conn_t           *c;
2120Sigor@sysoev.ru     nxt_runtime_t        *rt;
2213Sigor@sysoev.ru     nxt_upstream_peer_t  *up;
2313Sigor@sysoev.ru 
2413Sigor@sysoev.ru     c = obj;
2513Sigor@sysoev.ru 
2613Sigor@sysoev.ru     nxt_debug(task, "stream connection init");
2713Sigor@sysoev.ru 
2865Sigor@sysoev.ru     up = nxt_mp_zget(c->mem_pool, sizeof(nxt_upstream_peer_t));
2913Sigor@sysoev.ru     if (nxt_slow_path(up == NULL)) {
3013Sigor@sysoev.ru         goto fail;
3113Sigor@sysoev.ru     }
3213Sigor@sysoev.ru 
3313Sigor@sysoev.ru     up->data = c;
3413Sigor@sysoev.ru 
3520Sigor@sysoev.ru     rt = task->thread->runtime;
3613Sigor@sysoev.ru 
3720Sigor@sysoev.ru     if (rt->upstream.length != 0) {
3820Sigor@sysoev.ru         up->addr = rt->upstream;
3913Sigor@sysoev.ru 
4013Sigor@sysoev.ru     } else {
4113Sigor@sysoev.ru         nxt_str_set(&up->addr, "127.0.0.1:8080");
4213Sigor@sysoev.ru     }
4313Sigor@sysoev.ru 
4413Sigor@sysoev.ru     up->ready_handler = nxt_stream_connection_peer;
4513Sigor@sysoev.ru     up->mem_pool = c->mem_pool;
4613Sigor@sysoev.ru 
4713Sigor@sysoev.ru     nxt_upstream_round_robin_peer(task, up);
4813Sigor@sysoev.ru     return;
4913Sigor@sysoev.ru 
5013Sigor@sysoev.ru fail:
5113Sigor@sysoev.ru 
5213Sigor@sysoev.ru     /* TODO: close connection */
5313Sigor@sysoev.ru     return;
5413Sigor@sysoev.ru }
5513Sigor@sysoev.ru 
5613Sigor@sysoev.ru 
5713Sigor@sysoev.ru static void
5813Sigor@sysoev.ru nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
5913Sigor@sysoev.ru {
6062Sigor@sysoev.ru     nxt_conn_t        *c;
6162Sigor@sysoev.ru     nxt_conn_proxy_t  *p;
6213Sigor@sysoev.ru 
6313Sigor@sysoev.ru     c = up->data;
6413Sigor@sysoev.ru 
6513Sigor@sysoev.ru     up->sockaddr->type = SOCK_STREAM;
6613Sigor@sysoev.ru 
6713Sigor@sysoev.ru     nxt_log_debug(c->socket.log, "stream connection peer %*s",
68*493Spluknet@nginx.com                   (size_t) up->sockaddr->length,
69*493Spluknet@nginx.com                   nxt_sockaddr_start(up->sockaddr));
7013Sigor@sysoev.ru 
7162Sigor@sysoev.ru     p = nxt_conn_proxy_create(c);
7213Sigor@sysoev.ru     if (nxt_slow_path(p == NULL)) {
7313Sigor@sysoev.ru         goto fail;
7413Sigor@sysoev.ru     }
7513Sigor@sysoev.ru 
7613Sigor@sysoev.ru     p->client->socket.data = p;
7713Sigor@sysoev.ru     p->peer->socket.data = p;
7813Sigor@sysoev.ru 
7913Sigor@sysoev.ru     p->client_buffer_size = 1024;
8013Sigor@sysoev.ru     p->peer_buffer_size = 4096;
8113Sigor@sysoev.ru     //p->client_wait_timeout = 9000;
8213Sigor@sysoev.ru     p->connect_timeout = 7000;
8313Sigor@sysoev.ru     p->reconnect_timeout = 500;
8413Sigor@sysoev.ru     //p->peer_wait_timeout = 5000;
8513Sigor@sysoev.ru     p->client_write_timeout = 3000;
8613Sigor@sysoev.ru     p->peer_write_timeout = 3000;
8713Sigor@sysoev.ru     p->completion_handler = nxt_stream_connection_close;
8813Sigor@sysoev.ru     //p->retries = 10;
8913Sigor@sysoev.ru     p->peer->remote = up->sockaddr;
9013Sigor@sysoev.ru 
9113Sigor@sysoev.ru     if (0) {
9213Sigor@sysoev.ru         nxt_event_engine_t      *engine;
9313Sigor@sysoev.ru         nxt_event_write_rate_t  *rate;
9413Sigor@sysoev.ru 
9565Sigor@sysoev.ru         rate = nxt_mp_get(c->mem_pool, sizeof(nxt_event_write_rate_t));
9613Sigor@sysoev.ru 
9713Sigor@sysoev.ru         if (nxt_slow_path(rate == NULL)) {
9813Sigor@sysoev.ru             goto fail;
9913Sigor@sysoev.ru         }
10013Sigor@sysoev.ru 
10113Sigor@sysoev.ru         c->rate = rate;
10213Sigor@sysoev.ru 
10313Sigor@sysoev.ru         rate->limit = 1024;
10413Sigor@sysoev.ru         rate->limit_after = 0;
10513Sigor@sysoev.ru         rate->average = rate->limit;
10613Sigor@sysoev.ru 
10713Sigor@sysoev.ru         engine = nxt_thread_event_engine();
10813Sigor@sysoev.ru         rate->last = engine->timers.now;
10913Sigor@sysoev.ru     }
11013Sigor@sysoev.ru 
11162Sigor@sysoev.ru     nxt_conn_proxy(task, p);
11213Sigor@sysoev.ru     return;
11313Sigor@sysoev.ru 
11413Sigor@sysoev.ru fail:
11513Sigor@sysoev.ru 
11613Sigor@sysoev.ru     /* TODO: close connection */
11713Sigor@sysoev.ru     return;
11813Sigor@sysoev.ru }
11913Sigor@sysoev.ru 
12013Sigor@sysoev.ru 
12113Sigor@sysoev.ru static void
12213Sigor@sysoev.ru nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data)
12313Sigor@sysoev.ru {
12413Sigor@sysoev.ru     nxt_event_conn_proxy_t  *p;
12513Sigor@sysoev.ru 
12613Sigor@sysoev.ru     p = obj;
12713Sigor@sysoev.ru 
12813Sigor@sysoev.ru     nxt_log_debug(p->client->socket.log, "stream connection close");
12913Sigor@sysoev.ru 
13065Sigor@sysoev.ru     nxt_mp_destroy(p->client->mem_pool);
13113Sigor@sysoev.ru }
132