xref: /unit/src/nxt_http_source.c (revision 1:fdc027c56872)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
#include <nxt_main.h>
#include <stddef.h>
8 
9 
10 typedef struct {
11     nxt_http_chunk_parse_t  parse;
12     nxt_source_hook_t       next;
13 } nxt_http_source_chunk_t;
14 
15 
16 static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
17 
18 static void nxt_http_source_status_filter(nxt_task_t *task, void *obj,
19     void *data);
20 static void nxt_http_source_header_filter(nxt_task_t *task, void *obj,
21     void *data);
22 
23 static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
24 static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
25     nxt_name_value_t *nv);
26 static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
27     nxt_name_value_t *nv);
28 
29 static void nxt_http_source_header_ready(nxt_task_t *task,
30     nxt_http_source_t *hs, nxt_buf_t *rest);
31 static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_http_source_body_filter(nxt_task_t *task, void *obj,
36     void *data);
37 
38 static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
39     nxt_buf_t *b);
40 static void nxt_http_source_error(nxt_task_t *task,
41     nxt_stream_source_t *stream);
42 static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs);
43 static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
44 
45 
46 void
47 nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
48     nxt_http_source_request_create_t request_create)
49 {
50     nxt_http_source_t    *hs;
51     nxt_stream_source_t  *stream;
52 
53     hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t));
54     if (nxt_slow_path(hs == NULL)) {
55         goto fail;
56     }
57 
58     us->protocol_source = hs;
59 
60     hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
61                                          sizeof(nxt_name_value_t));
62     if (nxt_slow_path(hs->header_in.list == NULL)) {
63         goto fail;
64     }
65 
66     hs->header_in.hash = us->header_hash;
67     hs->upstream = us;
68     hs->request_create = request_create;
69 
70     stream = us->stream;
71 
72     if (stream == NULL) {
73         stream = nxt_mem_zalloc(us->buffers.mem_pool,
74                                 sizeof(nxt_stream_source_t));
75         if (nxt_slow_path(stream == NULL)) {
76             goto fail;
77         }
78 
79         us->stream = stream;
80         stream->upstream = us;
81 
82     } else {
83         nxt_memzero(stream, sizeof(nxt_stream_source_t));
84     }
85 
86     /*
87      * Create the HTTP source filter chain:
88      *   stream source | HTTP status line filter
89      */
90     stream->next = &hs->query;
91     stream->error_handler = nxt_http_source_error;
92 
93     hs->query.context = hs;
94     hs->query.filter = nxt_http_source_status_filter;
95 
96     hs->header_in.content_length = -1;
97 
98     stream->out = nxt_http_source_request_create(hs);
99 
100     if (nxt_fast_path(stream->out != NULL)) {
101         nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
102 
103         nxt_stream_source_connect(task, stream);
104         return;
105     }
106 
107 fail:
108 
109     nxt_http_source_fail(task, hs);
110 }
111 
112 
113 nxt_inline u_char *
114 nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
115 {
116     u_char  *s;
117 
118     if (nxt_fast_path(len >= src->len)) {
119         len = src->len;
120         src->len = 0;
121 
122     } else {
123         src->len -= len;
124     }
125 
126     s = src->data;
127     src->data += len;
128 
129     return nxt_cpymem(p, s, len);
130 }
131 
132 
133 static nxt_buf_t *
134 nxt_http_source_request_create(nxt_http_source_t *hs)
135 {
136     nxt_int_t  ret;
137     nxt_buf_t  *b, *req, **prev;
138 
139     nxt_thread_log_debug("http source create request");
140 
141     prev = &req;
142 
143 new_buffer:
144 
145     ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
146     if (nxt_slow_path(ret != NXT_OK)) {
147         return NULL;
148     }
149 
150     b = hs->upstream->buffers.current;
151     hs->upstream->buffers.current = NULL;
152 
153     *prev = b;
154     prev = &b->next;
155 
156     for ( ;; ) {
157         ret = hs->request_create(hs);
158 
159         if (nxt_fast_path(ret == NXT_OK)) {
160             b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
161                                                b->mem.end - b->mem.free);
162 
163             if (nxt_fast_path(hs->u.request.copy.len == 0)) {
164                 continue;
165             }
166 
167             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
168                                  b->mem.pos);
169 
170             goto new_buffer;
171         }
172 
173         if (nxt_slow_path(ret == NXT_ERROR)) {
174             return NULL;
175         }
176 
177         /* ret == NXT_DONE */
178         break;
179     }
180 
181     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
182 
183     return req;
184 }
185 
186 
187 static void
188 nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data)
189 {
190     nxt_int_t          ret;
191     nxt_buf_t          *b;
192     nxt_http_source_t  *hs;
193 
194     hs = obj;
195     b = data;
196 
197     /*
198      * No cycle over buffer chain is required since at
199      * start the stream source passes buffers one at a time.
200      */
201 
202     nxt_debug(task, "http source status filter");
203 
204     if (nxt_slow_path(nxt_buf_is_sync(b))) {
205         nxt_http_source_sync_buffer(task, hs, b);
206         return;
207     }
208 
209     ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
210 
211     if (nxt_fast_path(ret == NXT_OK)) {
212         /*
213          * Change the HTTP source filter chain:
214          *    stream source | HTTP header filter
215          */
216         hs->query.filter = nxt_http_source_header_filter;
217 
218         nxt_debug(task, "upstream status: \"%*s\"",
219                   hs->u.status_parse.end - b->mem.start, b->mem.start);
220 
221         hs->header_in.status = hs->u.status_parse.code;
222 
223         nxt_debug(task, "upstream version:%d status:%uD \"%*s\"",
224                   hs->u.status_parse.http_version,
225                   hs->u.status_parse.code,
226                   hs->u.status_parse.end - hs->u.status_parse.start,
227                   hs->u.status_parse.start);
228 
229         nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
230         hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
231 
232         nxt_http_source_header_filter(task, hs, b);
233         return;
234     }
235 
236     if (nxt_slow_path(ret == NXT_ERROR)) {
237         /* HTTP/0.9 response. */
238         hs->header_in.status = 200;
239         nxt_http_source_header_ready(task, hs, b);
240         return;
241     }
242 
243     /* ret == NXT_AGAIN */
244 
245     /*
246      * b->mem.pos is always equal to b->mem.end because b is a buffer
247      * which points to a response part read by the stream source.
248      * However, since the stream source is an immediate source of the
249      * status filter, b->parent is a buffer the stream source reads in.
250      */
251     if (b->parent->mem.pos == b->parent->mem.end) {
252         nxt_http_source_message("upstream sent too long status line: \"%*s\"",
253                                 b->mem.pos - b->mem.start, b->mem.start);
254 
255         nxt_http_source_fail(task, hs);
256     }
257 }
258 
259 
260 static void
261 nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data)
262 {
263     nxt_int_t          ret;
264     nxt_buf_t          *b;
265     nxt_http_source_t  *hs;
266 
267     hs = obj;
268     b = data;
269 
270     /*
271      * No cycle over buffer chain is required since at
272      * start the stream source passes buffers one at a time.
273      */
274 
275     nxt_debug(task, "http source header filter");
276 
277     if (nxt_slow_path(nxt_buf_is_sync(b))) {
278         nxt_http_source_sync_buffer(task, hs, b);
279         return;
280     }
281 
282     for ( ;; ) {
283         ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
284 
285         if (nxt_slow_path(ret != NXT_OK)) {
286             break;
287         }
288 
289         ret = nxt_http_source_header_line_process(hs);
290 
291         if (nxt_slow_path(ret != NXT_OK)) {
292             break;
293         }
294     }
295 
296     if (nxt_fast_path(ret == NXT_DONE)) {
297         nxt_debug(task, "http source header done");
298         nxt_http_source_header_ready(task, hs, b);
299         return;
300     }
301 
302     if (nxt_fast_path(ret == NXT_AGAIN)) {
303         return;
304     }
305 
306     if (ret != NXT_ERROR) {
307         /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
308         nxt_log(task, NXT_LOG_ERR,
309                 "upstream sent invalid header line: \"%*s\\r...\"",
310                 hs->u.header.parse.header_end
311                     - hs->u.header.parse.header_name_start,
312                 hs->u.header.parse.header_name_start);
313     }
314 
315     /* ret == NXT_ERROR */
316 
317     nxt_http_source_fail(task, hs);
318 }
319 
320 
321 static nxt_int_t
322 nxt_http_source_header_line_process(nxt_http_source_t *hs)
323 {
324     size_t                     name_len;
325     nxt_name_value_t           *nv;
326     nxt_lvlhsh_query_t         lhq;
327     nxt_http_header_parse_t    *hp;
328     nxt_upstream_name_value_t  *unv;
329 
330     hp = &hs->u.header.parse;
331 
332     name_len = hp->header_name_end - hp->header_name_start;
333 
334     if (name_len > 255) {
335         nxt_http_source_message("upstream sent too long header field name: "
336                                 "\"%*s\"", name_len, hp->header_name_start);
337         return NXT_ERROR;
338     }
339 
340     nv = nxt_list_add(hs->header_in.list);
341     if (nxt_slow_path(nv == NULL)) {
342         return NXT_ERROR;
343     }
344 
345     nv->hash = hp->header_hash;
346     nv->skip = 0;
347     nv->name_len = name_len;
348     nv->name_start = hp->header_name_start;
349     nv->value_len = hp->header_end - hp->header_start;
350     nv->value_start = hp->header_start;
351 
352     nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
353                          nv->name_len, nv->name_start,
354                          nv->value_len, nv->value_start);
355 
356     lhq.key_hash = nv->hash;
357     lhq.key.len = nv->name_len;
358     lhq.key.data = nv->name_start;
359     lhq.proto = &nxt_upstream_header_hash_proto;
360 
361     if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
362         unv = lhq.value;
363 
364         if (unv->handler(hs->upstream, nv) != NXT_OK) {
365             return NXT_ERROR;
366         }
367     }
368 
369     return NXT_OK;
370 }
371 
372 
373 static const nxt_upstream_name_value_t  nxt_http_source_headers[]
374     nxt_aligned(32) =
375 {
376     { nxt_http_source_content_length,
377       nxt_upstream_name_value("content-length") },
378 
379     { nxt_http_source_transfer_encoding,
380       nxt_upstream_name_value("transfer-encoding") },
381 };
382 
383 
384 nxt_int_t
385 nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
386 {
387     return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
388                                         nxt_nitems(nxt_http_source_headers));
389 }
390 
391 
392 static nxt_int_t
393 nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
394 {
395     nxt_off_t          length;
396     nxt_http_source_t  *hs;
397 
398     length = nxt_off_t_parse(nv->value_start, nv->value_len);
399 
400     if (nxt_fast_path(length > 0)) {
401         hs = us->protocol_source;
402         hs->header_in.content_length = length;
403         return NXT_OK;
404     }
405 
406     return NXT_ERROR;
407 }
408 
409 
410 static nxt_int_t
411 nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
412     nxt_name_value_t *nv)
413 {
414     u_char             *end;
415     nxt_http_source_t  *hs;
416 
417     end = nv->value_start + nv->value_len;
418 
419     if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
420         hs = us->protocol_source;
421         hs->chunked = 1;
422     }
423 
424     return NXT_OK;
425 }
426 
427 
428 static void
429 nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs,
430     nxt_buf_t *rest)
431 {
432     nxt_buf_t                *b;
433     nxt_upstream_source_t    *us;
434     nxt_http_source_chunk_t  *hsc;
435 
436     us = hs->upstream;
437 
438     /* Free buffers used for request header. */
439 
440     for (b = us->stream->out; b != NULL; b = b->next) {
441         nxt_buf_pool_free(&us->buffers, b);
442     }
443 
444     if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
445 
446         if (hs->chunked) {
447             hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool,
448                                  sizeof(nxt_http_source_chunk_t));
449             if (nxt_slow_path(hsc == NULL)) {
450                 goto fail;
451             }
452 
453             /*
454              * Change the HTTP source filter chain:
455              *    stream source | chunk filter | HTTP body filter
456              */
457             hs->query.context = hsc;
458             hs->query.filter = nxt_http_source_chunk_filter;
459 
460             hsc->next.context = hs;
461             hsc->next.filter = nxt_http_source_body_filter;
462 
463             hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
464 
465             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
466                 hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest);
467 
468                 if (nxt_slow_path(hs->rest == NULL)) {
469                     goto fail;
470                 }
471             }
472 
473         } else {
474             /*
475              * Change the HTTP source filter chain:
476              *    stream source | HTTP body filter
477              */
478             hs->query.filter = nxt_http_source_body_filter;
479 
480             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
481                 hs->rest = rest;
482             }
483         }
484 
485         hs->upstream->state->ready_handler(hs);
486         return;
487     }
488 
489     nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
490                          "are not enough to read upstream response",
491                          us->buffers.max, us->buffers.size / 1024);
492 fail:
493 
494     nxt_http_source_fail(task, hs);
495 }
496 
497 
498 static void
499 nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data)
500 {
501     nxt_buf_t                *b;
502     nxt_http_source_t        *hs;
503     nxt_http_source_chunk_t  *hsc;
504 
505     hsc = obj;
506     b = data;
507 
508     nxt_debug(task, "http source chunk filter");
509 
510     b = nxt_http_chunk_parse(task, &hsc->parse, b);
511 
512     hs = hsc->next.context;
513 
514     if (hsc->parse.error) {
515         nxt_http_source_fail(task, hs);
516         return;
517     }
518 
519     if (hsc->parse.chunk_error) {
520         /* Output all parsed before a chunk error and close upstream. */
521         nxt_thread_current_work_queue_add(task->thread,
522                                           nxt_http_source_chunk_error,
523                                           task, hs, NULL);
524     }
525 
526     if (b != NULL) {
527         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
528                           &hsc->next, b);
529     }
530 }
531 
532 
533 static void
534 nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data)
535 {
536     nxt_http_source_t  *hs;
537 
538     hs = obj;
539 
540     nxt_http_source_fail(task, hs);
541 }
542 
543 
544 /*
545  * The HTTP source body filter accumulates first body buffers before the next
546  * filter will be established and sets completion handler for the last buffer.
547  */
548 
549 static void
550 nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data)
551 {
552     nxt_buf_t          *b, *in;
553     nxt_http_source_t  *hs;
554 
555     hs = obj;
556     in = data;
557 
558     nxt_debug(task, "http source body filter");
559 
560     for (b = in; b != NULL; b = b->next) {
561 
562         if (nxt_buf_is_last(b)) {
563             b->data = hs->upstream->data;
564             b->completion_handler = hs->upstream->state->completion_handler;
565         }
566     }
567 
568     if (hs->next != NULL) {
569         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
570                           hs->next, in);
571         return;
572     }
573 
574     nxt_buf_chain_add(&hs->rest, in);
575 }
576 
577 
578 static void
579 nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
580     nxt_buf_t *b)
581 {
582     if (nxt_buf_is_last(b)) {
583         nxt_log(task, NXT_LOG_ERR,
584                 "upstream closed prematurely connection");
585 
586     } else {
587         nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not "
588                 "enough to process upstream response header",
589                 hs->upstream->buffers.max, hs->upstream->buffers.size);
590     }
591 
592     /* The stream source sends only the last and the nobuf sync buffer. */
593 
594     nxt_http_source_fail(task, hs);
595 }
596 
597 
598 static void
599 nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
600 {
601     nxt_http_source_t  *hs;
602 
603     nxt_thread_log_debug("http source error");
604 
605     hs = stream->next->context;
606     nxt_http_source_fail(task, hs);
607 }
608 
609 
610 static void
611 nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs)
612 {
613     nxt_debug(task, "http source fail");
614 
615     /* TODO: fail, next upstream, or bad gateway */
616 
617     hs->upstream->state->error_handler(task, hs, NULL);
618 }
619 
620 
621 static void
622 nxt_http_source_message(const char *msg, size_t len, u_char *p)
623 {
624     if (len > NXT_MAX_ERROR_STR - 300) {
625         len = NXT_MAX_ERROR_STR - 300;
626         p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
627     }
628 
629     nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
630 }
631