xref: /unit/src/nxt_http_proxy.c (revision 1924:96d090de7534)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 
11 
12 struct nxt_upstream_proxy_s {
13     nxt_sockaddr_t  *sockaddr;
14     uint8_t         protocol;
15 };
16 
17 
18 static void nxt_http_proxy_server_get(nxt_task_t *task,
19     nxt_upstream_server_t *us);
20 static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
21     nxt_upstream_server_t *us);
22 static void nxt_http_proxy_upstream_error(nxt_task_t *task,
23     nxt_upstream_server_t *us);
24 static nxt_http_action_t *nxt_http_proxy(nxt_task_t *task,
25     nxt_http_request_t *r, nxt_http_action_t *action);
26 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
27 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
28 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
29 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
30 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
31     void *data);
32 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
33 
34 
35 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state;
36 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state;
37 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state;
38 static const nxt_http_request_state_t  nxt_http_proxy_read_state;
39 
40 
41 static const nxt_upstream_server_proto_t  nxt_upstream_simple_proto = {
42     .get = nxt_http_proxy_server_get,
43 };
44 
45 
46 static const nxt_upstream_peer_state_t  nxt_upstream_proxy_state = {
47     .ready = nxt_http_proxy_upstream_ready,
48     .error = nxt_http_proxy_upstream_error,
49 };
50 
51 
52 nxt_int_t
nxt_http_proxy_init(nxt_mp_t * mp,nxt_http_action_t * action,nxt_http_action_conf_t * acf)53 nxt_http_proxy_init(nxt_mp_t *mp, nxt_http_action_t *action,
54     nxt_http_action_conf_t *acf)
55 {
56     nxt_str_t             name;
57     nxt_sockaddr_t        *sa;
58     nxt_upstream_t        *up;
59     nxt_upstream_proxy_t  *proxy;
60 
61     sa = NULL;
62     nxt_conf_get_string(acf->proxy, &name);
63 
64     if (nxt_str_start(&name, "http://", 7)) {
65         name.length -= 7;
66         name.start += 7;
67 
68         sa = nxt_sockaddr_parse(mp, &name);
69         if (nxt_slow_path(sa == NULL)) {
70             return NXT_ERROR;
71         }
72 
73         sa->type = SOCK_STREAM;
74     }
75 
76     if (sa != NULL) {
77         up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
78         if (nxt_slow_path(up == NULL)) {
79             return NXT_ERROR;
80         }
81 
82         up->name.length = sa->length;
83         up->name.start = nxt_sockaddr_start(sa);
84         up->proto = &nxt_upstream_simple_proto;
85 
86         proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
87         if (nxt_slow_path(proxy == NULL)) {
88             return NXT_ERROR;
89         }
90 
91         proxy->sockaddr = sa;
92         proxy->protocol = NXT_HTTP_PROTO_H1;
93         up->type.proxy = proxy;
94 
95         action->u.upstream = up;
96         action->handler = nxt_http_proxy;
97     }
98 
99     return NXT_OK;
100 }
101 
102 
103 static nxt_http_action_t *
nxt_http_proxy(nxt_task_t * task,nxt_http_request_t * r,nxt_http_action_t * action)104 nxt_http_proxy(nxt_task_t *task, nxt_http_request_t *r,
105     nxt_http_action_t *action)
106 {
107     nxt_upstream_t  *u;
108 
109     u = action->u.upstream;
110 
111     nxt_debug(task, "http proxy: \"%V\"", &u->name);
112 
113     return nxt_upstream_proxy_handler(task, r, u);
114 }
115 
116 
117 nxt_http_action_t *
nxt_upstream_proxy_handler(nxt_task_t * task,nxt_http_request_t * r,nxt_upstream_t * upstream)118 nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
119     nxt_upstream_t *upstream)
120 {
121     nxt_http_peer_t        *peer;
122     nxt_upstream_server_t  *us;
123 
124     us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
125     if (nxt_slow_path(us == NULL)) {
126         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
127         return NULL;
128     }
129 
130     peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
131     if (nxt_slow_path(peer == NULL)) {
132         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
133         return NULL;
134     }
135 
136     peer->request = r;
137     r->peer = peer;
138 
139     nxt_mp_retain(r->mem_pool);
140 
141     us->state = &nxt_upstream_proxy_state;
142     us->peer.http = peer;
143     peer->server = us;
144 
145     us->upstream = upstream;
146     upstream->proto->get(task, us);
147 
148     return NULL;
149 }
150 
151 
152 static void
nxt_http_proxy_server_get(nxt_task_t * task,nxt_upstream_server_t * us)153 nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
154 {
155     nxt_upstream_proxy_t  *proxy;
156 
157     proxy = us->upstream->type.proxy;
158 
159     us->sockaddr = proxy->sockaddr;
160     us->protocol = proxy->protocol;
161 
162     us->state->ready(task, us);
163 }
164 
165 
166 static void
nxt_http_proxy_upstream_ready(nxt_task_t * task,nxt_upstream_server_t * us)167 nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
168 {
169     nxt_http_peer_t  *peer;
170 
171     peer = us->peer.http;
172 
173     peer->protocol = us->protocol;
174 
175     peer->request->state = &nxt_http_proxy_header_send_state;
176 
177     nxt_http_proto[peer->protocol].peer_connect(task, peer);
178 }
179 
180 
181 static void
nxt_http_proxy_upstream_error(nxt_task_t * task,nxt_upstream_server_t * us)182 nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
183 {
184     nxt_http_request_t  *r;
185 
186     r = us->peer.http->request;
187 
188     nxt_mp_release(r->mem_pool);
189 
190     nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
191 }
192 
193 
194 static const nxt_http_request_state_t  nxt_http_proxy_header_send_state
195     nxt_aligned(64) =
196 {
197     .ready_handler = nxt_http_proxy_header_send,
198     .error_handler = nxt_http_proxy_error,
199 };
200 
201 
202 static void
nxt_http_proxy_header_send(nxt_task_t * task,void * obj,void * data)203 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
204 {
205     nxt_http_peer_t     *peer;
206     nxt_http_request_t  *r;
207 
208     r = obj;
209     peer = data;
210     r->state = &nxt_http_proxy_header_sent_state;
211 
212     nxt_http_proto[peer->protocol].peer_header_send(task, peer);
213 }
214 
215 
216 static const nxt_http_request_state_t  nxt_http_proxy_header_sent_state
217     nxt_aligned(64) =
218 {
219     .ready_handler = nxt_http_proxy_header_sent,
220     .error_handler = nxt_http_proxy_error,
221 };
222 
223 
224 static void
nxt_http_proxy_header_sent(nxt_task_t * task,void * obj,void * data)225 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
226 {
227     nxt_http_peer_t     *peer;
228     nxt_http_request_t  *r;
229 
230     r = obj;
231     peer = data;
232     r->state = &nxt_http_proxy_header_read_state;
233 
234     nxt_http_proto[peer->protocol].peer_header_read(task, peer);
235 }
236 
237 
238 static const nxt_http_request_state_t  nxt_http_proxy_header_read_state
239     nxt_aligned(64) =
240 {
241     .ready_handler = nxt_http_proxy_header_read,
242     .error_handler = nxt_http_proxy_error,
243 };
244 
245 
246 static void
nxt_http_proxy_header_read(nxt_task_t * task,void * obj,void * data)247 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
248 {
249     nxt_http_peer_t     *peer;
250     nxt_http_field_t    *f, *field;
251     nxt_http_request_t  *r;
252 
253     r = obj;
254     peer = data;
255 
256     r->status = peer->status;
257 
258     nxt_debug(task, "http proxy status: %d", peer->status);
259 
260     nxt_list_each(field, peer->fields) {
261 
262         nxt_debug(task, "http proxy header: \"%*s: %*s\"",
263                   (size_t) field->name_length, field->name,
264                   (size_t) field->value_length, field->value);
265 
266         if (!field->skip) {
267             f = nxt_list_add(r->resp.fields);
268             if (nxt_slow_path(f == NULL)) {
269                 nxt_http_proxy_error(task, r, peer);
270                 return;
271             }
272 
273             *f = *field;
274         }
275 
276     } nxt_list_loop;
277 
278     r->state = &nxt_http_proxy_read_state;
279 
280     nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
281 }
282 
283 
284 static const nxt_http_request_state_t  nxt_http_proxy_read_state
285     nxt_aligned(64) =
286 {
287     .ready_handler = nxt_http_proxy_send_body,
288     .error_handler = nxt_http_proxy_error,
289 };
290 
291 
292 static void
nxt_http_proxy_send_body(nxt_task_t * task,void * obj,void * data)293 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
294 {
295     nxt_buf_t           *out;
296     nxt_http_peer_t     *peer;
297     nxt_http_request_t  *r;
298 
299     r = obj;
300     peer = data;
301     out = peer->body;
302 
303     if (out != NULL) {
304         peer->body = NULL;
305         nxt_http_request_send(task, r, out);
306     }
307 
308     if (!peer->closed) {
309         nxt_http_proto[peer->protocol].peer_read(task, peer);
310 
311     } else {
312         nxt_http_proto[peer->protocol].peer_close(task, peer);
313 
314         nxt_mp_release(r->mem_pool);
315     }
316 }
317 
318 
319 nxt_buf_t *
nxt_http_proxy_buf_mem_alloc(nxt_task_t * task,nxt_http_request_t * r,size_t size)320 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
321     size_t size)
322 {
323     nxt_buf_t  *b;
324 
325     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
326     if (nxt_fast_path(b != NULL)) {
327         b->completion_handler = nxt_http_proxy_buf_mem_completion;
328         b->parent = r;
329         nxt_mp_retain(r->mem_pool);
330 
331     } else {
332         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
333     }
334 
335     return b;
336 }
337 
338 
339 static void
nxt_http_proxy_buf_mem_completion(nxt_task_t * task,void * obj,void * data)340 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
341 {
342     nxt_buf_t           *b, *next;
343     nxt_http_peer_t     *peer;
344     nxt_http_request_t  *r;
345 
346     b = obj;
347     r = data;
348 
349     peer = r->peer;
350 
351     do {
352         next = b->next;
353 
354         nxt_http_proxy_buf_mem_free(task, r, b);
355 
356         b = next;
357     } while (b != NULL);
358 
359     if (!peer->closed) {
360         nxt_http_proto[peer->protocol].peer_read(task, peer);
361     }
362 }
363 
364 
365 void
nxt_http_proxy_buf_mem_free(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * b)366 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
367     nxt_buf_t *b)
368 {
369     nxt_event_engine_buf_mem_free(task->thread->engine, b);
370 
371     nxt_mp_release(r->mem_pool);
372 }
373 
374 
375 static void
nxt_http_proxy_error(nxt_task_t * task,void * obj,void * data)376 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
377 {
378     nxt_http_peer_t     *peer;
379     nxt_http_request_t  *r;
380 
381     r = obj;
382     peer = r->peer;
383 
384     nxt_http_proto[peer->protocol].peer_close(task, peer);
385 
386     nxt_mp_release(r->mem_pool);
387 
388     nxt_http_request_error(&r->task, r, peer->status);
389 }
390 
391 
392 nxt_int_t
nxt_http_proxy_date(void * ctx,nxt_http_field_t * field,uintptr_t data)393 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
394 {
395     nxt_http_request_t  *r;
396 
397     r = ctx;
398 
399     r->resp.date = field;
400 
401     return NXT_OK;
402 }
403 
404 
405 nxt_int_t
nxt_http_proxy_content_length(void * ctx,nxt_http_field_t * field,uintptr_t data)406 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
407     uintptr_t data)
408 {
409     nxt_off_t           n;
410     nxt_http_request_t  *r;
411 
412     r = ctx;
413 
414     r->resp.content_length = field;
415 
416     n = nxt_off_t_parse(field->value, field->value_length);
417 
418     if (nxt_fast_path(n >= 0)) {
419         r->resp.content_length_n = n;
420     }
421 
422     return NXT_OK;
423 }
424 
425 
426 nxt_int_t
nxt_http_proxy_skip(void * ctx,nxt_http_field_t * field,uintptr_t data)427 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
428 {
429     field->skip = 1;
430 
431     return NXT_OK;
432 }
433