xref: /unit/src/nxt_http_proxy.c (revision 1271:d9c8ee25590a)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 
10 
11 typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
12     nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
13 
14 
15 struct nxt_http_upstream_s {
16     uint32_t                     current;
17     uint32_t                     n;
18     uint8_t                      protocol;
19     nxt_http_upstream_connect_t  connect;
20     nxt_sockaddr_t               *sockaddr[1];
21 };
22 
23 
24 static void nxt_http_upstream_connect(nxt_task_t *task,
25     nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
26 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
27     nxt_http_request_t *r, nxt_http_action_t *action);
28 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
29 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
30 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
31 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
32 static void nxt_http_proxy_request_send(nxt_task_t *task,
33     nxt_http_request_t *r, nxt_buf_t *out);
34 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
35 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
36     void *data);
37 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
38 
39 
40 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state;
41 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state;
42 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state;
43 static const nxt_http_request_state_t  nxt_http_proxy_read_state;
44 
45 
46 nxt_int_t
47 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
48 {
49     nxt_str_t            name;
50     nxt_sockaddr_t       *sa;
51     nxt_http_upstream_t  *upstream;
52 
53     sa = NULL;
54     name = action->name;
55 
56     if (nxt_str_start(&name, "http://", 7)) {
57         name.length -= 7;
58         name.start += 7;
59 
60         sa = nxt_sockaddr_parse(mp, &name);
61         if (nxt_slow_path(sa == NULL)) {
62             return NXT_ERROR;
63         }
64 
65         sa->type = SOCK_STREAM;
66     }
67 
68     if (sa != NULL) {
69         upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
70         if (nxt_slow_path(upstream == NULL)) {
71             return NXT_ERROR;
72         }
73 
74         upstream->current = 0;
75         upstream->n = 1;
76         upstream->protocol = NXT_HTTP_PROTO_H1;
77         upstream->connect = nxt_http_upstream_connect;
78         upstream->sockaddr[0] = sa;
79 
80         action->u.upstream = upstream;
81         action->handler = nxt_http_proxy_handler;
82     }
83 
84     return NXT_OK;
85 }
86 
87 
88 static nxt_http_action_t *
89 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
90     nxt_http_action_t *action)
91 {
92     nxt_http_peer_t  *peer;
93 
94     peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
95     if (nxt_slow_path(peer == NULL)) {
96         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
97         return NULL;
98     }
99 
100     peer->request = r;
101     r->peer = peer;
102 
103     nxt_mp_retain(r->mem_pool);
104 
105     action->u.upstream->connect(task, action->u.upstream, peer);
106 
107     return NULL;
108 }
109 
110 
111 static void
112 nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
113     nxt_http_peer_t *peer)
114 {
115     peer->protocol = upstream->protocol;
116     peer->sockaddr = upstream->sockaddr[0];
117 
118     peer->request->state = &nxt_http_proxy_header_send_state;
119 
120     nxt_http_proto[peer->protocol].peer_connect(task, peer);
121 }
122 
123 
124 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state
125     nxt_aligned(64) =
126 {
127     .ready_handler = nxt_http_proxy_header_send,
128     .error_handler = nxt_http_proxy_error,
129 };
130 
131 
132 static void
133 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
134 {
135     nxt_http_peer_t     *peer;
136     nxt_http_request_t  *r;
137 
138     r = obj;
139     peer = data;
140     r->state = &nxt_http_proxy_header_sent_state;
141 
142     nxt_http_proto[peer->protocol].peer_header_send(task, peer);
143 }
144 
145 
146 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state
147     nxt_aligned(64) =
148 {
149     .ready_handler = nxt_http_proxy_header_sent,
150     .error_handler = nxt_http_proxy_error,
151 };
152 
153 
154 static void
155 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
156 {
157     nxt_http_peer_t     *peer;
158     nxt_http_request_t  *r;
159 
160     r = obj;
161     peer = data;
162     r->state = &nxt_http_proxy_header_read_state;
163 
164     nxt_http_proto[peer->protocol].peer_header_read(task, peer);
165 }
166 
167 
168 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state
169     nxt_aligned(64) =
170 {
171     .ready_handler = nxt_http_proxy_header_read,
172     .error_handler = nxt_http_proxy_error,
173 };
174 
175 
176 static void
177 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
178 {
179     nxt_http_peer_t     *peer;
180     nxt_http_field_t    *f, *field;
181     nxt_http_request_t  *r;
182 
183     r = obj;
184     peer = data;
185 
186     r->status = peer->status;
187 
188     nxt_debug(task, "http proxy status: %d", peer->status);
189 
190     if (r->resp.content_length_n > 0) {
191         peer->remainder = r->resp.content_length_n;
192     }
193 
194     nxt_list_each(field, peer->fields) {
195 
196         nxt_debug(task, "http proxy header: \"%*s: %*s\"",
197                   (size_t) field->name_length, field->name,
198                   (size_t) field->value_length, field->value);
199 
200         if (!field->skip) {
201             f = nxt_list_add(r->resp.fields);
202             if (nxt_slow_path(f == NULL)) {
203                 nxt_http_proxy_error(task, r, peer);
204                 return;
205             }
206 
207             *f = *field;
208         }
209 
210     } nxt_list_loop;
211 
212     nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
213 }
214 
215 
216 static void
217 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
218 {
219     nxt_buf_t           *out;
220     nxt_http_peer_t     *peer;
221     nxt_http_request_t  *r;
222 
223     r = obj;
224     peer = data;
225     out = peer->body;
226 
227     if (out != NULL) {
228         peer->body = NULL;
229         nxt_http_proxy_request_send(task, r, out);
230     }
231 
232     r->state = &nxt_http_proxy_read_state;
233 
234     nxt_http_proto[peer->protocol].peer_read(task, peer);
235 }
236 
237 
238 static void
239 nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r,
240     nxt_buf_t *out)
241 {
242     size_t  length;
243 
244     if (r->peer->remainder > 0) {
245         length = nxt_buf_chain_length(out);
246         r->peer->remainder -= length;
247     }
248 
249     nxt_http_request_send(task, r, out);
250 }
251 
252 
253 static const nxt_http_request_state_t  nxt_http_proxy_read_state
254     nxt_aligned(64) =
255 {
256     .ready_handler = nxt_http_proxy_read,
257     .error_handler = nxt_http_proxy_error,
258 };
259 
260 
261 static void
262 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
263 {
264     nxt_buf_t           *out;
265     nxt_bool_t          last;
266     nxt_http_peer_t     *peer;
267     nxt_http_request_t  *r;
268 
269     r = obj;
270     peer = data;
271     out = peer->body;
272     peer->body = NULL;
273     last = nxt_buf_is_last(out);
274 
275     nxt_http_proxy_request_send(task, r, out);
276 
277     if (!last) {
278         nxt_http_proto[peer->protocol].peer_read(task, peer);
279 
280     } else {
281         r->inconsistent = (peer->remainder != 0);
282 
283         nxt_http_proto[peer->protocol].peer_close(task, peer);
284 
285         nxt_mp_release(r->mem_pool);
286     }
287 }
288 
289 
290 nxt_buf_t *
291 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
292     size_t size)
293 {
294     nxt_buf_t  *b;
295 
296     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
297     if (nxt_fast_path(b != NULL)) {
298         b->completion_handler = nxt_http_proxy_buf_mem_completion;
299         b->parent = r;
300         nxt_mp_retain(r->mem_pool);
301 
302     } else {
303         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
304     }
305 
306     return b;
307 }
308 
309 
310 static void
311 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
312 {
313     nxt_buf_t           *b, *next;
314     nxt_http_peer_t     *peer;
315     nxt_http_request_t  *r;
316 
317     b = obj;
318     r = data;
319 
320     peer = r->peer;
321 
322     do {
323         next = b->next;
324 
325         nxt_http_proxy_buf_mem_free(task, r, b);
326 
327         b = next;
328     } while (b != NULL);
329 
330     if (!peer->closed) {
331         nxt_http_proto[peer->protocol].peer_read(task, peer);
332     }
333 }
334 
335 
336 void
337 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
338     nxt_buf_t *b)
339 {
340     nxt_event_engine_buf_mem_free(task->thread->engine, b);
341 
342     nxt_mp_release(r->mem_pool);
343 }
344 
345 
346 static void
347 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
348 {
349     nxt_http_peer_t     *peer;
350     nxt_http_request_t  *r;
351 
352     r = obj;
353     peer = r->peer;
354 
355     nxt_http_proto[peer->protocol].peer_close(task, peer);
356 
357     nxt_mp_release(r->mem_pool);
358 
359     nxt_http_request_error(task, r, peer->status);
360 }
361 
362 
363 nxt_int_t
364 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
365 {
366     nxt_http_request_t  *r;
367 
368     r = ctx;
369 
370     r->resp.date = field;
371 
372     return NXT_OK;
373 }
374 
375 
376 nxt_int_t
377 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
378     uintptr_t data)
379 {
380     nxt_off_t           n;
381     nxt_http_request_t  *r;
382 
383     r = ctx;
384 
385     r->resp.content_length = field;
386 
387     n = nxt_off_t_parse(field->value, field->value_length);
388 
389     if (nxt_fast_path(n >= 0)) {
390         r->resp.content_length_n = n;
391     }
392 
393     return NXT_OK;
394 }
395 
396 
397 nxt_int_t
398 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
399 {
400     field->skip = 1;
401 
402     return NXT_OK;
403 }
404