113Sigor@sysoev.ru 213Sigor@sysoev.ru /* 313Sigor@sysoev.ru * Copyright (C) Igor Sysoev 413Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 513Sigor@sysoev.ru */ 613Sigor@sysoev.ru 713Sigor@sysoev.ru #include <nxt_main.h> 820Sigor@sysoev.ru #include <nxt_runtime.h> 913Sigor@sysoev.ru 1013Sigor@sysoev.ru 1113Sigor@sysoev.ru static void nxt_stream_connection_peer(nxt_task_t *task, 1213Sigor@sysoev.ru nxt_upstream_peer_t *up); 1313Sigor@sysoev.ru static void nxt_stream_connection_close(nxt_task_t *task, void *obj, 1413Sigor@sysoev.ru void *data); 1513Sigor@sysoev.ru 1613Sigor@sysoev.ru 1713Sigor@sysoev.ru void 1813Sigor@sysoev.ru nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) 1913Sigor@sysoev.ru { 20*62Sigor@sysoev.ru nxt_conn_t *c; 2120Sigor@sysoev.ru nxt_runtime_t *rt; 2213Sigor@sysoev.ru nxt_upstream_peer_t *up; 2313Sigor@sysoev.ru 2413Sigor@sysoev.ru c = obj; 2513Sigor@sysoev.ru 2613Sigor@sysoev.ru nxt_debug(task, "stream connection init"); 2713Sigor@sysoev.ru 2813Sigor@sysoev.ru up = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_upstream_peer_t)); 2913Sigor@sysoev.ru if (nxt_slow_path(up == NULL)) { 3013Sigor@sysoev.ru goto fail; 3113Sigor@sysoev.ru } 3213Sigor@sysoev.ru 3313Sigor@sysoev.ru up->data = c; 3413Sigor@sysoev.ru 3520Sigor@sysoev.ru rt = task->thread->runtime; 3613Sigor@sysoev.ru 3720Sigor@sysoev.ru if (rt->upstream.length != 0) { 3820Sigor@sysoev.ru up->addr = rt->upstream; 3913Sigor@sysoev.ru 4013Sigor@sysoev.ru } else { 4113Sigor@sysoev.ru nxt_str_set(&up->addr, "127.0.0.1:8080"); 4213Sigor@sysoev.ru } 4313Sigor@sysoev.ru 4413Sigor@sysoev.ru up->ready_handler = nxt_stream_connection_peer; 4513Sigor@sysoev.ru up->mem_pool = c->mem_pool; 4613Sigor@sysoev.ru 4713Sigor@sysoev.ru nxt_upstream_round_robin_peer(task, up); 4813Sigor@sysoev.ru return; 4913Sigor@sysoev.ru 5013Sigor@sysoev.ru fail: 5113Sigor@sysoev.ru 5213Sigor@sysoev.ru /* TODO: close connection */ 5313Sigor@sysoev.ru return; 5413Sigor@sysoev.ru } 5513Sigor@sysoev.ru 5613Sigor@sysoev.ru 5713Sigor@sysoev.ru static void 5813Sigor@sysoev.ru nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) 5913Sigor@sysoev.ru { 60*62Sigor@sysoev.ru nxt_conn_t *c; 61*62Sigor@sysoev.ru nxt_conn_proxy_t *p; 6213Sigor@sysoev.ru 6313Sigor@sysoev.ru c = up->data; 6413Sigor@sysoev.ru 6513Sigor@sysoev.ru up->sockaddr->type = SOCK_STREAM; 6613Sigor@sysoev.ru 6713Sigor@sysoev.ru nxt_log_debug(c->socket.log, "stream connection peer %*s", 6813Sigor@sysoev.ru up->sockaddr->length, nxt_sockaddr_start(up->sockaddr)); 6913Sigor@sysoev.ru 70*62Sigor@sysoev.ru p = nxt_conn_proxy_create(c); 7113Sigor@sysoev.ru if (nxt_slow_path(p == NULL)) { 7213Sigor@sysoev.ru goto fail; 7313Sigor@sysoev.ru } 7413Sigor@sysoev.ru 7513Sigor@sysoev.ru p->client->socket.data = p; 7613Sigor@sysoev.ru p->peer->socket.data = p; 7713Sigor@sysoev.ru 7813Sigor@sysoev.ru p->client_buffer_size = 1024; 7913Sigor@sysoev.ru p->peer_buffer_size = 4096; 8013Sigor@sysoev.ru //p->client_wait_timeout = 9000; 8113Sigor@sysoev.ru p->connect_timeout = 7000; 8213Sigor@sysoev.ru p->reconnect_timeout = 500; 8313Sigor@sysoev.ru //p->peer_wait_timeout = 5000; 8413Sigor@sysoev.ru p->client_write_timeout = 3000; 8513Sigor@sysoev.ru p->peer_write_timeout = 3000; 8613Sigor@sysoev.ru p->completion_handler = nxt_stream_connection_close; 8713Sigor@sysoev.ru //p->retries = 10; 8813Sigor@sysoev.ru p->peer->remote = up->sockaddr; 8913Sigor@sysoev.ru 9013Sigor@sysoev.ru if (0) { 9113Sigor@sysoev.ru nxt_event_engine_t *engine; 9213Sigor@sysoev.ru nxt_event_write_rate_t *rate; 9313Sigor@sysoev.ru 9413Sigor@sysoev.ru rate = nxt_mem_alloc(c->mem_pool, sizeof(nxt_event_write_rate_t)); 9513Sigor@sysoev.ru 9613Sigor@sysoev.ru if (nxt_slow_path(rate == NULL)) { 9713Sigor@sysoev.ru goto fail; 9813Sigor@sysoev.ru } 9913Sigor@sysoev.ru 10013Sigor@sysoev.ru c->rate = rate; 10113Sigor@sysoev.ru 10213Sigor@sysoev.ru rate->limit = 1024; 10313Sigor@sysoev.ru rate->limit_after = 0; 10413Sigor@sysoev.ru rate->average = rate->limit; 10513Sigor@sysoev.ru 10613Sigor@sysoev.ru engine = nxt_thread_event_engine(); 10713Sigor@sysoev.ru rate->last = engine->timers.now; 10813Sigor@sysoev.ru } 10913Sigor@sysoev.ru 110*62Sigor@sysoev.ru nxt_conn_proxy(task, p); 11113Sigor@sysoev.ru return; 11213Sigor@sysoev.ru 11313Sigor@sysoev.ru fail: 11413Sigor@sysoev.ru 11513Sigor@sysoev.ru /* TODO: close connection */ 11613Sigor@sysoev.ru return; 11713Sigor@sysoev.ru } 11813Sigor@sysoev.ru 11913Sigor@sysoev.ru 12013Sigor@sysoev.ru static void 12113Sigor@sysoev.ru nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data) 12213Sigor@sysoev.ru { 12313Sigor@sysoev.ru nxt_event_conn_proxy_t *p; 12413Sigor@sysoev.ru 12513Sigor@sysoev.ru p = obj; 12613Sigor@sysoev.ru 12713Sigor@sysoev.ru nxt_log_debug(p->client->socket.log, "stream connection close"); 12813Sigor@sysoev.ru 12913Sigor@sysoev.ru nxt_mem_pool_destroy(p->client->mem_pool); 13013Sigor@sysoev.ru } 131