nxt_upstream_round_robin.c (65:10688b89aa16) nxt_upstream_round_robin.c (493:745222d540a2)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10typedef struct {
11 int32_t weight;
12 int32_t effective_weight;
13 int32_t current_weight;
14 uint32_t down; /* 1 bit */
15 nxt_msec_t last_accessed;
16 nxt_sockaddr_t *sockaddr;
17} nxt_upstream_round_robin_peer_t;
18
19
20typedef struct {
21 nxt_uint_t npeers;
22 nxt_upstream_round_robin_peer_t *peers;
23 nxt_thread_spinlock_t lock;
24} nxt_upstream_round_robin_t;
25
26
27static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
28 void *data);
29static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
32 nxt_upstream_peer_t *up);
33
34
35void
36nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
37{
38 nxt_job_sockaddr_parse_t *jbs;
39
40 if (up->upstream != NULL) {
41 nxt_upstream_round_robin_get_peer(task, up);
42 }
43
44 jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
45 if (nxt_slow_path(jbs == NULL)) {
46 up->ready_handler(task, up);
47 return;
48 }
49
50 jbs->resolve.job.task = task;
51 jbs->resolve.job.data = up;
52 jbs->resolve.port = up->port;
53 jbs->resolve.log_level = NXT_LOG_ERR;
54 jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
55 jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
56 jbs->addr = up->addr;
57
58 nxt_job_sockaddr_parse(jbs);
59}
60
61
62static void
63nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
64{
65 nxt_uint_t i;
66 nxt_sockaddr_t *sa;
67 nxt_upstream_peer_t *up;
68 nxt_job_sockaddr_parse_t *jbs;
69 nxt_upstream_round_robin_t *urr;
70 nxt_upstream_round_robin_peer_t *peer;
71
72 jbs = obj;
73 up = jbs->resolve.job.data;
74
75 urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
76 if (nxt_slow_path(urr == NULL)) {
77 goto fail;
78 }
79
80 urr->npeers = jbs->resolve.count;
81
82 peer = nxt_mp_zget(up->mem_pool,
83 urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
84 if (nxt_slow_path(peer == NULL)) {
85 goto fail;
86 }
87
88 urr->peers = peer;
89
90 for (i = 0; i < urr->npeers; i++) {
91 peer[i].weight = 1;
92 peer[i].effective_weight = 1;
93
94 sa = jbs->resolve.sockaddrs[i];
95
96 /* STUB */
97 sa->type = SOCK_STREAM;
98
99 nxt_sockaddr_text(sa);
100
101 nxt_debug(task, "upstream peer: %*s",
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10typedef struct {
11 int32_t weight;
12 int32_t effective_weight;
13 int32_t current_weight;
14 uint32_t down; /* 1 bit */
15 nxt_msec_t last_accessed;
16 nxt_sockaddr_t *sockaddr;
17} nxt_upstream_round_robin_peer_t;
18
19
20typedef struct {
21 nxt_uint_t npeers;
22 nxt_upstream_round_robin_peer_t *peers;
23 nxt_thread_spinlock_t lock;
24} nxt_upstream_round_robin_t;
25
26
27static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
28 void *data);
29static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
32 nxt_upstream_peer_t *up);
33
34
35void
36nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
37{
38 nxt_job_sockaddr_parse_t *jbs;
39
40 if (up->upstream != NULL) {
41 nxt_upstream_round_robin_get_peer(task, up);
42 }
43
44 jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
45 if (nxt_slow_path(jbs == NULL)) {
46 up->ready_handler(task, up);
47 return;
48 }
49
50 jbs->resolve.job.task = task;
51 jbs->resolve.job.data = up;
52 jbs->resolve.port = up->port;
53 jbs->resolve.log_level = NXT_LOG_ERR;
54 jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
55 jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
56 jbs->addr = up->addr;
57
58 nxt_job_sockaddr_parse(jbs);
59}
60
61
62static void
63nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
64{
65 nxt_uint_t i;
66 nxt_sockaddr_t *sa;
67 nxt_upstream_peer_t *up;
68 nxt_job_sockaddr_parse_t *jbs;
69 nxt_upstream_round_robin_t *urr;
70 nxt_upstream_round_robin_peer_t *peer;
71
72 jbs = obj;
73 up = jbs->resolve.job.data;
74
75 urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
76 if (nxt_slow_path(urr == NULL)) {
77 goto fail;
78 }
79
80 urr->npeers = jbs->resolve.count;
81
82 peer = nxt_mp_zget(up->mem_pool,
83 urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
84 if (nxt_slow_path(peer == NULL)) {
85 goto fail;
86 }
87
88 urr->peers = peer;
89
90 for (i = 0; i < urr->npeers; i++) {
91 peer[i].weight = 1;
92 peer[i].effective_weight = 1;
93
94 sa = jbs->resolve.sockaddrs[i];
95
96 /* STUB */
97 sa->type = SOCK_STREAM;
98
99 nxt_sockaddr_text(sa);
100
101 nxt_debug(task, "upstream peer: %*s",
102 sa->length, nxt_sockaddr_start(sa));
102 (size_t) sa->length, nxt_sockaddr_start(sa));
103
104 /* TODO: memcpy to shared memory pool. */
105 peer[i].sockaddr = sa;
106 }
107
108 up->upstream = urr;
109
110 /* STUB */
111 up->sockaddr = peer[0].sockaddr;
112
113 nxt_job_destroy(task, jbs);
114 up->ready_handler(task, up);
115
116 //nxt_upstream_round_robin_get_peer(up);
117 return;
118
119fail:
120
121 nxt_job_destroy(task, jbs);
122
123 up->ready_handler(task, up);
124}
125
126
127static void
128nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
129{
130 nxt_upstream_peer_t *up;
131 nxt_job_sockaddr_parse_t *jbs;
132
133 jbs = obj;
134 up = jbs->resolve.job.data;
135
136 up->ready_handler(task, up);
137}
138
139
140static void
141nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
142{
143 int32_t effective_weights;
144 nxt_uint_t i;
145 nxt_msec_t now;
146 nxt_upstream_round_robin_t *urr;
147 nxt_upstream_round_robin_peer_t *peer, *best;
148
149 urr = up->upstream;
150
151 now = task->thread->engine->timers.now;
152
153 nxt_thread_spin_lock(&urr->lock);
154
155 best = NULL;
156 effective_weights = 0;
157 peer = urr->peers;
158
159 for (i = 0; i < urr->npeers; i++) {
160
161 if (peer[i].down) {
162 continue;
163 }
164
165#if 0
166 if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
167 good = peer[i].last_accessed + peer[i].fail_timeout;
168
169 if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
170 continue;
171 }
172 }
173#endif
174
175 peer[i].current_weight += peer[i].effective_weight;
176 effective_weights += peer[i].effective_weight;
177
178 if (peer[i].effective_weight < peer[i].weight) {
179 peer[i].effective_weight++;
180 }
181
182 if (best == NULL || peer[i].current_weight > best->current_weight) {
183 best = &peer[i];
184 }
185 }
186
187 if (best != NULL) {
188 best->current_weight -= effective_weights;
189 best->last_accessed = now;
190
191 up->sockaddr = best->sockaddr;
192
193 } else {
194 up->sockaddr = NULL;
195 }
196
197 nxt_thread_spin_unlock(&urr->lock);
198
199 up->ready_handler(task, up);
200}
103
104 /* TODO: memcpy to shared memory pool. */
105 peer[i].sockaddr = sa;
106 }
107
108 up->upstream = urr;
109
110 /* STUB */
111 up->sockaddr = peer[0].sockaddr;
112
113 nxt_job_destroy(task, jbs);
114 up->ready_handler(task, up);
115
116 //nxt_upstream_round_robin_get_peer(up);
117 return;
118
119fail:
120
121 nxt_job_destroy(task, jbs);
122
123 up->ready_handler(task, up);
124}
125
126
127static void
128nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
129{
130 nxt_upstream_peer_t *up;
131 nxt_job_sockaddr_parse_t *jbs;
132
133 jbs = obj;
134 up = jbs->resolve.job.data;
135
136 up->ready_handler(task, up);
137}
138
139
140static void
141nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
142{
143 int32_t effective_weights;
144 nxt_uint_t i;
145 nxt_msec_t now;
146 nxt_upstream_round_robin_t *urr;
147 nxt_upstream_round_robin_peer_t *peer, *best;
148
149 urr = up->upstream;
150
151 now = task->thread->engine->timers.now;
152
153 nxt_thread_spin_lock(&urr->lock);
154
155 best = NULL;
156 effective_weights = 0;
157 peer = urr->peers;
158
159 for (i = 0; i < urr->npeers; i++) {
160
161 if (peer[i].down) {
162 continue;
163 }
164
165#if 0
166 if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
167 good = peer[i].last_accessed + peer[i].fail_timeout;
168
169 if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
170 continue;
171 }
172 }
173#endif
174
175 peer[i].current_weight += peer[i].effective_weight;
176 effective_weights += peer[i].effective_weight;
177
178 if (peer[i].effective_weight < peer[i].weight) {
179 peer[i].effective_weight++;
180 }
181
182 if (best == NULL || peer[i].current_weight > best->current_weight) {
183 best = &peer[i];
184 }
185 }
186
187 if (best != NULL) {
188 best->current_weight -= effective_weights;
189 best->last_accessed = now;
190
191 up->sockaddr = best->sockaddr;
192
193 } else {
194 up->sockaddr = NULL;
195 }
196
197 nxt_thread_spin_unlock(&urr->lock);
198
199 up->ready_handler(task, up);
200}