xref: /unit/src/nxt_http_proxy.c (revision 1505:d18f2b38596b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 
11 
12 struct nxt_upstream_proxy_s {
13     nxt_sockaddr_t  *sockaddr;
14     uint8_t         protocol;
15 };
16 
17 
18 static void nxt_http_proxy_server_get(nxt_task_t *task,
19     nxt_upstream_server_t *us);
20 static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
21     nxt_upstream_server_t *us);
22 static void nxt_http_proxy_upstream_error(nxt_task_t *task,
23     nxt_upstream_server_t *us);
24 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
25     nxt_http_request_t *r, nxt_http_action_t *action);
26 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
27 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
28 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
29 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
30 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
31 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
34 
35 
36 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state;
37 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state;
38 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state;
39 static const nxt_http_request_state_t  nxt_http_proxy_read_state;
40 
41 
42 static const nxt_upstream_server_proto_t  nxt_upstream_simple_proto = {
43     .get = nxt_http_proxy_server_get,
44 };
45 
46 
47 static const nxt_upstream_peer_state_t  nxt_upstream_proxy_state = {
48     .ready = nxt_http_proxy_upstream_ready,
49     .error = nxt_http_proxy_upstream_error,
50 };
51 
52 
53 nxt_int_t
54 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
55 {
56     nxt_str_t             name;
57     nxt_sockaddr_t        *sa;
58     nxt_upstream_t        *up;
59     nxt_upstream_proxy_t  *proxy;
60 
61     sa = NULL;
62     name = action->name;
63 
64     if (nxt_str_start(&name, "http://", 7)) {
65         name.length -= 7;
66         name.start += 7;
67 
68         sa = nxt_sockaddr_parse(mp, &name);
69         if (nxt_slow_path(sa == NULL)) {
70             return NXT_ERROR;
71         }
72 
73         sa->type = SOCK_STREAM;
74     }
75 
76     if (sa != NULL) {
77         up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
78         if (nxt_slow_path(up == NULL)) {
79             return NXT_ERROR;
80         }
81 
82         up->name.length = sa->length;
83         up->name.start = nxt_sockaddr_start(sa);
84         up->proto = &nxt_upstream_simple_proto;
85 
86         proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
87         if (nxt_slow_path(proxy == NULL)) {
88             return NXT_ERROR;
89         }
90 
91         proxy->sockaddr = sa;
92         proxy->protocol = NXT_HTTP_PROTO_H1;
93         up->type.proxy = proxy;
94 
95         action->u.upstream = up;
96         action->handler = nxt_http_proxy_handler;
97     }
98 
99     return NXT_OK;
100 }
101 
102 
103 static nxt_http_action_t *
104 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
105     nxt_http_action_t *action)
106 {
107     return nxt_upstream_proxy_handler(task, r, action->u.upstream);
108 }
109 
110 
111 nxt_http_action_t *
112 nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
113     nxt_upstream_t *upstream)
114 {
115     nxt_http_peer_t        *peer;
116     nxt_upstream_server_t  *us;
117 
118     us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
119     if (nxt_slow_path(us == NULL)) {
120         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
121         return NULL;
122     }
123 
124     peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
125     if (nxt_slow_path(peer == NULL)) {
126         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
127         return NULL;
128     }
129 
130     peer->request = r;
131     r->peer = peer;
132 
133     nxt_mp_retain(r->mem_pool);
134 
135     us->state = &nxt_upstream_proxy_state;
136     us->peer.http = peer;
137     peer->server = us;
138 
139     us->upstream = upstream;
140     upstream->proto->get(task, us);
141 
142     return NULL;
143 }
144 
145 
146 static void
147 nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
148 {
149     nxt_upstream_proxy_t  *proxy;
150 
151     proxy = us->upstream->type.proxy;
152 
153     us->sockaddr = proxy->sockaddr;
154     us->protocol = proxy->protocol;
155 
156     us->state->ready(task, us);
157 }
158 
159 
160 static void
161 nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
162 {
163     nxt_http_peer_t  *peer;
164 
165     peer = us->peer.http;
166 
167     peer->protocol = us->protocol;
168 
169     peer->request->state = &nxt_http_proxy_header_send_state;
170 
171     nxt_http_proto[peer->protocol].peer_connect(task, peer);
172 }
173 
174 
175 static void
176 nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
177 {
178     nxt_http_request_t  *r;
179 
180     r = us->peer.http->request;
181 
182     nxt_mp_release(r->mem_pool);
183 
184     nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
185 }
186 
187 
188 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state
189     nxt_aligned(64) =
190 {
191     .ready_handler = nxt_http_proxy_header_send,
192     .error_handler = nxt_http_proxy_error,
193 };
194 
195 
196 static void
197 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
198 {
199     nxt_http_peer_t     *peer;
200     nxt_http_request_t  *r;
201 
202     r = obj;
203     peer = data;
204     r->state = &nxt_http_proxy_header_sent_state;
205 
206     nxt_http_proto[peer->protocol].peer_header_send(task, peer);
207 }
208 
209 
210 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state
211     nxt_aligned(64) =
212 {
213     .ready_handler = nxt_http_proxy_header_sent,
214     .error_handler = nxt_http_proxy_error,
215 };
216 
217 
218 static void
219 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
220 {
221     nxt_http_peer_t     *peer;
222     nxt_http_request_t  *r;
223 
224     r = obj;
225     peer = data;
226     r->state = &nxt_http_proxy_header_read_state;
227 
228     nxt_http_proto[peer->protocol].peer_header_read(task, peer);
229 }
230 
231 
232 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state
233     nxt_aligned(64) =
234 {
235     .ready_handler = nxt_http_proxy_header_read,
236     .error_handler = nxt_http_proxy_error,
237 };
238 
239 
240 static void
241 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
242 {
243     nxt_http_peer_t     *peer;
244     nxt_http_field_t    *f, *field;
245     nxt_http_request_t  *r;
246 
247     r = obj;
248     peer = data;
249 
250     r->status = peer->status;
251 
252     nxt_debug(task, "http proxy status: %d", peer->status);
253 
254     nxt_list_each(field, peer->fields) {
255 
256         nxt_debug(task, "http proxy header: \"%*s: %*s\"",
257                   (size_t) field->name_length, field->name,
258                   (size_t) field->value_length, field->value);
259 
260         if (!field->skip) {
261             f = nxt_list_add(r->resp.fields);
262             if (nxt_slow_path(f == NULL)) {
263                 nxt_http_proxy_error(task, r, peer);
264                 return;
265             }
266 
267             *f = *field;
268         }
269 
270     } nxt_list_loop;
271 
272     r->state = &nxt_http_proxy_read_state;
273 
274     nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
275 }
276 
277 
278 static void
279 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
280 {
281     nxt_buf_t           *out;
282     nxt_http_peer_t     *peer;
283     nxt_http_request_t  *r;
284 
285     r = obj;
286     peer = data;
287     out = peer->body;
288 
289     if (out != NULL) {
290         peer->body = NULL;
291         nxt_http_request_send(task, r, out);
292 
293     }
294 
295     if (!peer->closed) {
296         nxt_http_proto[peer->protocol].peer_read(task, peer);
297     }
298 }
299 
300 
301 static const nxt_http_request_state_t  nxt_http_proxy_read_state
302     nxt_aligned(64) =
303 {
304     .ready_handler = nxt_http_proxy_read,
305     .error_handler = nxt_http_proxy_error,
306 };
307 
308 
309 static void
310 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
311 {
312     nxt_buf_t           *out;
313     nxt_http_peer_t     *peer;
314     nxt_http_request_t  *r;
315 
316     r = obj;
317     peer = data;
318     out = peer->body;
319     peer->body = NULL;
320 
321     if (out != NULL) {
322         nxt_http_request_send(task, r, out);
323     }
324 
325     if (!peer->closed) {
326         nxt_http_proto[peer->protocol].peer_read(task, peer);
327 
328     } else {
329         nxt_http_proto[peer->protocol].peer_close(task, peer);
330 
331         nxt_mp_release(r->mem_pool);
332     }
333 }
334 
335 
336 nxt_buf_t *
337 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
338     size_t size)
339 {
340     nxt_buf_t  *b;
341 
342     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
343     if (nxt_fast_path(b != NULL)) {
344         b->completion_handler = nxt_http_proxy_buf_mem_completion;
345         b->parent = r;
346         nxt_mp_retain(r->mem_pool);
347 
348     } else {
349         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
350     }
351 
352     return b;
353 }
354 
355 
356 static void
357 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
358 {
359     nxt_buf_t           *b, *next;
360     nxt_http_peer_t     *peer;
361     nxt_http_request_t  *r;
362 
363     b = obj;
364     r = data;
365 
366     peer = r->peer;
367 
368     do {
369         next = b->next;
370 
371         nxt_http_proxy_buf_mem_free(task, r, b);
372 
373         b = next;
374     } while (b != NULL);
375 
376     if (!peer->closed) {
377         nxt_http_proto[peer->protocol].peer_read(task, peer);
378     }
379 }
380 
381 
382 void
383 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
384     nxt_buf_t *b)
385 {
386     nxt_event_engine_buf_mem_free(task->thread->engine, b);
387 
388     nxt_mp_release(r->mem_pool);
389 }
390 
391 
392 static void
393 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
394 {
395     nxt_http_peer_t     *peer;
396     nxt_http_request_t  *r;
397 
398     r = obj;
399     peer = r->peer;
400 
401     nxt_http_proto[peer->protocol].peer_close(task, peer);
402 
403     nxt_mp_release(r->mem_pool);
404 
405     nxt_http_request_error(&r->task, r, peer->status);
406 }
407 
408 
409 nxt_int_t
410 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
411 {
412     nxt_http_request_t  *r;
413 
414     r = ctx;
415 
416     r->resp.date = field;
417 
418     return NXT_OK;
419 }
420 
421 
422 nxt_int_t
423 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
424     uintptr_t data)
425 {
426     nxt_off_t           n;
427     nxt_http_request_t  *r;
428 
429     r = ctx;
430 
431     r->resp.content_length = field;
432 
433     n = nxt_off_t_parse(field->value, field->value_length);
434 
435     if (nxt_fast_path(n >= 0)) {
436         r->resp.content_length_n = n;
437     }
438 
439     return NXT_OK;
440 }
441 
442 
443 nxt_int_t
444 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
445 {
446     field->skip = 1;
447 
448     return NXT_OK;
449 }
450