xref: /unit/src/nxt_http_proxy.c (revision 1270:9efa309be18b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 
10 
11 typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
12     nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
13 
14 
15 struct nxt_http_upstream_s {
16     uint32_t                     current;
17     uint32_t                     n;
18     uint8_t                      protocol;
19     nxt_http_upstream_connect_t  connect;
20     nxt_sockaddr_t               *sockaddr[1];
21 };
22 
23 
24 static void nxt_http_upstream_connect(nxt_task_t *task,
25     nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
26 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
27     nxt_http_request_t *r, nxt_http_action_t *action);
28 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
29 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
30 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
31 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
32 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
33 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
36 
37 
38 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state;
39 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state;
40 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state;
41 static const nxt_http_request_state_t  nxt_http_proxy_read_state;
42 
43 
44 nxt_int_t
45 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
46 {
47     nxt_str_t            name;
48     nxt_sockaddr_t       *sa;
49     nxt_http_upstream_t  *upstream;
50 
51     sa = NULL;
52     name = action->name;
53 
54     if (nxt_str_start(&name, "http://", 7)) {
55         name.length -= 7;
56         name.start += 7;
57 
58         sa = nxt_sockaddr_parse(mp, &name);
59         if (nxt_slow_path(sa == NULL)) {
60             return NXT_ERROR;
61         }
62 
63         sa->type = SOCK_STREAM;
64     }
65 
66     if (sa != NULL) {
67         upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
68         if (nxt_slow_path(upstream == NULL)) {
69             return NXT_ERROR;
70         }
71 
72         upstream->current = 0;
73         upstream->n = 1;
74         upstream->protocol = NXT_HTTP_PROTO_H1;
75         upstream->connect = nxt_http_upstream_connect;
76         upstream->sockaddr[0] = sa;
77 
78         action->u.upstream = upstream;
79         action->handler = nxt_http_proxy_handler;
80     }
81 
82     return NXT_OK;
83 }
84 
85 
86 static nxt_http_action_t *
87 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
88     nxt_http_action_t *action)
89 {
90     nxt_http_peer_t  *peer;
91 
92     peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
93     if (nxt_slow_path(peer == NULL)) {
94         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
95         return NULL;
96     }
97 
98     peer->request = r;
99     r->peer = peer;
100 
101     nxt_mp_retain(r->mem_pool);
102 
103     action->u.upstream->connect(task, action->u.upstream, peer);
104 
105     return NULL;
106 }
107 
108 
109 static void
110 nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
111     nxt_http_peer_t *peer)
112 {
113     peer->protocol = upstream->protocol;
114     peer->sockaddr = upstream->sockaddr[0];
115 
116     peer->request->state = &nxt_http_proxy_header_send_state;
117 
118     nxt_http_proto[peer->protocol].peer_connect(task, peer);
119 }
120 
121 
122 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state
123     nxt_aligned(64) =
124 {
125     .ready_handler = nxt_http_proxy_header_send,
126     .error_handler = nxt_http_proxy_error,
127 };
128 
129 
130 static void
131 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
132 {
133     nxt_http_peer_t     *peer;
134     nxt_http_request_t  *r;
135 
136     r = obj;
137     peer = data;
138     r->state = &nxt_http_proxy_header_sent_state;
139 
140     nxt_http_proto[peer->protocol].peer_header_send(task, peer);
141 }
142 
143 
144 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state
145     nxt_aligned(64) =
146 {
147     .ready_handler = nxt_http_proxy_header_sent,
148     .error_handler = nxt_http_proxy_error,
149 };
150 
151 
152 static void
153 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
154 {
155     nxt_http_peer_t     *peer;
156     nxt_http_request_t  *r;
157 
158     r = obj;
159     peer = data;
160     r->state = &nxt_http_proxy_header_read_state;
161 
162     nxt_http_proto[peer->protocol].peer_header_read(task, peer);
163 }
164 
165 
166 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state
167     nxt_aligned(64) =
168 {
169     .ready_handler = nxt_http_proxy_header_read,
170     .error_handler = nxt_http_proxy_error,
171 };
172 
173 
174 static void
175 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
176 {
177     nxt_http_peer_t     *peer;
178     nxt_http_field_t    *f, *field;
179     nxt_http_request_t  *r;
180 
181     r = obj;
182     peer = data;
183 
184     r->status = peer->status;
185 
186     nxt_debug(task, "http proxy status: %d", peer->status);
187 
188     nxt_list_each(field, peer->fields) {
189 
190         nxt_debug(task, "http proxy header: \"%*s: %*s\"",
191                   (size_t) field->name_length, field->name,
192                   (size_t) field->value_length, field->value);
193 
194         if (!field->skip) {
195             f = nxt_list_add(r->resp.fields);
196             if (nxt_slow_path(f == NULL)) {
197                 nxt_http_proxy_error(task, r, peer);
198                 return;
199             }
200 
201             *f = *field;
202         }
203 
204     } nxt_list_loop;
205 
206     nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
207 }
208 
209 
210 static void
211 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
212 {
213     nxt_buf_t           *out;
214     nxt_http_peer_t     *peer;
215     nxt_http_request_t  *r;
216 
217     r = obj;
218     peer = data;
219     out = peer->body;
220 
221     if (out != NULL) {
222         peer->body = NULL;
223         nxt_http_request_send(task, r, out);
224     }
225 
226     r->state = &nxt_http_proxy_read_state;
227 
228     nxt_http_proto[peer->protocol].peer_read(task, peer);
229 }
230 
231 
232 static const nxt_http_request_state_t  nxt_http_proxy_read_state
233     nxt_aligned(64) =
234 {
235     .ready_handler = nxt_http_proxy_read,
236     .error_handler = nxt_http_proxy_error,
237 };
238 
239 
240 static void
241 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
242 {
243     nxt_buf_t           *out;
244     nxt_bool_t          last;
245     nxt_http_peer_t     *peer;
246     nxt_http_request_t  *r;
247 
248     r = obj;
249     peer = data;
250     out = peer->body;
251     peer->body = NULL;
252     last = nxt_buf_is_last(out);
253 
254     nxt_http_request_send(task, r, out);
255 
256     if (!last) {
257         nxt_http_proto[peer->protocol].peer_read(task, peer);
258 
259     } else {
260         nxt_http_proto[peer->protocol].peer_close(task, peer);
261 
262         nxt_mp_release(r->mem_pool);
263     }
264 }
265 
266 
267 nxt_buf_t *
268 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
269     size_t size)
270 {
271     nxt_buf_t  *b;
272 
273     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
274     if (nxt_fast_path(b != NULL)) {
275         b->completion_handler = nxt_http_proxy_buf_mem_completion;
276         b->parent = r;
277         nxt_mp_retain(r->mem_pool);
278 
279     } else {
280         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
281     }
282 
283     return b;
284 }
285 
286 
287 static void
288 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
289 {
290     nxt_buf_t           *b, *next;
291     nxt_http_peer_t     *peer;
292     nxt_http_request_t  *r;
293 
294     b = obj;
295     r = data;
296 
297     peer = r->peer;
298 
299     do {
300         next = b->next;
301 
302         nxt_http_proxy_buf_mem_free(task, r, b);
303 
304         b = next;
305     } while (b != NULL);
306 
307     if (!peer->closed) {
308         nxt_http_proto[peer->protocol].peer_read(task, peer);
309     }
310 }
311 
312 
313 void
314 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
315     nxt_buf_t *b)
316 {
317     nxt_event_engine_buf_mem_free(task->thread->engine, b);
318 
319     nxt_mp_release(r->mem_pool);
320 }
321 
322 
323 static void
324 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
325 {
326     nxt_http_peer_t     *peer;
327     nxt_http_request_t  *r;
328 
329     r = obj;
330     peer = r->peer;
331 
332     nxt_http_proto[peer->protocol].peer_close(task, peer);
333 
334     nxt_mp_release(r->mem_pool);
335 
336     nxt_http_request_error(task, r, peer->status);
337 }
338 
339 
340 nxt_int_t
341 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
342 {
343     nxt_http_request_t  *r;
344 
345     r = ctx;
346 
347     r->resp.date = field;
348 
349     return NXT_OK;
350 }
351 
352 
353 nxt_int_t
354 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
355     uintptr_t data)
356 {
357     nxt_off_t           n;
358     nxt_http_request_t  *r;
359 
360     r = ctx;
361 
362     r->resp.content_length = field;
363 
364     n = nxt_off_t_parse(field->value, field->value_length);
365 
366     if (nxt_fast_path(n >= 0)) {
367         r->resp.content_length_n = n;
368     }
369 
370     return NXT_OK;
371 }
372 
373 
374 nxt_int_t
375 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
376 {
377     field->skip = 1;
378 
379     return NXT_OK;
380 }
381