xref: /unit/src/nxt_upstream_round_robin.c (revision 20:4dc92b438f58)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 typedef struct {
11     int32_t                          weight;
12     int32_t                          effective_weight;
13     int32_t                          current_weight;
14     uint32_t                         down;              /* 1 bit */
15     nxt_msec_t                       last_accessed;
16     nxt_sockaddr_t                   *sockaddr;
17 } nxt_upstream_round_robin_peer_t;
18 
19 
20 typedef struct {
21     nxt_uint_t                       npeers;
22     nxt_upstream_round_robin_peer_t  *peers;
23     nxt_thread_spinlock_t            lock;
24 } nxt_upstream_round_robin_t;
25 
26 
27 static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
28     void *data);
29 static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
32     nxt_upstream_peer_t *up);
33 
34 
35 void
36 nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
37 {
38     nxt_job_sockaddr_parse_t  *jbs;
39 
40     if (up->upstream != NULL) {
41         nxt_upstream_round_robin_get_peer(task, up);
42     }
43 
44     jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
45     if (nxt_slow_path(jbs == NULL)) {
46         up->ready_handler(task, up);
47         return;
48     }
49 
50     jbs->resolve.job.task = task;
51     jbs->resolve.job.data = up;
52     jbs->resolve.port = up->port;
53     jbs->resolve.log_level = NXT_LOG_ERR;
54     jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
55     jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
56     jbs->addr = up->addr;
57 
58     nxt_job_sockaddr_parse(jbs);
59 }
60 
61 
62 static void
63 nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
64 {
65     nxt_uint_t                       i;
66     nxt_sockaddr_t                   *sa;
67     nxt_upstream_peer_t              *up;
68     nxt_job_sockaddr_parse_t         *jbs;
69     nxt_upstream_round_robin_t       *urr;
70     nxt_upstream_round_robin_peer_t  *peer;
71 
72     jbs = obj;
73     up = jbs->resolve.job.data;
74 
75     urr = nxt_mem_zalloc(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
76     if (nxt_slow_path(urr == NULL)) {
77         goto fail;
78     }
79 
80     urr->npeers = jbs->resolve.count;
81 
82     peer = nxt_mem_zalloc(up->mem_pool,
83                    urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
84     if (nxt_slow_path(peer == NULL)) {
85         goto fail;
86     }
87 
88     urr->peers = peer;
89 
90     for (i = 0; i < urr->npeers; i++) {
91         peer[i].weight = 1;
92         peer[i].effective_weight = 1;
93 
94         sa = jbs->resolve.sockaddrs[i];
95 
96         /* STUB */
97         sa->type = SOCK_STREAM;
98 
99         nxt_sockaddr_text(sa);
100 
101         nxt_debug(task, "upstream peer: %*s",
102                   sa->length, nxt_sockaddr_start(sa));
103 
104         /* TODO: memcpy to shared memory pool. */
105         peer[i].sockaddr = sa;
106     }
107 
108     up->upstream = urr;
109 
110     /* STUB */
111     up->sockaddr = peer[0].sockaddr;
112 
113     nxt_job_destroy(task, jbs);
114     up->ready_handler(task, up);
115 
116     //nxt_upstream_round_robin_get_peer(up);
117     return;
118 
119 fail:
120 
121     nxt_job_destroy(task, jbs);
122 
123     up->ready_handler(task, up);
124 }
125 
126 
127 static void
128 nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
129 {
130     nxt_upstream_peer_t       *up;
131     nxt_job_sockaddr_parse_t  *jbs;
132 
133     jbs = obj;
134     up = jbs->resolve.job.data;
135 
136     up->ready_handler(task, up);
137 }
138 
139 
140 static void
141 nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
142 {
143     int32_t                          effective_weights;
144     nxt_uint_t                       i;
145     nxt_msec_t                       now;
146     nxt_upstream_round_robin_t       *urr;
147     nxt_upstream_round_robin_peer_t  *peer, *best;
148 
149     urr = up->upstream;
150 
151     now = task->thread->engine->timers.now;
152 
153     nxt_thread_spin_lock(&urr->lock);
154 
155     best = NULL;
156     effective_weights = 0;
157     peer = urr->peers;
158 
159     for (i = 0; i < urr->npeers; i++) {
160 
161         if (peer[i].down) {
162             continue;
163         }
164 
165 #if 0
166         if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
167             good = peer[i].last_accessed + peer[i].fail_timeout;
168 
169             if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
170                 continue;
171             }
172         }
173 #endif
174 
175         peer[i].current_weight += peer[i].effective_weight;
176         effective_weights += peer[i].effective_weight;
177 
178         if (peer[i].effective_weight < peer[i].weight) {
179             peer[i].effective_weight++;
180         }
181 
182         if (best == NULL || peer[i].current_weight > best->current_weight) {
183             best = &peer[i];
184         }
185     }
186 
187     if (best != NULL) {
188         best->current_weight -= effective_weights;
189         best->last_accessed = now;
190 
191         up->sockaddr = best->sockaddr;
192 
193     } else {
194         up->sockaddr = NULL;
195     }
196 
197     nxt_thread_spin_unlock(&urr->lock);
198 
199     up->ready_handler(task, up);
200 }
201