xref: /unit/src/nxt_http_proxy.c (revision 1589:a80bba0c55f2)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 
11 
12 struct nxt_upstream_proxy_s {
13     nxt_sockaddr_t  *sockaddr;
14     uint8_t         protocol;
15 };
16 
17 
18 static void nxt_http_proxy_server_get(nxt_task_t *task,
19     nxt_upstream_server_t *us);
20 static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
21     nxt_upstream_server_t *us);
22 static void nxt_http_proxy_upstream_error(nxt_task_t *task,
23     nxt_upstream_server_t *us);
24 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
25     nxt_http_request_t *r, nxt_http_action_t *action);
26 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
27 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
28 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
29 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
30 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
31     void *data);
32 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
33 
34 
35 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state;
36 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state;
37 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state;
38 static const nxt_http_request_state_t  nxt_http_proxy_read_state;
39 
40 
41 static const nxt_upstream_server_proto_t  nxt_upstream_simple_proto = {
42     .get = nxt_http_proxy_server_get,
43 };
44 
45 
46 static const nxt_upstream_peer_state_t  nxt_upstream_proxy_state = {
47     .ready = nxt_http_proxy_upstream_ready,
48     .error = nxt_http_proxy_upstream_error,
49 };
50 
51 
52 nxt_int_t
53 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
54 {
55     nxt_str_t             name;
56     nxt_sockaddr_t        *sa;
57     nxt_upstream_t        *up;
58     nxt_upstream_proxy_t  *proxy;
59 
60     sa = NULL;
61     name = action->name;
62 
63     if (nxt_str_start(&name, "http://", 7)) {
64         name.length -= 7;
65         name.start += 7;
66 
67         sa = nxt_sockaddr_parse(mp, &name);
68         if (nxt_slow_path(sa == NULL)) {
69             return NXT_ERROR;
70         }
71 
72         sa->type = SOCK_STREAM;
73     }
74 
75     if (sa != NULL) {
76         up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
77         if (nxt_slow_path(up == NULL)) {
78             return NXT_ERROR;
79         }
80 
81         up->name.length = sa->length;
82         up->name.start = nxt_sockaddr_start(sa);
83         up->proto = &nxt_upstream_simple_proto;
84 
85         proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
86         if (nxt_slow_path(proxy == NULL)) {
87             return NXT_ERROR;
88         }
89 
90         proxy->sockaddr = sa;
91         proxy->protocol = NXT_HTTP_PROTO_H1;
92         up->type.proxy = proxy;
93 
94         action->u.upstream = up;
95         action->handler = nxt_http_proxy_handler;
96     }
97 
98     return NXT_OK;
99 }
100 
101 
102 static nxt_http_action_t *
103 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
104     nxt_http_action_t *action)
105 {
106     return nxt_upstream_proxy_handler(task, r, action->u.upstream);
107 }
108 
109 
110 nxt_http_action_t *
111 nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
112     nxt_upstream_t *upstream)
113 {
114     nxt_http_peer_t        *peer;
115     nxt_upstream_server_t  *us;
116 
117     us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
118     if (nxt_slow_path(us == NULL)) {
119         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
120         return NULL;
121     }
122 
123     peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
124     if (nxt_slow_path(peer == NULL)) {
125         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
126         return NULL;
127     }
128 
129     peer->request = r;
130     r->peer = peer;
131 
132     nxt_mp_retain(r->mem_pool);
133 
134     us->state = &nxt_upstream_proxy_state;
135     us->peer.http = peer;
136     peer->server = us;
137 
138     us->upstream = upstream;
139     upstream->proto->get(task, us);
140 
141     return NULL;
142 }
143 
144 
145 static void
146 nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
147 {
148     nxt_upstream_proxy_t  *proxy;
149 
150     proxy = us->upstream->type.proxy;
151 
152     us->sockaddr = proxy->sockaddr;
153     us->protocol = proxy->protocol;
154 
155     us->state->ready(task, us);
156 }
157 
158 
159 static void
160 nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
161 {
162     nxt_http_peer_t  *peer;
163 
164     peer = us->peer.http;
165 
166     peer->protocol = us->protocol;
167 
168     peer->request->state = &nxt_http_proxy_header_send_state;
169 
170     nxt_http_proto[peer->protocol].peer_connect(task, peer);
171 }
172 
173 
174 static void
175 nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
176 {
177     nxt_http_request_t  *r;
178 
179     r = us->peer.http->request;
180 
181     nxt_mp_release(r->mem_pool);
182 
183     nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
184 }
185 
186 
187 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state
188     nxt_aligned(64) =
189 {
190     .ready_handler = nxt_http_proxy_header_send,
191     .error_handler = nxt_http_proxy_error,
192 };
193 
194 
195 static void
196 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
197 {
198     nxt_http_peer_t     *peer;
199     nxt_http_request_t  *r;
200 
201     r = obj;
202     peer = data;
203     r->state = &nxt_http_proxy_header_sent_state;
204 
205     nxt_http_proto[peer->protocol].peer_header_send(task, peer);
206 }
207 
208 
209 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state
210     nxt_aligned(64) =
211 {
212     .ready_handler = nxt_http_proxy_header_sent,
213     .error_handler = nxt_http_proxy_error,
214 };
215 
216 
217 static void
218 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
219 {
220     nxt_http_peer_t     *peer;
221     nxt_http_request_t  *r;
222 
223     r = obj;
224     peer = data;
225     r->state = &nxt_http_proxy_header_read_state;
226 
227     nxt_http_proto[peer->protocol].peer_header_read(task, peer);
228 }
229 
230 
231 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state
232     nxt_aligned(64) =
233 {
234     .ready_handler = nxt_http_proxy_header_read,
235     .error_handler = nxt_http_proxy_error,
236 };
237 
238 
239 static void
240 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
241 {
242     nxt_http_peer_t     *peer;
243     nxt_http_field_t    *f, *field;
244     nxt_http_request_t  *r;
245 
246     r = obj;
247     peer = data;
248 
249     r->status = peer->status;
250 
251     nxt_debug(task, "http proxy status: %d", peer->status);
252 
253     nxt_list_each(field, peer->fields) {
254 
255         nxt_debug(task, "http proxy header: \"%*s: %*s\"",
256                   (size_t) field->name_length, field->name,
257                   (size_t) field->value_length, field->value);
258 
259         if (!field->skip) {
260             f = nxt_list_add(r->resp.fields);
261             if (nxt_slow_path(f == NULL)) {
262                 nxt_http_proxy_error(task, r, peer);
263                 return;
264             }
265 
266             *f = *field;
267         }
268 
269     } nxt_list_loop;
270 
271     r->state = &nxt_http_proxy_read_state;
272 
273     nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
274 }
275 
276 
277 static const nxt_http_request_state_t  nxt_http_proxy_read_state
278     nxt_aligned(64) =
279 {
280     .ready_handler = nxt_http_proxy_send_body,
281     .error_handler = nxt_http_proxy_error,
282 };
283 
284 
285 static void
286 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
287 {
288     nxt_buf_t           *out;
289     nxt_http_peer_t     *peer;
290     nxt_http_request_t  *r;
291 
292     r = obj;
293     peer = data;
294     out = peer->body;
295 
296     if (out != NULL) {
297         peer->body = NULL;
298         nxt_http_request_send(task, r, out);
299     }
300 
301     if (!peer->closed) {
302         nxt_http_proto[peer->protocol].peer_read(task, peer);
303 
304     } else {
305         nxt_http_proto[peer->protocol].peer_close(task, peer);
306 
307         nxt_mp_release(r->mem_pool);
308     }
309 }
310 
311 
312 nxt_buf_t *
313 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
314     size_t size)
315 {
316     nxt_buf_t  *b;
317 
318     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
319     if (nxt_fast_path(b != NULL)) {
320         b->completion_handler = nxt_http_proxy_buf_mem_completion;
321         b->parent = r;
322         nxt_mp_retain(r->mem_pool);
323 
324     } else {
325         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
326     }
327 
328     return b;
329 }
330 
331 
332 static void
333 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
334 {
335     nxt_buf_t           *b, *next;
336     nxt_http_peer_t     *peer;
337     nxt_http_request_t  *r;
338 
339     b = obj;
340     r = data;
341 
342     peer = r->peer;
343 
344     do {
345         next = b->next;
346 
347         nxt_http_proxy_buf_mem_free(task, r, b);
348 
349         b = next;
350     } while (b != NULL);
351 
352     if (!peer->closed) {
353         nxt_http_proto[peer->protocol].peer_read(task, peer);
354     }
355 }
356 
357 
358 void
359 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
360     nxt_buf_t *b)
361 {
362     nxt_event_engine_buf_mem_free(task->thread->engine, b);
363 
364     nxt_mp_release(r->mem_pool);
365 }
366 
367 
368 static void
369 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
370 {
371     nxt_http_peer_t     *peer;
372     nxt_http_request_t  *r;
373 
374     r = obj;
375     peer = r->peer;
376 
377     nxt_http_proto[peer->protocol].peer_close(task, peer);
378 
379     nxt_mp_release(r->mem_pool);
380 
381     nxt_http_request_error(&r->task, r, peer->status);
382 }
383 
384 
385 nxt_int_t
386 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
387 {
388     nxt_http_request_t  *r;
389 
390     r = ctx;
391 
392     r->resp.date = field;
393 
394     return NXT_OK;
395 }
396 
397 
398 nxt_int_t
399 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
400     uintptr_t data)
401 {
402     nxt_off_t           n;
403     nxt_http_request_t  *r;
404 
405     r = ctx;
406 
407     r->resp.content_length = field;
408 
409     n = nxt_off_t_parse(field->value, field->value_length);
410 
411     if (nxt_fast_path(n >= 0)) {
412         r->resp.content_length_n = n;
413     }
414 
415     return NXT_OK;
416 }
417 
418 
419 nxt_int_t
420 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
421 {
422     field->skip = 1;
423 
424     return NXT_OK;
425 }
426