xref: /unit/src/nxt_upstream_round_robin.c (revision 1440:d1ad3857769c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <math.h>
8 #include <nxt_router.h>
9 #include <nxt_http.h>
10 #include <nxt_upstream.h>
11 
12 
13 struct nxt_upstream_round_robin_server_s {
14     nxt_sockaddr_t                     *sockaddr;
15 
16     int32_t                            current_weight;
17     int32_t                            effective_weight;
18     int32_t                            weight;
19 
20     uint8_t                            protocol;
21 };
22 
23 
24 struct nxt_upstream_round_robin_s {
25     uint32_t                           items;
26     nxt_upstream_round_robin_server_t  server[0];
27 };
28 
29 
30 static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
31     nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
32 static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
33     nxt_upstream_server_t *us);
34 
35 
36 static const nxt_upstream_server_proto_t  nxt_upstream_round_robin_proto = {
37     .joint_create = nxt_upstream_round_robin_joint_create,
38     .get          = nxt_upstream_round_robin_server_get,
39 };
40 
41 
42 nxt_int_t
nxt_upstream_round_robin_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * upstream_conf,nxt_upstream_t * upstream)43 nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
44     nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
45 {
46     double                      total, k, w;
47     size_t                      size;
48     uint32_t                    i, n, next, wt;
49     nxt_mp_t                    *mp;
50     nxt_str_t                   name;
51     nxt_sockaddr_t              *sa;
52     nxt_conf_value_t            *servers_conf, *srvcf, *wtcf;
53     nxt_upstream_round_robin_t  *urr;
54 
55     static nxt_str_t  servers = nxt_string("servers");
56     static nxt_str_t  weight = nxt_string("weight");
57 
58     mp = tmcf->router_conf->mem_pool;
59 
60     servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
61     n = nxt_conf_object_members_count(servers_conf);
62 
63     total = 0.0;
64     next = 0;
65 
66     for (i = 0; i < n; i++) {
67         srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
68         wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
69         w = (wtcf != NULL) ? nxt_conf_get_number(wtcf) : 1;
70         total += w;
71     }
72 
73     /*
74      * This prevents overflow of int32_t
75      * in nxt_upstream_round_robin_server_get().
76      */
77     k = (total == 0) ? 0 : (NXT_INT32_T_MAX / 2) / total;
78 
79     if (isinf(k)) {
80         k = 1;
81     }
82 
83     size = sizeof(nxt_upstream_round_robin_t)
84            + n * sizeof(nxt_upstream_round_robin_server_t);
85 
86     urr = nxt_mp_zalloc(mp, size);
87     if (nxt_slow_path(urr == NULL)) {
88         return NXT_ERROR;
89     }
90 
91     urr->items = n;
92     next = 0;
93 
94     for (i = 0; i < n; i++) {
95         srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
96 
97         sa = nxt_sockaddr_parse(mp, &name);
98         if (nxt_slow_path(sa == NULL)) {
99             return NXT_ERROR;
100         }
101 
102         sa->type = SOCK_STREAM;
103 
104         urr->server[i].sockaddr = sa;
105         urr->server[i].protocol = NXT_HTTP_PROTO_H1;
106 
107         wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
108         w = (wtcf != NULL) ? k * nxt_conf_get_number(wtcf) : k;
109         wt = (w > 1 || w == 0) ? round(w) : 1;
110 
111         urr->server[i].weight = wt;
112         urr->server[i].effective_weight = wt;
113     }
114 
115     upstream->proto = &nxt_upstream_round_robin_proto;
116     upstream->type.round_robin = urr;
117 
118     return NXT_OK;
119 }
120 
121 
122 static nxt_upstream_t *
nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t * tmcf,nxt_upstream_t * upstream)123 nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
124     nxt_upstream_t *upstream)
125 {
126     size_t                      size;
127     uint32_t                    i, n;
128     nxt_mp_t                    *mp;
129     nxt_upstream_t              *u;
130     nxt_upstream_round_robin_t  *urr, *urrcf;
131 
132     mp = tmcf->router_conf->mem_pool;
133 
134     u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
135     if (nxt_slow_path(u == NULL)) {
136         return NULL;
137     }
138 
139     *u = *upstream;
140 
141     urrcf = upstream->type.round_robin;
142 
143     size = sizeof(nxt_upstream_round_robin_t)
144            + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
145 
146     urr = nxt_mp_alloc(mp, size);
147     if (nxt_slow_path(urr == NULL)) {
148         return NULL;
149     }
150 
151     u->type.round_robin = urr;
152 
153     n = urrcf->items;
154     urr->items = n;
155 
156     for (i = 0; i < n; i++) {
157         urr->server[i] = urrcf->server[i];
158     }
159 
160     return u;
161 }
162 
163 
164 static void
nxt_upstream_round_robin_server_get(nxt_task_t * task,nxt_upstream_server_t * us)165 nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
166 {
167     int32_t                            total;
168     uint32_t                           i, n;
169     nxt_upstream_round_robin_t         *round_robin;
170     nxt_upstream_round_robin_server_t  *s, *best;
171 
172     best = NULL;
173     total = 0;
174 
175     round_robin = us->upstream->type.round_robin;
176 
177     s = round_robin->server;
178     n = round_robin->items;
179 
180     for (i = 0; i < n; i++) {
181 
182         s[i].current_weight += s[i].effective_weight;
183         total += s[i].effective_weight;
184 
185         if (s[i].effective_weight < s[i].weight) {
186             s[i].effective_weight++;
187         }
188 
189         if (best == NULL || s[i].current_weight > best->current_weight) {
190             best = &s[i];
191         }
192     }
193 
194     if (best == NULL || total == 0) {
195         us->state->error(task, us);
196         return;
197     }
198 
199     best->current_weight -= total;
200     us->sockaddr = best->sockaddr;
201     us->protocol = best->protocol;
202     us->server.round_robin = best;
203 
204     us->state->ready(task, us);
205 }
206