1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10typedef struct { 11 int32_t weight; 12 int32_t effective_weight; 13 int32_t current_weight; 14 uint32_t down; /* 1 bit */ 15 nxt_msec_t last_accessed; 16 nxt_sockaddr_t *sockaddr; 17} nxt_upstream_round_robin_peer_t; 18 19 20typedef struct { 21 nxt_uint_t npeers; 22 nxt_upstream_round_robin_peer_t *peers; 23 nxt_thread_spinlock_t lock; 24} nxt_upstream_round_robin_t; 25 26 27static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, 28 void *data); 29static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, 30 void *data); 31static void nxt_upstream_round_robin_get_peer(nxt_task_t *task, 32 nxt_upstream_peer_t *up); 33 34 35void 36nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up) 37{ 38 nxt_job_sockaddr_parse_t *jbs; 39 40 if (up->upstream != NULL) { 41 nxt_upstream_round_robin_get_peer(task, up); 42 } 43 44 jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t)); 45 if (nxt_slow_path(jbs == NULL)) { 46 up->ready_handler(task, up); 47 return; 48 } 49 50 jbs->resolve.job.task = task; 51 jbs->resolve.job.data = up; 52 jbs->resolve.port = up->port; 53 jbs->resolve.log_level = NXT_LOG_ERR; 54 jbs->resolve.ready_handler = nxt_upstream_round_robin_create; 55 jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error; 56 jbs->addr = up->addr; 57 58 nxt_job_sockaddr_parse(jbs); 59} 60 61 62static void 63nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) 64{ 65 nxt_uint_t i; 66 nxt_sockaddr_t *sa; 67 nxt_upstream_peer_t *up; 68 nxt_job_sockaddr_parse_t *jbs; 69 nxt_upstream_round_robin_t *urr; 70 nxt_upstream_round_robin_peer_t *peer; 71 72 jbs = obj; 73 up = jbs->resolve.job.data; 74 75 urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t)); 76 if (nxt_slow_path(urr == NULL)) { 77 goto fail; 78 } 79 80 urr->npeers = jbs->resolve.count; 81 82 peer = nxt_mp_zget(up->mem_pool, 83 urr->npeers * sizeof(nxt_upstream_round_robin_peer_t)); 84 if (nxt_slow_path(peer == NULL)) { 85 goto fail; 86 } 87 88 urr->peers = peer; 89 90 for (i = 0; i < urr->npeers; i++) { 91 peer[i].weight = 1; 92 peer[i].effective_weight = 1; 93 94 sa = jbs->resolve.sockaddrs[i]; 95 96 /* STUB */ 97 sa->type = SOCK_STREAM; 98 99 nxt_sockaddr_text(sa); 100 101 nxt_debug(task, "upstream peer: %*s",
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10typedef struct { 11 int32_t weight; 12 int32_t effective_weight; 13 int32_t current_weight; 14 uint32_t down; /* 1 bit */ 15 nxt_msec_t last_accessed; 16 nxt_sockaddr_t *sockaddr; 17} nxt_upstream_round_robin_peer_t; 18 19 20typedef struct { 21 nxt_uint_t npeers; 22 nxt_upstream_round_robin_peer_t *peers; 23 nxt_thread_spinlock_t lock; 24} nxt_upstream_round_robin_t; 25 26 27static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, 28 void *data); 29static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, 30 void *data); 31static void nxt_upstream_round_robin_get_peer(nxt_task_t *task, 32 nxt_upstream_peer_t *up); 33 34 35void 36nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up) 37{ 38 nxt_job_sockaddr_parse_t *jbs; 39 40 if (up->upstream != NULL) { 41 nxt_upstream_round_robin_get_peer(task, up); 42 } 43 44 jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t)); 45 if (nxt_slow_path(jbs == NULL)) { 46 up->ready_handler(task, up); 47 return; 48 } 49 50 jbs->resolve.job.task = task; 51 jbs->resolve.job.data = up; 52 jbs->resolve.port = up->port; 53 jbs->resolve.log_level = NXT_LOG_ERR; 54 jbs->resolve.ready_handler = nxt_upstream_round_robin_create; 55 jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error; 56 jbs->addr = up->addr; 57 58 nxt_job_sockaddr_parse(jbs); 59} 60 61 62static void 63nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) 64{ 65 nxt_uint_t i; 66 nxt_sockaddr_t *sa; 67 nxt_upstream_peer_t *up; 68 nxt_job_sockaddr_parse_t *jbs; 69 nxt_upstream_round_robin_t *urr; 70 nxt_upstream_round_robin_peer_t *peer; 71 72 jbs = obj; 73 up = jbs->resolve.job.data; 74 75 urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t)); 76 if (nxt_slow_path(urr == NULL)) { 77 goto fail; 78 } 79 80 urr->npeers = jbs->resolve.count; 81 82 peer = nxt_mp_zget(up->mem_pool, 83 urr->npeers * sizeof(nxt_upstream_round_robin_peer_t)); 84 if (nxt_slow_path(peer == NULL)) { 85 goto fail; 86 } 87 88 urr->peers = peer; 89 90 for (i = 0; i < urr->npeers; i++) { 91 peer[i].weight = 1; 92 peer[i].effective_weight = 1; 93 94 sa = jbs->resolve.sockaddrs[i]; 95 96 /* STUB */ 97 sa->type = SOCK_STREAM; 98 99 nxt_sockaddr_text(sa); 100 101 nxt_debug(task, "upstream peer: %*s",
|
103 104 /* TODO: memcpy to shared memory pool. */ 105 peer[i].sockaddr = sa; 106 } 107 108 up->upstream = urr; 109 110 /* STUB */ 111 up->sockaddr = peer[0].sockaddr; 112 113 nxt_job_destroy(task, jbs); 114 up->ready_handler(task, up); 115 116 //nxt_upstream_round_robin_get_peer(up); 117 return; 118 119fail: 120 121 nxt_job_destroy(task, jbs); 122 123 up->ready_handler(task, up); 124} 125 126 127static void 128nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data) 129{ 130 nxt_upstream_peer_t *up; 131 nxt_job_sockaddr_parse_t *jbs; 132 133 jbs = obj; 134 up = jbs->resolve.job.data; 135 136 up->ready_handler(task, up); 137} 138 139 140static void 141nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up) 142{ 143 int32_t effective_weights; 144 nxt_uint_t i; 145 nxt_msec_t now; 146 nxt_upstream_round_robin_t *urr; 147 nxt_upstream_round_robin_peer_t *peer, *best; 148 149 urr = up->upstream; 150 151 now = task->thread->engine->timers.now; 152 153 nxt_thread_spin_lock(&urr->lock); 154 155 best = NULL; 156 effective_weights = 0; 157 peer = urr->peers; 158 159 for (i = 0; i < urr->npeers; i++) { 160 161 if (peer[i].down) { 162 continue; 163 } 164 165#if 0 166 if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) { 167 good = peer[i].last_accessed + peer[i].fail_timeout; 168 169 if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) { 170 continue; 171 } 172 } 173#endif 174 175 peer[i].current_weight += peer[i].effective_weight; 176 effective_weights += peer[i].effective_weight; 177 178 if (peer[i].effective_weight < peer[i].weight) { 179 peer[i].effective_weight++; 180 } 181 182 if (best == NULL || peer[i].current_weight > best->current_weight) { 183 best = &peer[i]; 184 } 185 } 186 187 if (best != NULL) { 188 best->current_weight -= effective_weights; 189 best->last_accessed = now; 190 191 up->sockaddr = best->sockaddr; 192 193 } else { 194 up->sockaddr = NULL; 195 } 196 197 nxt_thread_spin_unlock(&urr->lock); 198 199 up->ready_handler(task, up); 200}
| 103 104 /* TODO: memcpy to shared memory pool. */ 105 peer[i].sockaddr = sa; 106 } 107 108 up->upstream = urr; 109 110 /* STUB */ 111 up->sockaddr = peer[0].sockaddr; 112 113 nxt_job_destroy(task, jbs); 114 up->ready_handler(task, up); 115 116 //nxt_upstream_round_robin_get_peer(up); 117 return; 118 119fail: 120 121 nxt_job_destroy(task, jbs); 122 123 up->ready_handler(task, up); 124} 125 126 127static void 128nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data) 129{ 130 nxt_upstream_peer_t *up; 131 nxt_job_sockaddr_parse_t *jbs; 132 133 jbs = obj; 134 up = jbs->resolve.job.data; 135 136 up->ready_handler(task, up); 137} 138 139 140static void 141nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up) 142{ 143 int32_t effective_weights; 144 nxt_uint_t i; 145 nxt_msec_t now; 146 nxt_upstream_round_robin_t *urr; 147 nxt_upstream_round_robin_peer_t *peer, *best; 148 149 urr = up->upstream; 150 151 now = task->thread->engine->timers.now; 152 153 nxt_thread_spin_lock(&urr->lock); 154 155 best = NULL; 156 effective_weights = 0; 157 peer = urr->peers; 158 159 for (i = 0; i < urr->npeers; i++) { 160 161 if (peer[i].down) { 162 continue; 163 } 164 165#if 0 166 if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) { 167 good = peer[i].last_accessed + peer[i].fail_timeout; 168 169 if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) { 170 continue; 171 } 172 } 173#endif 174 175 peer[i].current_weight += peer[i].effective_weight; 176 effective_weights += peer[i].effective_weight; 177 178 if (peer[i].effective_weight < peer[i].weight) { 179 peer[i].effective_weight++; 180 } 181 182 if (best == NULL || peer[i].current_weight > best->current_weight) { 183 best = &peer[i]; 184 } 185 } 186 187 if (best != NULL) { 188 best->current_weight -= effective_weights; 189 best->last_accessed = now; 190 191 up->sockaddr = best->sockaddr; 192 193 } else { 194 up->sockaddr = NULL; 195 } 196 197 nxt_thread_spin_unlock(&urr->lock); 198 199 up->ready_handler(task, up); 200}
|