113Sigor@sysoev.ru 213Sigor@sysoev.ru /* 313Sigor@sysoev.ru * Copyright (C) Igor Sysoev 413Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 513Sigor@sysoev.ru */ 613Sigor@sysoev.ru 713Sigor@sysoev.ru #include <nxt_main.h> 820Sigor@sysoev.ru #include <nxt_runtime.h> 913Sigor@sysoev.ru 1013Sigor@sysoev.ru 1113Sigor@sysoev.ru static void nxt_stream_connection_peer(nxt_task_t *task, 1213Sigor@sysoev.ru nxt_upstream_peer_t *up); 1313Sigor@sysoev.ru static void nxt_stream_connection_close(nxt_task_t *task, void *obj, 1413Sigor@sysoev.ru void *data); 1513Sigor@sysoev.ru 1613Sigor@sysoev.ru 1713Sigor@sysoev.ru void 1813Sigor@sysoev.ru nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) 1913Sigor@sysoev.ru { 2062Sigor@sysoev.ru nxt_conn_t *c; 2120Sigor@sysoev.ru nxt_runtime_t *rt; 2213Sigor@sysoev.ru nxt_upstream_peer_t *up; 2313Sigor@sysoev.ru 2413Sigor@sysoev.ru c = obj; 2513Sigor@sysoev.ru 2613Sigor@sysoev.ru nxt_debug(task, "stream connection init"); 2713Sigor@sysoev.ru 2865Sigor@sysoev.ru up = nxt_mp_zget(c->mem_pool, sizeof(nxt_upstream_peer_t)); 2913Sigor@sysoev.ru if (nxt_slow_path(up == NULL)) { 3013Sigor@sysoev.ru goto fail; 3113Sigor@sysoev.ru } 3213Sigor@sysoev.ru 3313Sigor@sysoev.ru up->data = c; 3413Sigor@sysoev.ru 3520Sigor@sysoev.ru rt = task->thread->runtime; 3613Sigor@sysoev.ru 3720Sigor@sysoev.ru if (rt->upstream.length != 0) { 3820Sigor@sysoev.ru up->addr = rt->upstream; 3913Sigor@sysoev.ru 4013Sigor@sysoev.ru } else { 4113Sigor@sysoev.ru nxt_str_set(&up->addr, "127.0.0.1:8080"); 4213Sigor@sysoev.ru } 4313Sigor@sysoev.ru 4413Sigor@sysoev.ru up->ready_handler = nxt_stream_connection_peer; 4513Sigor@sysoev.ru up->mem_pool = c->mem_pool; 4613Sigor@sysoev.ru 4713Sigor@sysoev.ru nxt_upstream_round_robin_peer(task, up); 4813Sigor@sysoev.ru return; 4913Sigor@sysoev.ru 5013Sigor@sysoev.ru fail: 5113Sigor@sysoev.ru 5213Sigor@sysoev.ru /* TODO: close connection */ 5313Sigor@sysoev.ru return; 5413Sigor@sysoev.ru } 5513Sigor@sysoev.ru 5613Sigor@sysoev.ru 5713Sigor@sysoev.ru static void 5813Sigor@sysoev.ru nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) 5913Sigor@sysoev.ru { 6062Sigor@sysoev.ru nxt_conn_t *c; 6162Sigor@sysoev.ru nxt_conn_proxy_t *p; 6213Sigor@sysoev.ru 6313Sigor@sysoev.ru c = up->data; 6413Sigor@sysoev.ru 6513Sigor@sysoev.ru up->sockaddr->type = SOCK_STREAM; 6613Sigor@sysoev.ru 6713Sigor@sysoev.ru nxt_log_debug(c->socket.log, "stream connection peer %*s", 68*493Spluknet@nginx.com (size_t) up->sockaddr->length, 69*493Spluknet@nginx.com nxt_sockaddr_start(up->sockaddr)); 7013Sigor@sysoev.ru 7162Sigor@sysoev.ru p = nxt_conn_proxy_create(c); 7213Sigor@sysoev.ru if (nxt_slow_path(p == NULL)) { 7313Sigor@sysoev.ru goto fail; 7413Sigor@sysoev.ru } 7513Sigor@sysoev.ru 7613Sigor@sysoev.ru p->client->socket.data = p; 7713Sigor@sysoev.ru p->peer->socket.data = p; 7813Sigor@sysoev.ru 7913Sigor@sysoev.ru p->client_buffer_size = 1024; 8013Sigor@sysoev.ru p->peer_buffer_size = 4096; 8113Sigor@sysoev.ru //p->client_wait_timeout = 9000; 8213Sigor@sysoev.ru p->connect_timeout = 7000; 8313Sigor@sysoev.ru p->reconnect_timeout = 500; 8413Sigor@sysoev.ru //p->peer_wait_timeout = 5000; 8513Sigor@sysoev.ru p->client_write_timeout = 3000; 8613Sigor@sysoev.ru p->peer_write_timeout = 3000; 8713Sigor@sysoev.ru p->completion_handler = nxt_stream_connection_close; 8813Sigor@sysoev.ru //p->retries = 10; 8913Sigor@sysoev.ru p->peer->remote = up->sockaddr; 9013Sigor@sysoev.ru 9113Sigor@sysoev.ru if (0) { 9213Sigor@sysoev.ru nxt_event_engine_t *engine; 9313Sigor@sysoev.ru nxt_event_write_rate_t *rate; 9413Sigor@sysoev.ru 9565Sigor@sysoev.ru rate = nxt_mp_get(c->mem_pool, sizeof(nxt_event_write_rate_t)); 9613Sigor@sysoev.ru 9713Sigor@sysoev.ru if (nxt_slow_path(rate == NULL)) { 9813Sigor@sysoev.ru goto fail; 9913Sigor@sysoev.ru } 10013Sigor@sysoev.ru 10113Sigor@sysoev.ru c->rate = rate; 10213Sigor@sysoev.ru 10313Sigor@sysoev.ru rate->limit = 1024; 10413Sigor@sysoev.ru rate->limit_after = 0; 10513Sigor@sysoev.ru rate->average = rate->limit; 10613Sigor@sysoev.ru 10713Sigor@sysoev.ru engine = nxt_thread_event_engine(); 10813Sigor@sysoev.ru rate->last = engine->timers.now; 10913Sigor@sysoev.ru } 11013Sigor@sysoev.ru 11162Sigor@sysoev.ru nxt_conn_proxy(task, p); 11213Sigor@sysoev.ru return; 11313Sigor@sysoev.ru 11413Sigor@sysoev.ru fail: 11513Sigor@sysoev.ru 11613Sigor@sysoev.ru /* TODO: close connection */ 11713Sigor@sysoev.ru return; 11813Sigor@sysoev.ru } 11913Sigor@sysoev.ru 12013Sigor@sysoev.ru 12113Sigor@sysoev.ru static void 12213Sigor@sysoev.ru nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data) 12313Sigor@sysoev.ru { 12413Sigor@sysoev.ru nxt_event_conn_proxy_t *p; 12513Sigor@sysoev.ru 12613Sigor@sysoev.ru p = obj; 12713Sigor@sysoev.ru 12813Sigor@sysoev.ru nxt_log_debug(p->client->socket.log, "stream connection close"); 12913Sigor@sysoev.ru 13065Sigor@sysoev.ru nxt_mp_destroy(p->client->mem_pool); 13113Sigor@sysoev.ru } 132