xref: /unit/src/nxt_http_proxy.c (revision 1394:20b41ebfff79)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 
11 
12 struct nxt_upstream_proxy_s {
13     nxt_sockaddr_t  *sockaddr;
14     uint8_t         protocol;
15 };
16 
17 
18 static void nxt_http_proxy_server_get(nxt_task_t *task,
19     nxt_upstream_server_t *us);
20 static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
21     nxt_upstream_server_t *us);
22 static void nxt_http_proxy_upstream_error(nxt_task_t *task,
23     nxt_upstream_server_t *us);
24 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
25     nxt_http_request_t *r, nxt_http_action_t *action);
26 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
27 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
28 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
29 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
30 static void nxt_http_proxy_request_send(nxt_task_t *task,
31     nxt_http_request_t *r, nxt_buf_t *out);
32 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
33 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
36 
37 
38 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state;
39 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state;
40 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state;
41 static const nxt_http_request_state_t  nxt_http_proxy_read_state;
42 
43 
44 static const nxt_upstream_server_proto_t  nxt_upstream_simple_proto = {
45     .get = nxt_http_proxy_server_get,
46 };
47 
48 
49 static const nxt_upstream_peer_state_t  nxt_upstream_proxy_state = {
50     .ready = nxt_http_proxy_upstream_ready,
51     .error = nxt_http_proxy_upstream_error,
52 };
53 
54 
55 nxt_int_t
56 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
57 {
58     nxt_str_t             name;
59     nxt_sockaddr_t        *sa;
60     nxt_upstream_t        *up;
61     nxt_upstream_proxy_t  *proxy;
62 
63     sa = NULL;
64     name = action->name;
65 
66     if (nxt_str_start(&name, "http://", 7)) {
67         name.length -= 7;
68         name.start += 7;
69 
70         sa = nxt_sockaddr_parse(mp, &name);
71         if (nxt_slow_path(sa == NULL)) {
72             return NXT_ERROR;
73         }
74 
75         sa->type = SOCK_STREAM;
76     }
77 
78     if (sa != NULL) {
79         up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
80         if (nxt_slow_path(up == NULL)) {
81             return NXT_ERROR;
82         }
83 
84         up->name.length = sa->length;
85         up->name.start = nxt_sockaddr_start(sa);
86         up->proto = &nxt_upstream_simple_proto;
87 
88         proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
89         if (nxt_slow_path(proxy == NULL)) {
90             return NXT_ERROR;
91         }
92 
93         proxy->sockaddr = sa;
94         proxy->protocol = NXT_HTTP_PROTO_H1;
95         up->type.proxy = proxy;
96 
97         action->u.upstream = up;
98         action->handler = nxt_http_proxy_handler;
99     }
100 
101     return NXT_OK;
102 }
103 
104 
105 static nxt_http_action_t *
106 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
107     nxt_http_action_t *action)
108 {
109     return nxt_upstream_proxy_handler(task, r, action->u.upstream);
110 }
111 
112 
113 nxt_http_action_t *
114 nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
115     nxt_upstream_t *upstream)
116 {
117     nxt_http_peer_t        *peer;
118     nxt_upstream_server_t  *us;
119 
120     us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
121     if (nxt_slow_path(us == NULL)) {
122         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
123         return NULL;
124     }
125 
126     peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
127     if (nxt_slow_path(peer == NULL)) {
128         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
129         return NULL;
130     }
131 
132     peer->request = r;
133     r->peer = peer;
134 
135     nxt_mp_retain(r->mem_pool);
136 
137     us->state = &nxt_upstream_proxy_state;
138     us->peer.http = peer;
139     peer->server = us;
140 
141     us->upstream = upstream;
142     upstream->proto->get(task, us);
143 
144     return NULL;
145 }
146 
147 
148 static void
149 nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
150 {
151     nxt_upstream_proxy_t  *proxy;
152 
153     proxy = us->upstream->type.proxy;
154 
155     us->sockaddr = proxy->sockaddr;
156     us->protocol = proxy->protocol;
157 
158     us->state->ready(task, us);
159 }
160 
161 
162 static void
163 nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
164 {
165     nxt_http_peer_t  *peer;
166 
167     peer = us->peer.http;
168 
169     peer->protocol = us->protocol;
170 
171     peer->request->state = &nxt_http_proxy_header_send_state;
172 
173     nxt_http_proto[peer->protocol].peer_connect(task, peer);
174 }
175 
176 
177 static void
178 nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
179 {
180     nxt_http_request_t  *r;
181 
182     r = us->peer.http->request;
183 
184     nxt_mp_release(r->mem_pool);
185 
186     nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
187 }
188 
189 
190 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state
191     nxt_aligned(64) =
192 {
193     .ready_handler = nxt_http_proxy_header_send,
194     .error_handler = nxt_http_proxy_error,
195 };
196 
197 
198 static void
199 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
200 {
201     nxt_http_peer_t     *peer;
202     nxt_http_request_t  *r;
203 
204     r = obj;
205     peer = data;
206     r->state = &nxt_http_proxy_header_sent_state;
207 
208     nxt_http_proto[peer->protocol].peer_header_send(task, peer);
209 }
210 
211 
212 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state
213     nxt_aligned(64) =
214 {
215     .ready_handler = nxt_http_proxy_header_sent,
216     .error_handler = nxt_http_proxy_error,
217 };
218 
219 
220 static void
221 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
222 {
223     nxt_http_peer_t     *peer;
224     nxt_http_request_t  *r;
225 
226     r = obj;
227     peer = data;
228     r->state = &nxt_http_proxy_header_read_state;
229 
230     nxt_http_proto[peer->protocol].peer_header_read(task, peer);
231 }
232 
233 
234 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state
235     nxt_aligned(64) =
236 {
237     .ready_handler = nxt_http_proxy_header_read,
238     .error_handler = nxt_http_proxy_error,
239 };
240 
241 
242 static void
243 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
244 {
245     nxt_http_peer_t     *peer;
246     nxt_http_field_t    *f, *field;
247     nxt_http_request_t  *r;
248 
249     r = obj;
250     peer = data;
251 
252     r->status = peer->status;
253 
254     nxt_debug(task, "http proxy status: %d", peer->status);
255 
256     if (r->resp.content_length_n > 0) {
257         peer->remainder = r->resp.content_length_n;
258     }
259 
260     nxt_list_each(field, peer->fields) {
261 
262         nxt_debug(task, "http proxy header: \"%*s: %*s\"",
263                   (size_t) field->name_length, field->name,
264                   (size_t) field->value_length, field->value);
265 
266         if (!field->skip) {
267             f = nxt_list_add(r->resp.fields);
268             if (nxt_slow_path(f == NULL)) {
269                 nxt_http_proxy_error(task, r, peer);
270                 return;
271             }
272 
273             *f = *field;
274         }
275 
276     } nxt_list_loop;
277 
278     nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
279 }
280 
281 
282 static void
283 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
284 {
285     nxt_buf_t           *out;
286     nxt_http_peer_t     *peer;
287     nxt_http_request_t  *r;
288 
289     r = obj;
290     peer = data;
291     out = peer->body;
292 
293     if (out != NULL) {
294         peer->body = NULL;
295         nxt_http_proxy_request_send(task, r, out);
296     }
297 
298     r->state = &nxt_http_proxy_read_state;
299 
300     nxt_http_proto[peer->protocol].peer_read(task, peer);
301 }
302 
303 
304 static void
305 nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r,
306     nxt_buf_t *out)
307 {
308     size_t  length;
309 
310     if (r->peer->remainder > 0) {
311         length = nxt_buf_chain_length(out);
312         r->peer->remainder -= length;
313     }
314 
315     nxt_http_request_send(task, r, out);
316 }
317 
318 
319 static const nxt_http_request_state_t  nxt_http_proxy_read_state
320     nxt_aligned(64) =
321 {
322     .ready_handler = nxt_http_proxy_read,
323     .error_handler = nxt_http_proxy_error,
324 };
325 
326 
327 static void
328 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
329 {
330     nxt_buf_t           *out;
331     nxt_bool_t          last;
332     nxt_http_peer_t     *peer;
333     nxt_http_request_t  *r;
334 
335     r = obj;
336     peer = data;
337     out = peer->body;
338     peer->body = NULL;
339     last = nxt_buf_is_last(out);
340 
341     nxt_http_proxy_request_send(task, r, out);
342 
343     if (!last) {
344         nxt_http_proto[peer->protocol].peer_read(task, peer);
345 
346     } else {
347         r->inconsistent = (peer->remainder != 0);
348 
349         nxt_http_proto[peer->protocol].peer_close(task, peer);
350 
351         nxt_mp_release(r->mem_pool);
352     }
353 }
354 
355 
356 nxt_buf_t *
357 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
358     size_t size)
359 {
360     nxt_buf_t  *b;
361 
362     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
363     if (nxt_fast_path(b != NULL)) {
364         b->completion_handler = nxt_http_proxy_buf_mem_completion;
365         b->parent = r;
366         nxt_mp_retain(r->mem_pool);
367 
368     } else {
369         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
370     }
371 
372     return b;
373 }
374 
375 
376 static void
377 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
378 {
379     nxt_buf_t           *b, *next;
380     nxt_http_peer_t     *peer;
381     nxt_http_request_t  *r;
382 
383     b = obj;
384     r = data;
385 
386     peer = r->peer;
387 
388     do {
389         next = b->next;
390 
391         nxt_http_proxy_buf_mem_free(task, r, b);
392 
393         b = next;
394     } while (b != NULL);
395 
396     if (!peer->closed) {
397         nxt_http_proto[peer->protocol].peer_read(task, peer);
398     }
399 }
400 
401 
402 void
403 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
404     nxt_buf_t *b)
405 {
406     nxt_event_engine_buf_mem_free(task->thread->engine, b);
407 
408     nxt_mp_release(r->mem_pool);
409 }
410 
411 
412 static void
413 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
414 {
415     nxt_http_peer_t     *peer;
416     nxt_http_request_t  *r;
417 
418     r = obj;
419     peer = r->peer;
420 
421     nxt_http_proto[peer->protocol].peer_close(task, peer);
422 
423     nxt_mp_release(r->mem_pool);
424 
425     nxt_http_request_error(task, r, peer->status);
426 }
427 
428 
429 nxt_int_t
430 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
431 {
432     nxt_http_request_t  *r;
433 
434     r = ctx;
435 
436     r->resp.date = field;
437 
438     return NXT_OK;
439 }
440 
441 
442 nxt_int_t
443 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
444     uintptr_t data)
445 {
446     nxt_off_t           n;
447     nxt_http_request_t  *r;
448 
449     r = ctx;
450 
451     r->resp.content_length = field;
452 
453     n = nxt_off_t_parse(field->value, field->value_length);
454 
455     if (nxt_fast_path(n >= 0)) {
456         r->resp.content_length_n = n;
457     }
458 
459     return NXT_OK;
460 }
461 
462 
463 nxt_int_t
464 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
465 {
466     field->skip = 1;
467 
468     return NXT_OK;
469 }
470