xref: /unit/src/nxt_http_source.c (revision 0)
1*0Sigor@sysoev.ru 
2*0Sigor@sysoev.ru /*
3*0Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*0Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*0Sigor@sysoev.ru  */
6*0Sigor@sysoev.ru 
7*0Sigor@sysoev.ru #include <nxt_main.h>
8*0Sigor@sysoev.ru 
9*0Sigor@sysoev.ru 
10*0Sigor@sysoev.ru typedef struct {
11*0Sigor@sysoev.ru     nxt_http_chunk_parse_t  parse;
12*0Sigor@sysoev.ru     nxt_source_hook_t       next;
13*0Sigor@sysoev.ru } nxt_http_source_chunk_t;
14*0Sigor@sysoev.ru 
15*0Sigor@sysoev.ru 
16*0Sigor@sysoev.ru static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
17*0Sigor@sysoev.ru 
18*0Sigor@sysoev.ru static void nxt_http_source_status_filter(nxt_thread_t *thr, void *obj,
19*0Sigor@sysoev.ru     void *data);
20*0Sigor@sysoev.ru static void nxt_http_source_header_filter(nxt_thread_t *thr, void *obj,
21*0Sigor@sysoev.ru     void *data);
22*0Sigor@sysoev.ru 
23*0Sigor@sysoev.ru static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
24*0Sigor@sysoev.ru static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
25*0Sigor@sysoev.ru     nxt_name_value_t *nv);
26*0Sigor@sysoev.ru static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
27*0Sigor@sysoev.ru     nxt_name_value_t *nv);
28*0Sigor@sysoev.ru 
29*0Sigor@sysoev.ru static void nxt_http_source_header_ready(nxt_http_source_t *hs,
30*0Sigor@sysoev.ru     nxt_buf_t *rest);
31*0Sigor@sysoev.ru static void nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj,
32*0Sigor@sysoev.ru     void *data);
33*0Sigor@sysoev.ru static void nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj,
34*0Sigor@sysoev.ru     void *data);
35*0Sigor@sysoev.ru static void nxt_http_source_body_filter(nxt_thread_t *thr, void *obj,
36*0Sigor@sysoev.ru     void *data);
37*0Sigor@sysoev.ru 
38*0Sigor@sysoev.ru static void nxt_http_source_sync_buffer(nxt_thread_t *thr,
39*0Sigor@sysoev.ru     nxt_http_source_t *hs, nxt_buf_t *b);
40*0Sigor@sysoev.ru static void nxt_http_source_error(nxt_stream_source_t *stream);
41*0Sigor@sysoev.ru static void nxt_http_source_fail(nxt_http_source_t *hs);
42*0Sigor@sysoev.ru static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
43*0Sigor@sysoev.ru 
44*0Sigor@sysoev.ru 
45*0Sigor@sysoev.ru void
46*0Sigor@sysoev.ru nxt_http_source_handler(nxt_upstream_source_t *us,
47*0Sigor@sysoev.ru     nxt_http_source_request_create_t request_create)
48*0Sigor@sysoev.ru {
49*0Sigor@sysoev.ru     nxt_http_source_t    *hs;
50*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
51*0Sigor@sysoev.ru 
52*0Sigor@sysoev.ru     hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t));
53*0Sigor@sysoev.ru     if (nxt_slow_path(hs == NULL)) {
54*0Sigor@sysoev.ru         goto fail;
55*0Sigor@sysoev.ru     }
56*0Sigor@sysoev.ru 
57*0Sigor@sysoev.ru     us->protocol_source = hs;
58*0Sigor@sysoev.ru 
59*0Sigor@sysoev.ru     hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
60*0Sigor@sysoev.ru                                          sizeof(nxt_name_value_t));
61*0Sigor@sysoev.ru     if (nxt_slow_path(hs->header_in.list == NULL)) {
62*0Sigor@sysoev.ru         goto fail;
63*0Sigor@sysoev.ru     }
64*0Sigor@sysoev.ru 
65*0Sigor@sysoev.ru     hs->header_in.hash = us->header_hash;
66*0Sigor@sysoev.ru     hs->upstream = us;
67*0Sigor@sysoev.ru     hs->request_create = request_create;
68*0Sigor@sysoev.ru 
69*0Sigor@sysoev.ru     stream = us->stream;
70*0Sigor@sysoev.ru 
71*0Sigor@sysoev.ru     if (stream == NULL) {
72*0Sigor@sysoev.ru         stream = nxt_mem_zalloc(us->buffers.mem_pool,
73*0Sigor@sysoev.ru                                 sizeof(nxt_stream_source_t));
74*0Sigor@sysoev.ru         if (nxt_slow_path(stream == NULL)) {
75*0Sigor@sysoev.ru             goto fail;
76*0Sigor@sysoev.ru         }
77*0Sigor@sysoev.ru 
78*0Sigor@sysoev.ru         us->stream = stream;
79*0Sigor@sysoev.ru         stream->upstream = us;
80*0Sigor@sysoev.ru 
81*0Sigor@sysoev.ru     } else {
82*0Sigor@sysoev.ru         nxt_memzero(stream, sizeof(nxt_stream_source_t));
83*0Sigor@sysoev.ru     }
84*0Sigor@sysoev.ru 
85*0Sigor@sysoev.ru     /*
86*0Sigor@sysoev.ru      * Create the HTTP source filter chain:
87*0Sigor@sysoev.ru      *   stream source | HTTP status line filter
88*0Sigor@sysoev.ru      */
89*0Sigor@sysoev.ru     stream->next = &hs->query;
90*0Sigor@sysoev.ru     stream->error_handler = nxt_http_source_error;
91*0Sigor@sysoev.ru 
92*0Sigor@sysoev.ru     hs->query.context = hs;
93*0Sigor@sysoev.ru     hs->query.filter = nxt_http_source_status_filter;
94*0Sigor@sysoev.ru 
95*0Sigor@sysoev.ru     hs->header_in.content_length = -1;
96*0Sigor@sysoev.ru 
97*0Sigor@sysoev.ru     stream->out = nxt_http_source_request_create(hs);
98*0Sigor@sysoev.ru 
99*0Sigor@sysoev.ru     if (nxt_fast_path(stream->out != NULL)) {
100*0Sigor@sysoev.ru         nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
101*0Sigor@sysoev.ru 
102*0Sigor@sysoev.ru         nxt_stream_source_connect(stream);
103*0Sigor@sysoev.ru         return;
104*0Sigor@sysoev.ru     }
105*0Sigor@sysoev.ru 
106*0Sigor@sysoev.ru fail:
107*0Sigor@sysoev.ru 
108*0Sigor@sysoev.ru     nxt_http_source_fail(hs);
109*0Sigor@sysoev.ru }
110*0Sigor@sysoev.ru 
111*0Sigor@sysoev.ru 
112*0Sigor@sysoev.ru nxt_inline u_char *
113*0Sigor@sysoev.ru nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
114*0Sigor@sysoev.ru {
115*0Sigor@sysoev.ru     u_char  *s;
116*0Sigor@sysoev.ru 
117*0Sigor@sysoev.ru     if (nxt_fast_path(len >= src->len)) {
118*0Sigor@sysoev.ru         len = src->len;
119*0Sigor@sysoev.ru         src->len = 0;
120*0Sigor@sysoev.ru 
121*0Sigor@sysoev.ru     } else {
122*0Sigor@sysoev.ru         src->len -= len;
123*0Sigor@sysoev.ru     }
124*0Sigor@sysoev.ru 
125*0Sigor@sysoev.ru     s = src->data;
126*0Sigor@sysoev.ru     src->data += len;
127*0Sigor@sysoev.ru 
128*0Sigor@sysoev.ru     return nxt_cpymem(p, s, len);
129*0Sigor@sysoev.ru }
130*0Sigor@sysoev.ru 
131*0Sigor@sysoev.ru 
132*0Sigor@sysoev.ru static nxt_buf_t *
133*0Sigor@sysoev.ru nxt_http_source_request_create(nxt_http_source_t *hs)
134*0Sigor@sysoev.ru {
135*0Sigor@sysoev.ru     nxt_int_t  ret;
136*0Sigor@sysoev.ru     nxt_buf_t  *b, *req, **prev;
137*0Sigor@sysoev.ru 
138*0Sigor@sysoev.ru     nxt_thread_log_debug("http source create request");
139*0Sigor@sysoev.ru 
140*0Sigor@sysoev.ru     prev = &req;
141*0Sigor@sysoev.ru 
142*0Sigor@sysoev.ru new_buffer:
143*0Sigor@sysoev.ru 
144*0Sigor@sysoev.ru     ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
145*0Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
146*0Sigor@sysoev.ru         return NULL;
147*0Sigor@sysoev.ru     }
148*0Sigor@sysoev.ru 
149*0Sigor@sysoev.ru     b = hs->upstream->buffers.current;
150*0Sigor@sysoev.ru     hs->upstream->buffers.current = NULL;
151*0Sigor@sysoev.ru 
152*0Sigor@sysoev.ru     *prev = b;
153*0Sigor@sysoev.ru     prev = &b->next;
154*0Sigor@sysoev.ru 
155*0Sigor@sysoev.ru     for ( ;; ) {
156*0Sigor@sysoev.ru         ret = hs->request_create(hs);
157*0Sigor@sysoev.ru 
158*0Sigor@sysoev.ru         if (nxt_fast_path(ret == NXT_OK)) {
159*0Sigor@sysoev.ru             b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
160*0Sigor@sysoev.ru                                                b->mem.end - b->mem.free);
161*0Sigor@sysoev.ru 
162*0Sigor@sysoev.ru             if (nxt_fast_path(hs->u.request.copy.len == 0)) {
163*0Sigor@sysoev.ru                 continue;
164*0Sigor@sysoev.ru             }
165*0Sigor@sysoev.ru 
166*0Sigor@sysoev.ru             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
167*0Sigor@sysoev.ru                                  b->mem.pos);
168*0Sigor@sysoev.ru 
169*0Sigor@sysoev.ru             goto new_buffer;
170*0Sigor@sysoev.ru         }
171*0Sigor@sysoev.ru 
172*0Sigor@sysoev.ru         if (nxt_slow_path(ret == NXT_ERROR)) {
173*0Sigor@sysoev.ru             return NULL;
174*0Sigor@sysoev.ru         }
175*0Sigor@sysoev.ru 
176*0Sigor@sysoev.ru         /* ret == NXT_DONE */
177*0Sigor@sysoev.ru         break;
178*0Sigor@sysoev.ru     }
179*0Sigor@sysoev.ru 
180*0Sigor@sysoev.ru     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
181*0Sigor@sysoev.ru 
182*0Sigor@sysoev.ru     return req;
183*0Sigor@sysoev.ru }
184*0Sigor@sysoev.ru 
185*0Sigor@sysoev.ru 
186*0Sigor@sysoev.ru static void
187*0Sigor@sysoev.ru nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data)
188*0Sigor@sysoev.ru {
189*0Sigor@sysoev.ru     nxt_int_t          ret;
190*0Sigor@sysoev.ru     nxt_buf_t          *b;
191*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
192*0Sigor@sysoev.ru 
193*0Sigor@sysoev.ru     hs = obj;
194*0Sigor@sysoev.ru     b = data;
195*0Sigor@sysoev.ru 
196*0Sigor@sysoev.ru     /*
197*0Sigor@sysoev.ru      * No cycle over buffer chain is required since at
198*0Sigor@sysoev.ru      * start the stream source passes buffers one at a time.
199*0Sigor@sysoev.ru      */
200*0Sigor@sysoev.ru 
201*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "http source status filter");
202*0Sigor@sysoev.ru 
203*0Sigor@sysoev.ru     if (nxt_slow_path(nxt_buf_is_sync(b))) {
204*0Sigor@sysoev.ru         nxt_http_source_sync_buffer(thr, hs, b);
205*0Sigor@sysoev.ru         return;
206*0Sigor@sysoev.ru     }
207*0Sigor@sysoev.ru 
208*0Sigor@sysoev.ru     ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
209*0Sigor@sysoev.ru 
210*0Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_OK)) {
211*0Sigor@sysoev.ru         /*
212*0Sigor@sysoev.ru          * Change the HTTP source filter chain:
213*0Sigor@sysoev.ru          *    stream source | HTTP header filter
214*0Sigor@sysoev.ru          */
215*0Sigor@sysoev.ru         hs->query.filter = nxt_http_source_header_filter;
216*0Sigor@sysoev.ru 
217*0Sigor@sysoev.ru         nxt_log_debug(thr->log, "upstream status: \"%*s\"",
218*0Sigor@sysoev.ru                       hs->u.status_parse.end - b->mem.start, b->mem.start);
219*0Sigor@sysoev.ru 
220*0Sigor@sysoev.ru         hs->header_in.status = hs->u.status_parse.code;
221*0Sigor@sysoev.ru 
222*0Sigor@sysoev.ru         nxt_log_debug(thr->log, "upstream version:%d status:%uD \"%*s\"",
223*0Sigor@sysoev.ru                       hs->u.status_parse.http_version,
224*0Sigor@sysoev.ru                       hs->u.status_parse.code,
225*0Sigor@sysoev.ru                       hs->u.status_parse.end - hs->u.status_parse.start,
226*0Sigor@sysoev.ru                       hs->u.status_parse.start);
227*0Sigor@sysoev.ru 
228*0Sigor@sysoev.ru         nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
229*0Sigor@sysoev.ru         hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
230*0Sigor@sysoev.ru 
231*0Sigor@sysoev.ru         nxt_http_source_header_filter(thr, hs, b);
232*0Sigor@sysoev.ru         return;
233*0Sigor@sysoev.ru     }
234*0Sigor@sysoev.ru 
235*0Sigor@sysoev.ru     if (nxt_slow_path(ret == NXT_ERROR)) {
236*0Sigor@sysoev.ru         /* HTTP/0.9 response. */
237*0Sigor@sysoev.ru         hs->header_in.status = 200;
238*0Sigor@sysoev.ru         nxt_http_source_header_ready(hs, b);
239*0Sigor@sysoev.ru         return;
240*0Sigor@sysoev.ru     }
241*0Sigor@sysoev.ru 
242*0Sigor@sysoev.ru     /* ret == NXT_AGAIN */
243*0Sigor@sysoev.ru 
244*0Sigor@sysoev.ru     /*
245*0Sigor@sysoev.ru      * b->mem.pos is always equal to b->mem.end because b is a buffer
246*0Sigor@sysoev.ru      * which points to a response part read by the stream source.
247*0Sigor@sysoev.ru      * However, since the stream source is an immediate source of the
248*0Sigor@sysoev.ru      * status filter, b->parent is a buffer the stream source reads in.
249*0Sigor@sysoev.ru      */
250*0Sigor@sysoev.ru     if (b->parent->mem.pos == b->parent->mem.end) {
251*0Sigor@sysoev.ru         nxt_http_source_message("upstream sent too long status line: \"%*s\"",
252*0Sigor@sysoev.ru                                 b->mem.pos - b->mem.start, b->mem.start);
253*0Sigor@sysoev.ru 
254*0Sigor@sysoev.ru         nxt_http_source_fail(hs);
255*0Sigor@sysoev.ru     }
256*0Sigor@sysoev.ru }
257*0Sigor@sysoev.ru 
258*0Sigor@sysoev.ru 
259*0Sigor@sysoev.ru static void
260*0Sigor@sysoev.ru nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
261*0Sigor@sysoev.ru {
262*0Sigor@sysoev.ru     nxt_int_t          ret;
263*0Sigor@sysoev.ru     nxt_buf_t          *b;
264*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
265*0Sigor@sysoev.ru 
266*0Sigor@sysoev.ru     hs = obj;
267*0Sigor@sysoev.ru     b = data;
268*0Sigor@sysoev.ru 
269*0Sigor@sysoev.ru     /*
270*0Sigor@sysoev.ru      * No cycle over buffer chain is required since at
271*0Sigor@sysoev.ru      * start the stream source passes buffers one at a time.
272*0Sigor@sysoev.ru      */
273*0Sigor@sysoev.ru 
274*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "http source header filter");
275*0Sigor@sysoev.ru 
276*0Sigor@sysoev.ru     if (nxt_slow_path(nxt_buf_is_sync(b))) {
277*0Sigor@sysoev.ru         nxt_http_source_sync_buffer(thr, hs, b);
278*0Sigor@sysoev.ru         return;
279*0Sigor@sysoev.ru     }
280*0Sigor@sysoev.ru 
281*0Sigor@sysoev.ru     for ( ;; ) {
282*0Sigor@sysoev.ru         ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
283*0Sigor@sysoev.ru 
284*0Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
285*0Sigor@sysoev.ru             break;
286*0Sigor@sysoev.ru         }
287*0Sigor@sysoev.ru 
288*0Sigor@sysoev.ru         ret = nxt_http_source_header_line_process(hs);
289*0Sigor@sysoev.ru 
290*0Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
291*0Sigor@sysoev.ru             break;
292*0Sigor@sysoev.ru         }
293*0Sigor@sysoev.ru     }
294*0Sigor@sysoev.ru 
295*0Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_DONE)) {
296*0Sigor@sysoev.ru         nxt_log_debug(thr->log, "http source header done");
297*0Sigor@sysoev.ru         nxt_http_source_header_ready(hs, b);
298*0Sigor@sysoev.ru         return;
299*0Sigor@sysoev.ru     }
300*0Sigor@sysoev.ru 
301*0Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_AGAIN)) {
302*0Sigor@sysoev.ru         return;
303*0Sigor@sysoev.ru     }
304*0Sigor@sysoev.ru 
305*0Sigor@sysoev.ru     if (ret != NXT_ERROR) {
306*0Sigor@sysoev.ru         /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
307*0Sigor@sysoev.ru         nxt_log_error(NXT_LOG_ERR, thr->log,
308*0Sigor@sysoev.ru                       "upstream sent invalid header line: \"%*s\\r...\"",
309*0Sigor@sysoev.ru                       hs->u.header.parse.header_end
310*0Sigor@sysoev.ru                           - hs->u.header.parse.header_name_start,
311*0Sigor@sysoev.ru                       hs->u.header.parse.header_name_start);
312*0Sigor@sysoev.ru     }
313*0Sigor@sysoev.ru 
314*0Sigor@sysoev.ru     /* ret == NXT_ERROR */
315*0Sigor@sysoev.ru 
316*0Sigor@sysoev.ru     nxt_http_source_fail(hs);
317*0Sigor@sysoev.ru }
318*0Sigor@sysoev.ru 
319*0Sigor@sysoev.ru 
320*0Sigor@sysoev.ru static nxt_int_t
321*0Sigor@sysoev.ru nxt_http_source_header_line_process(nxt_http_source_t *hs)
322*0Sigor@sysoev.ru {
323*0Sigor@sysoev.ru     size_t                     name_len;
324*0Sigor@sysoev.ru     nxt_name_value_t           *nv;
325*0Sigor@sysoev.ru     nxt_lvlhsh_query_t         lhq;
326*0Sigor@sysoev.ru     nxt_http_header_parse_t    *hp;
327*0Sigor@sysoev.ru     nxt_upstream_name_value_t  *unv;
328*0Sigor@sysoev.ru 
329*0Sigor@sysoev.ru     hp = &hs->u.header.parse;
330*0Sigor@sysoev.ru 
331*0Sigor@sysoev.ru     name_len = hp->header_name_end - hp->header_name_start;
332*0Sigor@sysoev.ru 
333*0Sigor@sysoev.ru     if (name_len > 255) {
334*0Sigor@sysoev.ru         nxt_http_source_message("upstream sent too long header field name: "
335*0Sigor@sysoev.ru                                 "\"%*s\"", name_len, hp->header_name_start);
336*0Sigor@sysoev.ru         return NXT_ERROR;
337*0Sigor@sysoev.ru     }
338*0Sigor@sysoev.ru 
339*0Sigor@sysoev.ru     nv = nxt_list_add(hs->header_in.list);
340*0Sigor@sysoev.ru     if (nxt_slow_path(nv == NULL)) {
341*0Sigor@sysoev.ru         return NXT_ERROR;
342*0Sigor@sysoev.ru     }
343*0Sigor@sysoev.ru 
344*0Sigor@sysoev.ru     nv->hash = hp->header_hash;
345*0Sigor@sysoev.ru     nv->skip = 0;
346*0Sigor@sysoev.ru     nv->name_len = name_len;
347*0Sigor@sysoev.ru     nv->name_start = hp->header_name_start;
348*0Sigor@sysoev.ru     nv->value_len = hp->header_end - hp->header_start;
349*0Sigor@sysoev.ru     nv->value_start = hp->header_start;
350*0Sigor@sysoev.ru 
351*0Sigor@sysoev.ru     nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
352*0Sigor@sysoev.ru                          nv->name_len, nv->name_start,
353*0Sigor@sysoev.ru                          nv->value_len, nv->value_start);
354*0Sigor@sysoev.ru 
355*0Sigor@sysoev.ru     lhq.key_hash = nv->hash;
356*0Sigor@sysoev.ru     lhq.key.len = nv->name_len;
357*0Sigor@sysoev.ru     lhq.key.data = nv->name_start;
358*0Sigor@sysoev.ru     lhq.proto = &nxt_upstream_header_hash_proto;
359*0Sigor@sysoev.ru 
360*0Sigor@sysoev.ru     if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
361*0Sigor@sysoev.ru         unv = lhq.value;
362*0Sigor@sysoev.ru 
363*0Sigor@sysoev.ru         if (unv->handler(hs->upstream, nv) != NXT_OK) {
364*0Sigor@sysoev.ru             return NXT_ERROR;
365*0Sigor@sysoev.ru         }
366*0Sigor@sysoev.ru     }
367*0Sigor@sysoev.ru 
368*0Sigor@sysoev.ru     return NXT_OK;
369*0Sigor@sysoev.ru }
370*0Sigor@sysoev.ru 
371*0Sigor@sysoev.ru 
372*0Sigor@sysoev.ru static const nxt_upstream_name_value_t  nxt_http_source_headers[]
373*0Sigor@sysoev.ru     nxt_aligned(32) =
374*0Sigor@sysoev.ru {
375*0Sigor@sysoev.ru     { nxt_http_source_content_length,
376*0Sigor@sysoev.ru       nxt_upstream_name_value("content-length") },
377*0Sigor@sysoev.ru 
378*0Sigor@sysoev.ru     { nxt_http_source_transfer_encoding,
379*0Sigor@sysoev.ru       nxt_upstream_name_value("transfer-encoding") },
380*0Sigor@sysoev.ru };
381*0Sigor@sysoev.ru 
382*0Sigor@sysoev.ru 
383*0Sigor@sysoev.ru nxt_int_t
384*0Sigor@sysoev.ru nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
385*0Sigor@sysoev.ru {
386*0Sigor@sysoev.ru     return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
387*0Sigor@sysoev.ru                                         nxt_nitems(nxt_http_source_headers));
388*0Sigor@sysoev.ru }
389*0Sigor@sysoev.ru 
390*0Sigor@sysoev.ru 
391*0Sigor@sysoev.ru static nxt_int_t
392*0Sigor@sysoev.ru nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
393*0Sigor@sysoev.ru {
394*0Sigor@sysoev.ru     nxt_off_t          length;
395*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
396*0Sigor@sysoev.ru 
397*0Sigor@sysoev.ru     length = nxt_off_t_parse(nv->value_start, nv->value_len);
398*0Sigor@sysoev.ru 
399*0Sigor@sysoev.ru     if (nxt_fast_path(length > 0)) {
400*0Sigor@sysoev.ru         hs = us->protocol_source;
401*0Sigor@sysoev.ru         hs->header_in.content_length = length;
402*0Sigor@sysoev.ru         return NXT_OK;
403*0Sigor@sysoev.ru     }
404*0Sigor@sysoev.ru 
405*0Sigor@sysoev.ru     return NXT_ERROR;
406*0Sigor@sysoev.ru }
407*0Sigor@sysoev.ru 
408*0Sigor@sysoev.ru 
409*0Sigor@sysoev.ru static nxt_int_t
410*0Sigor@sysoev.ru nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
411*0Sigor@sysoev.ru     nxt_name_value_t *nv)
412*0Sigor@sysoev.ru {
413*0Sigor@sysoev.ru     u_char             *end;
414*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
415*0Sigor@sysoev.ru 
416*0Sigor@sysoev.ru     end = nv->value_start + nv->value_len;
417*0Sigor@sysoev.ru 
418*0Sigor@sysoev.ru     if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
419*0Sigor@sysoev.ru         hs = us->protocol_source;
420*0Sigor@sysoev.ru         hs->chunked = 1;
421*0Sigor@sysoev.ru     }
422*0Sigor@sysoev.ru 
423*0Sigor@sysoev.ru     return NXT_OK;
424*0Sigor@sysoev.ru }
425*0Sigor@sysoev.ru 
426*0Sigor@sysoev.ru 
427*0Sigor@sysoev.ru static void
428*0Sigor@sysoev.ru nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest)
429*0Sigor@sysoev.ru {
430*0Sigor@sysoev.ru     nxt_buf_t                *b;
431*0Sigor@sysoev.ru     nxt_upstream_source_t    *us;
432*0Sigor@sysoev.ru     nxt_http_source_chunk_t  *hsc;
433*0Sigor@sysoev.ru 
434*0Sigor@sysoev.ru     us = hs->upstream;
435*0Sigor@sysoev.ru 
436*0Sigor@sysoev.ru     /* Free buffers used for request header. */
437*0Sigor@sysoev.ru 
438*0Sigor@sysoev.ru     for (b = us->stream->out; b != NULL; b = b->next) {
439*0Sigor@sysoev.ru         nxt_buf_pool_free(&us->buffers, b);
440*0Sigor@sysoev.ru     }
441*0Sigor@sysoev.ru 
442*0Sigor@sysoev.ru     if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
443*0Sigor@sysoev.ru 
444*0Sigor@sysoev.ru         if (hs->chunked) {
445*0Sigor@sysoev.ru             hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool,
446*0Sigor@sysoev.ru                                  sizeof(nxt_http_source_chunk_t));
447*0Sigor@sysoev.ru             if (nxt_slow_path(hsc == NULL)) {
448*0Sigor@sysoev.ru                 goto fail;
449*0Sigor@sysoev.ru             }
450*0Sigor@sysoev.ru 
451*0Sigor@sysoev.ru             /*
452*0Sigor@sysoev.ru              * Change the HTTP source filter chain:
453*0Sigor@sysoev.ru              *    stream source | chunk filter | HTTP body filter
454*0Sigor@sysoev.ru              */
455*0Sigor@sysoev.ru             hs->query.context = hsc;
456*0Sigor@sysoev.ru             hs->query.filter = nxt_http_source_chunk_filter;
457*0Sigor@sysoev.ru 
458*0Sigor@sysoev.ru             hsc->next.context = hs;
459*0Sigor@sysoev.ru             hsc->next.filter = nxt_http_source_body_filter;
460*0Sigor@sysoev.ru 
461*0Sigor@sysoev.ru             hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
462*0Sigor@sysoev.ru 
463*0Sigor@sysoev.ru             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
464*0Sigor@sysoev.ru                 hs->rest = nxt_http_chunk_parse(&hsc->parse, rest);
465*0Sigor@sysoev.ru 
466*0Sigor@sysoev.ru                 if (nxt_slow_path(hs->rest == NULL)) {
467*0Sigor@sysoev.ru                     goto fail;
468*0Sigor@sysoev.ru                 }
469*0Sigor@sysoev.ru             }
470*0Sigor@sysoev.ru 
471*0Sigor@sysoev.ru         } else {
472*0Sigor@sysoev.ru             /*
473*0Sigor@sysoev.ru              * Change the HTTP source filter chain:
474*0Sigor@sysoev.ru              *    stream source | HTTP body filter
475*0Sigor@sysoev.ru              */
476*0Sigor@sysoev.ru             hs->query.filter = nxt_http_source_body_filter;
477*0Sigor@sysoev.ru 
478*0Sigor@sysoev.ru             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
479*0Sigor@sysoev.ru                 hs->rest = rest;
480*0Sigor@sysoev.ru             }
481*0Sigor@sysoev.ru         }
482*0Sigor@sysoev.ru 
483*0Sigor@sysoev.ru         hs->upstream->state->ready_handler(hs);
484*0Sigor@sysoev.ru         return;
485*0Sigor@sysoev.ru     }
486*0Sigor@sysoev.ru 
487*0Sigor@sysoev.ru     nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
488*0Sigor@sysoev.ru                          "are not enough to read upstream response",
489*0Sigor@sysoev.ru                          us->buffers.max, us->buffers.size / 1024);
490*0Sigor@sysoev.ru fail:
491*0Sigor@sysoev.ru 
492*0Sigor@sysoev.ru     nxt_http_source_fail(hs);
493*0Sigor@sysoev.ru }
494*0Sigor@sysoev.ru 
495*0Sigor@sysoev.ru 
496*0Sigor@sysoev.ru static void
497*0Sigor@sysoev.ru nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, void *data)
498*0Sigor@sysoev.ru {
499*0Sigor@sysoev.ru     nxt_buf_t                *b;
500*0Sigor@sysoev.ru     nxt_http_source_t        *hs;
501*0Sigor@sysoev.ru     nxt_http_source_chunk_t  *hsc;
502*0Sigor@sysoev.ru 
503*0Sigor@sysoev.ru     hsc = obj;
504*0Sigor@sysoev.ru     b = data;
505*0Sigor@sysoev.ru 
506*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "http source chunk filter");
507*0Sigor@sysoev.ru 
508*0Sigor@sysoev.ru     b = nxt_http_chunk_parse(&hsc->parse, b);
509*0Sigor@sysoev.ru 
510*0Sigor@sysoev.ru     hs = hsc->next.context;
511*0Sigor@sysoev.ru 
512*0Sigor@sysoev.ru     if (hsc->parse.error) {
513*0Sigor@sysoev.ru         nxt_http_source_fail(hs);
514*0Sigor@sysoev.ru         return;
515*0Sigor@sysoev.ru     }
516*0Sigor@sysoev.ru 
517*0Sigor@sysoev.ru     if (hsc->parse.chunk_error) {
518*0Sigor@sysoev.ru         /* Output all parsed before a chunk error and close upstream. */
519*0Sigor@sysoev.ru         nxt_thread_current_work_queue_add(thr, nxt_http_source_chunk_error,
520*0Sigor@sysoev.ru                                           hs, NULL, thr->log);
521*0Sigor@sysoev.ru     }
522*0Sigor@sysoev.ru 
523*0Sigor@sysoev.ru     if (b != NULL) {
524*0Sigor@sysoev.ru         nxt_source_filter(thr, hs->upstream->work_queue, &hsc->next, b);
525*0Sigor@sysoev.ru     }
526*0Sigor@sysoev.ru }
527*0Sigor@sysoev.ru 
528*0Sigor@sysoev.ru 
529*0Sigor@sysoev.ru static void
530*0Sigor@sysoev.ru nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, void *data)
531*0Sigor@sysoev.ru {
532*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
533*0Sigor@sysoev.ru 
534*0Sigor@sysoev.ru     hs = obj;
535*0Sigor@sysoev.ru 
536*0Sigor@sysoev.ru     nxt_http_source_fail(hs);
537*0Sigor@sysoev.ru }
538*0Sigor@sysoev.ru 
539*0Sigor@sysoev.ru 
540*0Sigor@sysoev.ru /*
541*0Sigor@sysoev.ru  * The HTTP source body filter accumulates first body buffers before the next
542*0Sigor@sysoev.ru  * filter will be established and sets completion handler for the last buffer.
543*0Sigor@sysoev.ru  */
544*0Sigor@sysoev.ru 
545*0Sigor@sysoev.ru static void
546*0Sigor@sysoev.ru nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
547*0Sigor@sysoev.ru {
548*0Sigor@sysoev.ru     nxt_buf_t          *b, *in;
549*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
550*0Sigor@sysoev.ru 
551*0Sigor@sysoev.ru     hs = obj;
552*0Sigor@sysoev.ru     in = data;
553*0Sigor@sysoev.ru 
554*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "http source body filter");
555*0Sigor@sysoev.ru 
556*0Sigor@sysoev.ru     for (b = in; b != NULL; b = b->next) {
557*0Sigor@sysoev.ru 
558*0Sigor@sysoev.ru         if (nxt_buf_is_last(b)) {
559*0Sigor@sysoev.ru             b->data = hs->upstream->data;
560*0Sigor@sysoev.ru             b->completion_handler = hs->upstream->state->completion_handler;
561*0Sigor@sysoev.ru         }
562*0Sigor@sysoev.ru     }
563*0Sigor@sysoev.ru 
564*0Sigor@sysoev.ru     if (hs->next != NULL) {
565*0Sigor@sysoev.ru         nxt_source_filter(thr, hs->upstream->work_queue, hs->next, in);
566*0Sigor@sysoev.ru         return;
567*0Sigor@sysoev.ru     }
568*0Sigor@sysoev.ru 
569*0Sigor@sysoev.ru     nxt_buf_chain_add(&hs->rest, in);
570*0Sigor@sysoev.ru }
571*0Sigor@sysoev.ru 
572*0Sigor@sysoev.ru 
573*0Sigor@sysoev.ru static void
574*0Sigor@sysoev.ru nxt_http_source_sync_buffer(nxt_thread_t *thr, nxt_http_source_t *hs,
575*0Sigor@sysoev.ru     nxt_buf_t *b)
576*0Sigor@sysoev.ru {
577*0Sigor@sysoev.ru     if (nxt_buf_is_last(b)) {
578*0Sigor@sysoev.ru         nxt_log_error(NXT_LOG_ERR, thr->log,
579*0Sigor@sysoev.ru                       "upstream closed prematurely connection");
580*0Sigor@sysoev.ru 
581*0Sigor@sysoev.ru     } else {
582*0Sigor@sysoev.ru         nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
583*0Sigor@sysoev.ru                       "enough to process upstream response header",
584*0Sigor@sysoev.ru                       hs->upstream->buffers.max,
585*0Sigor@sysoev.ru                       hs->upstream->buffers.size);
586*0Sigor@sysoev.ru     }
587*0Sigor@sysoev.ru 
588*0Sigor@sysoev.ru     /* The stream source sends only the last and the nobuf sync buffer. */
589*0Sigor@sysoev.ru 
590*0Sigor@sysoev.ru     nxt_http_source_fail(hs);
591*0Sigor@sysoev.ru }
592*0Sigor@sysoev.ru 
593*0Sigor@sysoev.ru 
594*0Sigor@sysoev.ru static void
595*0Sigor@sysoev.ru nxt_http_source_error(nxt_stream_source_t *stream)
596*0Sigor@sysoev.ru {
597*0Sigor@sysoev.ru     nxt_http_source_t  *hs;
598*0Sigor@sysoev.ru 
599*0Sigor@sysoev.ru     nxt_thread_log_debug("http source error");
600*0Sigor@sysoev.ru 
601*0Sigor@sysoev.ru     hs = stream->next->context;
602*0Sigor@sysoev.ru     nxt_http_source_fail(hs);
603*0Sigor@sysoev.ru }
604*0Sigor@sysoev.ru 
605*0Sigor@sysoev.ru 
606*0Sigor@sysoev.ru static void
607*0Sigor@sysoev.ru nxt_http_source_fail(nxt_http_source_t *hs)
608*0Sigor@sysoev.ru {
609*0Sigor@sysoev.ru     nxt_thread_t  *thr;
610*0Sigor@sysoev.ru 
611*0Sigor@sysoev.ru     thr = nxt_thread();
612*0Sigor@sysoev.ru 
613*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "http source fail");
614*0Sigor@sysoev.ru 
615*0Sigor@sysoev.ru     /* TODO: fail, next upstream, or bad gateway */
616*0Sigor@sysoev.ru 
617*0Sigor@sysoev.ru     hs->upstream->state->error_handler(thr, hs, NULL);
618*0Sigor@sysoev.ru }
619*0Sigor@sysoev.ru 
620*0Sigor@sysoev.ru 
621*0Sigor@sysoev.ru static void
622*0Sigor@sysoev.ru nxt_http_source_message(const char *msg, size_t len, u_char *p)
623*0Sigor@sysoev.ru {
624*0Sigor@sysoev.ru     if (len > NXT_MAX_ERROR_STR - 300) {
625*0Sigor@sysoev.ru         len = NXT_MAX_ERROR_STR - 300;
626*0Sigor@sysoev.ru         p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
627*0Sigor@sysoev.ru     }
628*0Sigor@sysoev.ru 
629*0Sigor@sysoev.ru     nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
630*0Sigor@sysoev.ru }
631