nxt_upstream_round_robin.c (1393:c2a2867bb5e5) nxt_upstream_round_robin.c (1394:20b41ebfff79)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_router.h>
8#include <nxt_http.h>
9#include <nxt_upstream.h>
10
11
12struct nxt_upstream_round_robin_server_s {
13 nxt_sockaddr_t *sockaddr;
14
15 int32_t current_weight;
16 int32_t effective_weight;
17 int32_t weight;
18
19 uint8_t protocol;
20};
21
22
23struct nxt_upstream_round_robin_s {
24 uint32_t items;
25 nxt_upstream_round_robin_server_t server[0];
26};
27
28
29static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
30 nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
31static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
32 nxt_upstream_server_t *us);
33
34
35static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
36 .joint_create = nxt_upstream_round_robin_joint_create,
37 .get = nxt_upstream_round_robin_server_get,
38};
39
40
41static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = {
42 {
43 nxt_string("weight"),
44 NXT_CONF_MAP_INT32,
45 offsetof(nxt_upstream_round_robin_server_t, weight),
46 },
47};
48
49
50nxt_int_t
51nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
52 nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
53{
54 size_t size;
55 uint32_t i, n, next;
56 nxt_mp_t *mp;
57 nxt_str_t name;
58 nxt_sockaddr_t *sa;
59 nxt_conf_value_t *servers_conf, *srvcf;
60 nxt_upstream_round_robin_t *urr;
61
62 static nxt_str_t servers = nxt_string("servers");
63
64 mp = tmcf->router_conf->mem_pool;
65
66 servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
67 n = nxt_conf_object_members_count(servers_conf);
68
69 size = sizeof(nxt_upstream_round_robin_t)
70 + n * sizeof(nxt_upstream_round_robin_server_t);
71
72 urr = nxt_mp_zalloc(mp, size);
73 if (nxt_slow_path(urr == NULL)) {
74 return NXT_ERROR;
75 }
76
77 urr->items = n;
78 next = 0;
79
80 for (i = 0; i < n; i++) {
81 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
82
83 sa = nxt_sockaddr_parse(mp, &name);
84 if (nxt_slow_path(sa == NULL)) {
85 return NXT_ERROR;
86 }
87
88 sa->type = SOCK_STREAM;
89
90 urr->server[i].sockaddr = sa;
91 urr->server[i].weight = 1;
92 urr->server[i].protocol = NXT_HTTP_PROTO_H1;
93
94 nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf,
95 nxt_nitems(nxt_upstream_round_robin_server_conf),
96 &urr->server[i]);
97
98 urr->server[i].effective_weight = urr->server[i].weight;
99 }
100
101 upstream->proto = &nxt_upstream_round_robin_proto;
102 upstream->type.round_robin = urr;
103
104 return NXT_OK;
105}
106
107
108static nxt_upstream_t *
109nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
110 nxt_upstream_t *upstream)
111{
112 size_t size;
113 uint32_t i, n;
114 nxt_mp_t *mp;
115 nxt_upstream_t *u;
116 nxt_upstream_round_robin_t *urr, *urrcf;
117
118 mp = tmcf->router_conf->mem_pool;
119
120 u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
121 if (nxt_slow_path(u == NULL)) {
122 return NULL;
123 }
124
125 *u = *upstream;
126
127 urrcf = upstream->type.round_robin;
128
129 size = sizeof(nxt_upstream_round_robin_t)
130 + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
131
132 urr = nxt_mp_alloc(mp, size);
133 if (nxt_slow_path(urr == NULL)) {
134 return NULL;
135 }
136
137 u->type.round_robin = urr;
138
139 n = urrcf->items;
140 urr->items = n;
141
142 for (i = 0; i < n; i++) {
143 urr->server[i] = urrcf->server[i];
144 }
145
146 return u;
147}
148
149
150static void
151nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
152{
153 int32_t total;
154 uint32_t i, n;
155 nxt_upstream_round_robin_t *round_robin;
156 nxt_upstream_round_robin_server_t *s, *best;
157
158 best = NULL;
159 total = 0;
160
161 round_robin = us->upstream->type.round_robin;
162
163 s = round_robin->server;
164 n = round_robin->items;
165
166 for (i = 0; i < n; i++) {
167
168 s[i].current_weight += s[i].effective_weight;
169 total += s[i].effective_weight;
170
171 if (s[i].effective_weight < s[i].weight) {
172 s[i].effective_weight++;
173 }
174
175 if (best == NULL || s[i].current_weight > best->current_weight) {
176 best = &s[i];
177 }
178 }
179
180 if (best == NULL) {
181 us->state->error(task, us);
182 return;
183 }
184
185 best->current_weight -= total;
186 us->sockaddr = best->sockaddr;
187 us->protocol = best->protocol;
188 us->server.round_robin = best;
189
190 us->state->ready(task, us);
191}