xref: /unit/src/nxt_upstream_round_robin.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 typedef struct {
11     int32_t                          weight;
12     int32_t                          effective_weight;
13     int32_t                          current_weight;
14     uint32_t                         down;              /* 1 bit */
15     nxt_msec_t                       last_accessed;
16     nxt_sockaddr_t                   *sockaddr;
17 } nxt_upstream_round_robin_peer_t;
18 
19 
20 typedef struct {
21     nxt_uint_t                       npeers;
22     nxt_upstream_round_robin_peer_t  *peers;
23     nxt_thread_spinlock_t            lock;
24 } nxt_upstream_round_robin_t;
25 
26 
27 static void nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj,
28     void *data);
29 static void nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj,
30     void *data);
31 static void nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up);
32 
33 
34 void
35 nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up)
36 {
37     nxt_job_sockaddr_parse_t  *jbs;
38 
39     if (up->upstream != NULL) {
40         nxt_upstream_round_robin_get_peer(up);
41     }
42 
43     jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
44     if (nxt_slow_path(jbs == NULL)) {
45         up->ready_handler(up);
46         return;
47     }
48 
49     jbs->resolve.job.data = up;
50     jbs->resolve.port = up->port;
51     jbs->resolve.log_level = NXT_LOG_ERR;
52     jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
53     jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
54     jbs->addr = up->addr;
55 
56     nxt_job_sockaddr_parse(jbs);
57 }
58 
59 
60 static void
61 nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data)
62 {
63     nxt_uint_t                       i;
64     nxt_sockaddr_t                   *sa;
65     nxt_upstream_peer_t              *up;
66     nxt_job_sockaddr_parse_t         *jbs;
67     nxt_upstream_round_robin_t       *urr;
68     nxt_upstream_round_robin_peer_t  *peer;
69 
70     jbs = obj;
71     up = jbs->resolve.job.data;
72 
73     urr = nxt_mem_zalloc(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
74     if (nxt_slow_path(urr == NULL)) {
75         goto fail;
76     }
77 
78     urr->npeers = jbs->resolve.count;
79 
80     peer = nxt_mem_zalloc(up->mem_pool,
81                    urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
82     if (nxt_slow_path(peer == NULL)) {
83         goto fail;
84     }
85 
86     urr->peers = peer;
87 
88     for (i = 0; i < urr->npeers; i++) {
89         peer[i].weight = 1;
90         peer[i].effective_weight = 1;
91 
92         sa = jbs->resolve.sockaddrs[i];
93 
94         /* STUB */
95         sa->type = SOCK_STREAM;
96 
97         /* TODO: test ret */
98         (void) nxt_sockaddr_text(up->mem_pool, sa, 1);
99 
100         nxt_log_debug(thr->log, "upstream peer: %*s", sa->text_len, sa->text);
101 
102         /* TODO: memcpy to shared memory pool. */
103         peer[i].sockaddr = sa;
104     }
105 
106     up->upstream = urr;
107 
108     /* STUB */
109     up->sockaddr = peer[0].sockaddr;
110 
111     nxt_job_destroy(jbs);
112     up->ready_handler(up);
113 
114     //nxt_upstream_round_robin_get_peer(up);
115     return;
116 
117 fail:
118 
119     nxt_job_destroy(jbs);
120 
121     up->ready_handler(up);
122 }
123 
124 
125 static void
126 nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data)
127 {
128     nxt_upstream_peer_t       *up;
129     nxt_job_sockaddr_parse_t  *jbs;
130 
131     jbs = obj;
132     up = jbs->resolve.job.data;
133 
134     up->ready_handler(up);
135 }
136 
137 
138 static void
139 nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up)
140 {
141     int32_t                          effective_weights;
142     nxt_uint_t                       i;
143     nxt_msec_t                       now;
144     nxt_event_engine_t               *engine;
145     nxt_upstream_round_robin_t       *urr;
146     nxt_upstream_round_robin_peer_t  *peer, *best;
147 
148     urr = up->upstream;
149 
150     engine = nxt_thread_event_engine();
151     now = engine->timers.now;
152 
153     nxt_thread_spin_lock(&urr->lock);
154 
155     best = NULL;
156     effective_weights = 0;
157     peer = urr->peers;
158 
159     for (i = 0; i < urr->npeers; i++) {
160 
161         if (peer[i].down) {
162             continue;
163         }
164 
165 #if 0
166         if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
167             good = peer[i].last_accessed + peer[i].fail_timeout;
168 
169             if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
170                 continue;
171             }
172         }
173 #endif
174 
175         peer[i].current_weight += peer[i].effective_weight;
176         effective_weights += peer[i].effective_weight;
177 
178         if (peer[i].effective_weight < peer[i].weight) {
179             peer[i].effective_weight++;
180         }
181 
182         if (best == NULL || peer[i].current_weight > best->current_weight) {
183             best = &peer[i];
184         }
185     }
186 
187     if (best != NULL) {
188         best->current_weight -= effective_weights;
189         best->last_accessed = now;
190 
191         up->sockaddr = best->sockaddr;
192 
193     } else {
194         up->sockaddr = NULL;
195     }
196 
197     nxt_thread_spin_unlock(&urr->lock);
198 
199     up->ready_handler(up);
200 }
201